跳转至

OpenLLMetry

OpenLLMetry 是由 Traceloop 团队以 Apache 2.0 许可证开发和维护,通过扩展 OpenTelemetry 的功能,为 LLM 应用提供了专门的监控和调试工具。它利用 OpenTelemetry 的标准化遥测数据格式,将 LLM 应用的关键性能指标和追踪信息标准化输出。

配置

在使用 OTEL 将链路追踪数据发送到 DataKit 之前,请确保已配置 Collector,同时需要调整配置文件customer_tags = ["llm.request.type","traceloop.entity.path","llm.is_streaming","gen_ai.openai.api_base","gen_ai.prompt.1.content","gen_ai.response.model","gen_ai.completion.0.content","gen_ai.request.model","gen_ai.request.temperature","gen_ai.system","traceloop.workflow.name"],如下所示:

[[inputs.opentelemetry]]
  ## customer_tags will work as a whitelist to prevent tags send to data center.
  ## All . will replace to _ ,like this :
  ## "project.name" to send to GuanCe center is "project_name"
    customer_tags = ["llm.request.type","traceloop.entity.path","llm.is_streaming","gen_ai.openai.api_base","gen_ai.prompt.1.content","gen_ai.response.model","gen_ai.completion.0.content","gen_ai.request.model","gen_ai.request.temperature","gen_ai.system","traceloop.workflow.name"]
  ...

调整完成后,重启 DataKit

安装 OpenTelemetry SDK

pip install opentelemetry-api opentelemetry-instrumentation pip install opentelemetry-instrumentation-flask

安装 OpenLLMetry SDK

pip install traceloop-sdk

在应用程序中初始化 OpenLLMetry

from traceloop.sdk import Traceloop

# 初始化 OpenLit
# Traceloop.init()

Traceloop.init(app_name="kimi_openllmetry_stream_flask")

OpenLLMetry 示例代码

import os
import httpx
from flask import Flask, request, Response,jsonify,stream_with_context
from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow,task
from openai import OpenAI

from opentelemetry.instrumentation.flask import FlaskInstrumentor


app = Flask(__name__)
# 使用FlaskInstrumentor自动插桩Flask应用
FlaskInstrumentor().instrument_app(app)

# 初始化 OpenLit
Traceloop.init(app_name="kimi_openllmetry_stream_flask")

# 从环境变量中获取 API Key
api_key = os.getenv("MOONSHOT_API_KEY")
if not api_key:
    raise ValueError("请设置 MOONSHOT_API_KEY 环境变量")

client = OpenAI(
    api_key=api_key,
    base_url="https://api.moonshot.cn/v1",
)

def estimate_token_count(input_messages) -> int:
    """
    计算输入消息的 Tokens 数量。
    """
    try:
        header = {
            "Authorization": f"Bearer {api_key}",
        }
        data = {
            "model": "moonshot-v1-128k",
            "messages": input_messages,
        }
        with httpx.Client() as client:
            print("请求接口")
            r = client.post("https://api.moonshot.cn/v1/tokenizers/estimate-token-count", headers=header, json=data)
            r.raise_for_status()
            response_data = r.json()
            print(response_data["data"]["total_tokens"])
            return response_data["data"]["total_tokens"]
    except httpx.RequestError as e:
        print(f"请求失败: {e}")
        raise
    except (KeyError, ValueError) as e:
        print(f"解析响应失败: {e}")
        raise

def select_model(input_messages, max_tokens=1024) -> str:
    """
    根据输入的上下文消息和预期的 max_tokens 值选择合适的模型。
    """
    if not isinstance(max_tokens, int) or max_tokens <= 0:
        raise ValueError("max_tokens 必须是正整数")

    prompt_tokens = estimate_token_count(input_messages)
    total_tokens = prompt_tokens + max_tokens

    if total_tokens <= 8 * 1024:
        return "moonshot-v1-8k"
    elif total_tokens <= 32 * 1024:
        return "moonshot-v1-32k"
    elif total_tokens <= 128 * 1024:
        return "moonshot-v1-128k"
    else:
        raise ValueError("tokens 数量超出限制 😢")

@app.route('/ask', methods=['POST'])
@workflow(name="ask_workflow")
def ask():
    data = request.json
    messages = data.get('messages')
    max_tokens = data.get('max_tokens', 2048)

    if not messages:
        return jsonify({"error": "messages 字段不能为空"}), 400

    try:
        model = select_model(messages, max_tokens)

        completion = client.chat.completions.create(
            model=model,
            messages=messages,
            max_tokens=max_tokens,
            temperature=0.3,
            stream=True  # 启用流式生成
        )

        def generate():
            for chunk in completion:
                # yield chunk.choices[0].delta.content or ''
                delta = chunk.choices[0].delta
                if delta.content:
                    print(delta.content, end="")
                    yield delta.content or ''

        return Response(stream_with_context(generate()), content_type='text/event-stream')
    except Exception as e:
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    app.run(debug=True,port=5001)

配置 env,将数据通过 OpenTelemetry 上报到 Datakit

export TRACELOOP_BASE_URL=http://localhost:9529/otel

指标详解

指标名称 描述 单位
gen_ai.client.generation.choices 客户端生成的选项数量
gen_ai.client.operation.duration_bucket 客户端操作的持续时间直方图桶 毫秒
gen_ai.client.operation.duration_count 客户端操作的总次数
gen_ai.client.operation.duration_max 客户端操作的最大持续时间 毫秒
gen_ai.client.operation.duration_min 客户端操作的最小持续时间 毫秒
gen_ai.client.operation.duration_sum 客户端操作的总持续时间 毫秒
llm.openai.chat_completions.streaming_time_to_first_token_bucket OpenAI 聊天补全功能流式传输中首次生成 Token 的时间直方图桶 毫秒
llm.openai.chat_completions.streaming_time_to_first_token_count OpenAI 聊天补全功能流式传输中首次生成 Token 的总次数
llm.openai.chat_completions.streaming_time_to_first_token_max OpenAI 聊天补全功能流式传输中首次生成 Token 的最大时间 毫秒
llm.openai.chat_completions.streaming_time_to_first_token_min OpenAI 聊天补全功能流式传输中首次生成 Token 的最小时间 毫秒
llm.openai.chat_completions.streaming_time_to_first_token_sum OpenAI 聊天补全功能流式传输中首次生成 Token 的总时间 毫秒
llm.openai.chat_completions.streaming_time_to_generate_bucket OpenAI 聊天补全功能流式传输中生成内容的总时间直方图桶 毫秒
llm.openai.chat_completions.streaming_time_to_generate_count OpenAI 聊天补全功能流式传输中生成内容的总次数
llm.openai.chat_completions.streaming_time_to_generate_max OpenAI 聊天补全功能流式传输中生成内容的最大时间 毫秒
llm.openai.chat_completions.streaming_time_to_generate_min OpenAI 聊天补全功能流式传输中生成内容的最小时间 毫秒
llm.openai.chat_completions.streaming_time_to_generate_sum OpenAI 聊天补全功能流式传输中生成内容的总时间 毫秒

参考资料

文档评价

文档内容是否对您有帮助? ×