跳转至

接口签名认证


接口签名算法

参与签名计算的元素

参数名 描述
method 接口请求方法, 目前支持的请求方法有GETPOST
nonce 随机数,对应请求头中的X-Df-Nonce
timestamp 请求发起的时间点对应的时间戳,秒级单位,对应请求头中的X-Df-Timestamp
path 原始的请求路径(包含查询字符串), 例如:
GET请求: /api/v1/account/list?pageIndex=1&pageSize=20
POST请求: /api/v1/account/add
body 请求中原始的 body 信息,无 body 时,body 默认为空字符串,并参与签名计算

签名计算逻辑

将 method(大写)、nonce(请求时生成的随机临时码) 、 path(原始请求路径和查询字符串)、 timestamp、body 使用 一个空格 拼接为签名原文字符串(string_to_sign). 当无 body 时, body 默认用空字符串代替。 即签名原文字符串 string_to_sign = '{method} {nonce} {path} {timestamp} {body}'. 用「服务配置项」描述中的 secretKey 作为密钥, 对string_to_sign进行加密(hmac sha256) 得到每次请求的认证签名,即请求头 X-Signature 的值.

python 版签名和接口请求示例

import json
import time
import hmac
import uuid
import hashlib
from urllib.parse import urlencode
import requests

class DFExternalAPI(object):

    def __init__(self, server, access_key, secret_key, debug=False):
        self.server = server
        self.access_key = access_key
        self.secret_key = secret_key
        self.debug = debug

    def get_sign(self, method, full_path, timestamp, nonce, body=None):
        """ 生成签名 """
        if body is None:
            # 无body时, 默认设置 body为空字符串
            body = ""
        # 原始签名字符串
        string_to_sign = " ".join([method, nonce, full_path, timestamp, body])
        # 对签名字符串进行 编码
        string_to_sign = string_to_sign.encode(encoding="utf-8")
        secret_key = self.secret_key.encode(encoding="utf-8")
        # 进行 hmac 加密
        h = hmac.new(secret_key, string_to_sign, hashlib.sha256)
        sign = h.hexdigest()
        return sign

    def get_auth_header(self, method, full_path, body):
        """ 生成请求头相关信息 """
        # 当前时间点的时间戳
        timestamp = str(int(time.time()))
        # 随机数
        nonce = uuid.uuid4().hex
        # 生成签名
        sign = self.get_sign(method, full_path, timestamp, nonce, body) if self.access_key else None
        # 生成完整请求头相关信息
        auth_headers = {
            'Content-Type': 'application/json',
            'X-Df-Access-Key': self.access_key,
            'X-Df-Timestamp': timestamp,
            'X-Df-Nonce': nonce,
            'X-Df-SVersion': "v20240417",
            'X-Df-Signature': sign,
        }
        return auth_headers

    def run(self, method, path, query=None, body=None, headers=None, **kwargs):
        """ 执行请求并提取响应信息 """
        # 格式化请求方法
        method = method.upper()
        # 生成完整的请求路径信息
        if query:
            path = path + '?' + urlencode(query, encoding='utf-8')
        # 对 body 主体需要进行 unicode 编码(key排序不是必须的, 只要确保最终 body 为 unicode 编码即可)
        if body:
            if isinstance(body, (tuple, list, dict)):
                body = json.dumps(body, sort_keys=True, separators=(',', ':'), ensure_ascii=False)
        else:
            body = ""
        # 合并 请求头信息
        headers = headers or {}
        new_headers = self.get_auth_header(method, path, body)
        new_headers.update(headers)
        # 完整的请求url
        url = self.server + path
        if self.debug:
            print("============请求详情==================")
            print("url: ", url)
            print("---- headers ----")
            print(json.dumps(headers, indent=4, ensure_ascii=False))
            print("---- body ----")
            print(json.dumps(body, indent=4, ensure_ascii=False))
            print("==============================")
        # 执行请求
        resp = requests.__getattribute__(method.lower())(
            url,
            data=body.encode(encoding='utf-8'),
            headers=new_headers
        )
        # 获取响应状态
        resp_status_code = resp.status_code
        # 响应内容
        resp_data = resp.text
        if resp_status_code != 200:
            raise Exception(resp_data)

        resp_content_type = resp.headers.get('Content-Type')
        if isinstance(resp_content_type, str):
            resp_content_type = resp_content_type.split(';')[0].strip()

        if resp_content_type == 'application/json':
            try:
                resp_data = json.loads(resp_data)
            except Exception as e:
                raise Exception('Cannot parse response body as json', resp_data)

        return resp_status_code, resp_data

    def get(self, path, query=None, headers=None):
        return self.run('GET', path, query, None, headers)

    def post(self, path, query=None, body=None, headers=None, **kwargs):
        return self.run('POST', path, query, body, headers, **kwargs)

    def account_list(self, search="", page_index=1, page_size=10, **kwargs):
        path = "/api/v1/account/list"
        query = {
            "search": search,
            "pageIndex": page_index,
            "pageSize": page_size
        }
        return self.get(path, query)

    def query_data(self, workspace_uuid, body, **kwargs):
        path = f"/api/v1/df/{workspace_uuid}/query_data"
        return self.post(path, body=body)

def test_get(server, access_key, secret_key):

    dfapi = DFExternalAPI(server, access_key, secret_key)
    status_code, resp = dfapi.account_list("测试")
    print(f"======={__name__}====")
    print(f"status_code: {status_code}")
    print(f"resp: {resp}")

def test_post(server, access_key, secret_key):
    dfapi = DFExternalAPI(server, access_key, secret_key, True)
    workspace_uuid = "wksp_4b57c7bab38e4a2d9630f675dc20015d"
    body = {
        "queries": [
            {
                "qtype": "dql",
                "query": {
                    "q": "L::re(`.*`):(fill(count(__docid), 0) as `count`){ (`index` IN ['default']) AND (`message` = queryString(\"观测\")) } BY `status`",
                    "funcList": [],
                    "maxPointCount": 60,
                    "interval": 20,
                    "align_time": True,
                    "sorder_by": [{"column": "`#1`", "order": "DESC"}],
                    "slimit": 20,
                    "disable_sampling": False,
                    "timeRange": [1713440394537, 1713441294537],
                    "tz": "Asia/Shanghai"
                }
            }
        ]
    }
    status_code, resp = dfapi.query_data(workspace_uuid, body)
    print(f"======={__name__}====")
    print(f"status_code: {status_code}")
    print(f"resp: {resp}")


if __name__ == '__main__':
    # external api 服务域名地址
    server = "http://127.0.0.1:5000"
    # 配置的访问者 ak
    access_key = "abcd"
    # 配置的密钥 sk
    secret_key = "Admin123"
    test_get(server, access_key, secret_key)
    test_post(server, access_key, secret_key)

go 版签名和接口请求示例

package main

import (
    "bytes"
    "crypto/hmac"
    "crypto/sha256"
    "encoding/json"
    "net/url"

    //"encoding/base64"
    "encoding/hex"
    "fmt"
    "github.com/google/uuid"
    "io/ioutil"
    "net/http"
    "strconv"
    "time"
)

type DFExternalAPI struct {
    server     string
    accessKey  string
    secretKey  string
    debug      bool
}

func Marshal(data interface{}) []byte {
    byteStr, err := json.Marshal(data)
    if err != nil {
        fmt.Println("JsonMarshal 错误:", err)
    }

    return byteStr
}

func NewDFExternalAPI(server, accessKey, secretKey string, debug bool) *DFExternalAPI {
    return &DFExternalAPI{
        server:    server,
        accessKey: accessKey,
        secretKey: secretKey,
        debug:     debug,
    }
}

func (api *DFExternalAPI) GetSign(method, fullPath string, timestamp, nonce string, body string) string {
    if body == "" {
        body = ""
    }

    stringToSign := fmt.Sprintf("%s %s %s %s %s", method, nonce, fullPath, timestamp, body)
    stringToSignBytes := []byte(stringToSign)
    //
    secretKeyBytes := []byte(api.secretKey)

    h := hmac.New(sha256.New, secretKeyBytes)
    h.Write(stringToSignBytes)
    sign := hex.EncodeToString(h.Sum(nil))

    //hmacGen := hmac.New(sha256.New, []byte(api.secretKey))
    //hmacGen.Write([]byte(stringToSign))
    //sign := base64.StdEncoding.EncodeToString(hmacGen.Sum(nil))

    return sign
}

func (api *DFExternalAPI) SetDFHeaders(req *http.Request, timestamp, nonce, sign string)  {
    // 设置请求头
    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("X-Df-Access-Key", api.accessKey)
    req.Header.Set("X-Df-Timestamp", timestamp)
    req.Header.Set("X-Df-Nonce", nonce)
    req.Header.Set("X-Df-SVersion", "v20240417")
    req.Header.Set("X-Df-Signature", sign)
    return
}

func eTestGet(server, accessKey, secretKey string) {
    // 生成一个随机的 UUID,并将 UUID 格式化为字符串
    nonce := uuid.New().String()
    timestamp := strconv.FormatInt(time.Now().Unix(), 10)
    //fullPath := "/api/v1/account/list?search=中国&pageIndex=1&pageSize=10"
    v := url.Values{}
    v.Set("search", "中国")
    v.Set("pageIndex", "1")
    v.Set("pageSize", "10")

    fullPath := fmt.Sprintf("/api/v1/account/list?%s", v.Encode())

    api := NewDFExternalAPI(server, accessKey, secretKey, false)
    sign := api.GetSign("GET", fullPath, timestamp, nonce, "")

    url := api.server + fullPath
    // 创建一个 GET 请求
    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        fmt.Println("创建请求失败:", err)
        return
    }
    api.SetDFHeaders(req, timestamp, nonce, sign)
    // 发起请求
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        fmt.Println("请求失败:", err)
        return
    }
    defer resp.Body.Close()

    // 读取响应体
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        fmt.Println("读取响应体失败:", err)
        return
    }

    // 打印响应体
    fmt.Println(string(body))
}

func eTestPost(server, accessKey, secretKey string) {
    method := "POST"
    // 生成一个随机的 UUID,并将 UUID 格式化为字符串
    nonce := uuid.New().String()
    timestamp := strconv.FormatInt(time.Now().Unix(), 10)
    workspaceUUID := "wksp_4b57c7bab38e4a2d9630f675dc20015d"
    fullPath := fmt.Sprintf("/api/v1/df/%s/query_data", workspaceUUID)

    api := NewDFExternalAPI(server, accessKey, secretKey, false)

    body := map[string]interface{}{
        "queries": []map[string]interface{}{
            {
                "uuid": "15d0fb90-fd81-11ee-b9b6-d19f81405637",
                "qtype": "dql",
                "query": map[string]interface{}{
                    "q":             "L::re(`.*`):(fill(count(__docid), 0) as `count`){ (`index` IN ['default']) AND (`message` = queryString(\"观测\")) } BY `status`",
                    "_funcList":     []interface{}{},
                    "funcList":      []interface{}{},
                    "maxPointCount": 60,
                    "interval":      20,
                    "align_time":    true,
                    "sorder_by": []map[string]interface{}{
                        {
                            "column": "`#1`",
                            "order":  "DESC",
                        },
                    },
                    "slimit":            20,
                    "disable_sampling":  false,
                    "timeRange":         []int64{1713440394537, 1713441294537},
                    "tz":                "Asia/Shanghai",
                },
            },
        },
    }
    bodyStr := Marshal(body)
    sign := api.GetSign(method, fullPath, timestamp, nonce, string(bodyStr))

    url := api.server + fullPath
    // 创建一个 GET 请求
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(bodyStr))
    if err != nil {
        fmt.Println("创建请求失败:", err)
        return
    }
    //// 设置请求头
    api.SetDFHeaders(req, timestamp, nonce, sign)
    // 发起请求
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        fmt.Println("请求失败:", err)
        return
    }
    defer resp.Body.Close()
    // 读取响应体
    content, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        fmt.Println("读取响应体失败:", err)
        return
    }
    // 打印响应体
    fmt.Println(string(content))
}

func main() {
    accessKey := "abcd"
    secretKey := "Admin123"
    server := "http://127.0.0.1:5000"
    eTestGet(server, accessKey, secretKey)
    eTestPost(server, accessKey, secretKey)
}

文档评价

文档内容是否对您有帮助? ×