OpenLLMetry
OpenLLMetry 是由 Traceloop 团队以 Apache 2.0 许可证开发和维护,通过扩展 OpenTelemetry 的功能,为 LLM 应用提供了专门的监控和调试工具。它利用 OpenTelemetry 的标准化遥测数据格式,将 LLM 应用的关键性能指标和追踪信息标准化输出。
配置¶
在使用 OTEL 将链路追踪数据发送到 DataKit 之前,请确保已配置 Collector,同时需要调整配置文件customer_tags = ["llm.request.type","traceloop.entity.path","llm.is_streaming","gen_ai.openai.api_base","gen_ai.prompt.1.content","gen_ai.response.model","gen_ai.completion.0.content","gen_ai.request.model","gen_ai.request.temperature","gen_ai.system","traceloop.workflow.name"]
,如下所示:
[[inputs.opentelemetry]]
## customer_tags will work as a whitelist to prevent tags send to data center.
## All . will replace to _ ,like this :
## "project.name" to send to GuanCe center is "project_name"
customer_tags = ["llm.request.type","traceloop.entity.path","llm.is_streaming","gen_ai.openai.api_base","gen_ai.prompt.1.content","gen_ai.response.model","gen_ai.completion.0.content","gen_ai.request.model","gen_ai.request.temperature","gen_ai.system","traceloop.workflow.name"]
...
调整完成后,重启 DataKit
安装 OpenTelemetry SDK¶
pip install opentelemetry-api opentelemetry-instrumentation pip install opentelemetry-instrumentation-flask
安装 OpenLLMetry SDK¶
pip install traceloop-sdk
在应用程序中初始化 OpenLLMetry¶
from traceloop.sdk import Traceloop
# 初始化 OpenLit
# Traceloop.init()
Traceloop.init(app_name="kimi_openllmetry_stream_flask")
OpenLLMetry 示例代码¶
import os
import httpx
from flask import Flask, request, Response,jsonify,stream_with_context
from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow,task
from openai import OpenAI
from opentelemetry.instrumentation.flask import FlaskInstrumentor
app = Flask(__name__)
# 使用FlaskInstrumentor自动插桩Flask应用
FlaskInstrumentor().instrument_app(app)
# 初始化 OpenLit
Traceloop.init(app_name="kimi_openllmetry_stream_flask")
# 从环境变量中获取 API Key
api_key = os.getenv("MOONSHOT_API_KEY")
if not api_key:
raise ValueError("请设置 MOONSHOT_API_KEY 环境变量")
client = OpenAI(
api_key=api_key,
base_url="https://api.moonshot.cn/v1",
)
def estimate_token_count(input_messages) -> int:
"""
计算输入消息的 Tokens 数量。
"""
try:
header = {
"Authorization": f"Bearer {api_key}",
}
data = {
"model": "moonshot-v1-128k",
"messages": input_messages,
}
with httpx.Client() as client:
print("请求接口")
r = client.post("https://api.moonshot.cn/v1/tokenizers/estimate-token-count", headers=header, json=data)
r.raise_for_status()
response_data = r.json()
print(response_data["data"]["total_tokens"])
return response_data["data"]["total_tokens"]
except httpx.RequestError as e:
print(f"请求失败: {e}")
raise
except (KeyError, ValueError) as e:
print(f"解析响应失败: {e}")
raise
def select_model(input_messages, max_tokens=1024) -> str:
"""
根据输入的上下文消息和预期的 max_tokens 值选择合适的模型。
"""
if not isinstance(max_tokens, int) or max_tokens <= 0:
raise ValueError("max_tokens 必须是正整数")
prompt_tokens = estimate_token_count(input_messages)
total_tokens = prompt_tokens + max_tokens
if total_tokens <= 8 * 1024:
return "moonshot-v1-8k"
elif total_tokens <= 32 * 1024:
return "moonshot-v1-32k"
elif total_tokens <= 128 * 1024:
return "moonshot-v1-128k"
else:
raise ValueError("tokens 数量超出限制 😢")
@app.route('/ask', methods=['POST'])
@workflow(name="ask_workflow")
def ask():
data = request.json
messages = data.get('messages')
max_tokens = data.get('max_tokens', 2048)
if not messages:
return jsonify({"error": "messages 字段不能为空"}), 400
try:
model = select_model(messages, max_tokens)
completion = client.chat.completions.create(
model=model,
messages=messages,
max_tokens=max_tokens,
temperature=0.3,
stream=True # 启用流式生成
)
def generate():
for chunk in completion:
# yield chunk.choices[0].delta.content or ''
delta = chunk.choices[0].delta
if delta.content:
print(delta.content, end="")
yield delta.content or ''
return Response(stream_with_context(generate()), content_type='text/event-stream')
except Exception as e:
return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
app.run(debug=True,port=5001)
配置 env,将数据通过 OpenTelemetry 上报到 Datakit
export TRACELOOP_BASE_URL=http://localhost:9529/otel
指标详解¶
指标名称 | 描述 | 单位 |
---|---|---|
gen_ai.client.generation.choices |
客户端生成的选项数量 | 个 |
gen_ai.client.operation.duration_bucket |
客户端操作的持续时间直方图桶 | 毫秒 |
gen_ai.client.operation.duration_count |
客户端操作的总次数 | 次 |
gen_ai.client.operation.duration_max |
客户端操作的最大持续时间 | 毫秒 |
gen_ai.client.operation.duration_min |
客户端操作的最小持续时间 | 毫秒 |
gen_ai.client.operation.duration_sum |
客户端操作的总持续时间 | 毫秒 |
llm.openai.chat_completions.streaming_time_to_first_token_bucket |
OpenAI 聊天补全功能流式传输中首次生成 Token 的时间直方图桶 | 毫秒 |
llm.openai.chat_completions.streaming_time_to_first_token_count |
OpenAI 聊天补全功能流式传输中首次生成 Token 的总次数 | 次 |
llm.openai.chat_completions.streaming_time_to_first_token_max |
OpenAI 聊天补全功能流式传输中首次生成 Token 的最大时间 | 毫秒 |
llm.openai.chat_completions.streaming_time_to_first_token_min |
OpenAI 聊天补全功能流式传输中首次生成 Token 的最小时间 | 毫秒 |
llm.openai.chat_completions.streaming_time_to_first_token_sum |
OpenAI 聊天补全功能流式传输中首次生成 Token 的总时间 | 毫秒 |
llm.openai.chat_completions.streaming_time_to_generate_bucket |
OpenAI 聊天补全功能流式传输中生成内容的总时间直方图桶 | 毫秒 |
llm.openai.chat_completions.streaming_time_to_generate_count |
OpenAI 聊天补全功能流式传输中生成内容的总次数 | 次 |
llm.openai.chat_completions.streaming_time_to_generate_max |
OpenAI 聊天补全功能流式传输中生成内容的最大时间 | 毫秒 |
llm.openai.chat_completions.streaming_time_to_generate_min |
OpenAI 聊天补全功能流式传输中生成内容的最小时间 | 毫秒 |
llm.openai.chat_completions.streaming_time_to_generate_sum |
OpenAI 聊天补全功能流式传输中生成内容的总时间 | 毫秒 |
参考资料¶
- OpenLLMetry quickstart
- OpenLLMetry otel-collector
- OpenLLMetry github