Kafka 4
Collect metric data from Kafka 4.
Configuration¶
Prerequisites¶
Download JMX Exporter¶
Download: https://github.com/prometheus/jmx_exporter/releases/tag/1.3.0
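One way to fetch the agent and place it where the examples below expect it. The /opt/jmx_exporter path and the jmx_prometheus_javaagent.jar file name are simply the conventions used throughout this guide; the download URL assumes the io.prometheus.jmx:jmx_prometheus_javaagent:1.3.0 artifact on Maven Central that the release page links to, so adjust it if you download the jar another way:

mkdir -p /opt/jmx_exporter
cd /opt/jmx_exporter
wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/1.3.0/jmx_prometheus_javaagent-1.3.0.jar
ln -sf jmx_prometheus_javaagent-1.3.0.jar jmx_prometheus_javaagent.jar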
Configure the JMX Script and Startup Parameters¶
Note: collecting Producer, Consumer, Streams, and Connect metrics requires a separate process for each. When starting each process, substitute the corresponding yaml file and startup script; the examples below can be used as a reference.
KRaft Metrics¶
- Create the KRaft metrics configuration file
kafka.yml
# ------------------------------------------------------------
# Kafka 4 Prometheus JMX Exporter Configuration
# ------------------------------------------------------------
lowercaseOutputName: false
lowercaseOutputLabelNames: true
cacheRules: true
rules:
# 1. Broker / Topic / Partition Metrics
- pattern: kafka.server<type=BrokerTopicMetrics, name=(BytesInPerSec|BytesOutPerSec|MessagesInPerSec|TotalFetchRequestsPerSec|ProduceRequestsPerSec|FailedProduceRequestsPerSec|TotalProduceRequestsPerSec|ReassignmentBytesInPerSec|ReassignmentBytesOutPerSec|ProduceMessageConversionsPerSec|FetchMessageConversionsPerSec)(?:, topic=([-\.\w]*))?><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
name: kafka_server_broker_topic_metrics_$1
type: GAUGE
labels:
topic: "$2"
# 2. Request / Network Metrics
- pattern: kafka.network<type=RequestMetrics, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
name: kafka_network_request_metrics_$1
type: GAUGE
# 3. Socket Server Metrics
- pattern: kafka.network<type=SocketServer, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate|Value)
name: kafka_network_socket_server_metrics_$1
type: GAUGE
# 4. Log / Segment / Cleaner Metrics
- pattern: kafka.log<type=LogFlushStats, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
name: kafka_log_$1_$2
type: GAUGE
# 5. Controller (KRaft) Metrics
- pattern: kafka.controller<type=KafkaController, name=(.+)><>(Count|Value)
name: kafka_controller_$1
type: GAUGE
# 6. Group / Coordinator Metrics
- pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(.+)><>(Count|Value)
name: kafka_coordinator_group_metadata_manager_$1
type: GAUGE
# 7. KRaft Specific Metrics
- pattern: kafka.controller<type=KafkaController, name=(LeaderElectionSuccessRate|LeaderElectionLatencyMs)><>(Count|Value)
name: kafka_controller_$1
type: GAUGE
# 8. New Generation Consumer Rebalance Protocol Metrics
- pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(RebalanceTimeMs|RebalanceFrequency)><>(Count|Value)
name: kafka_coordinator_group_metadata_manager_$1
type: GAUGE
# 9. Queue Metrics
- pattern: kafka.server<type=Queue, name=(QueueSize|QueueConsumerRate)><>(Count|Value)
name: kafka_server_queue_$1
type: GAUGE
# 10. Client Metrics
- pattern: kafka.network<type=RequestMetrics, name=(ClientConnections|ClientRequestRate|ClientResponseTime)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
name: kafka_network_request_metrics_$1
type: GAUGE
# 11. Log Flush Rate and Time
- pattern: kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
name: kafka_log_log_flush_rate_and_time_ms
type: GAUGE
- Startup parameters
export KAFKA_HEAP_OPTS="-Xms1g -Xmx1g"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9999 \
-Dcom.sun.management.jmxremote.rmi.port=9999 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=127.0.0.1"
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties
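If the agent was attached successfully, the broker/KRaft metrics should now be exposed on the agent port configured above (7071 in this example); a quick sanity check:

curl -s http://127.0.0.1:7071/metrics | grep '^kafka_' | head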
Producer Metrics¶
- Create the Producer metrics configuration file
producer.yml
---
lowercaseOutputName: true
rules:
# Added: producer-node-metrics
- pattern: kafka\.producer<type=producer-node-metrics, client-id=([^,]+), node-id=([^>]+)><>([^:]+)
name: kafka_producer_node_$3
labels:
client_id: "$1"
node_id: "$2"
type: GAUGE
- pattern: 'kafka\.producer<type=producer-metrics, client-id=([^>]+)><>([^:,\s]+).*'
name: 'kafka_producer_metrics_$2'
labels:
client_id: "$1"
type: GAUGE
# Scrape all Selector metrics (new in Kafka 4.0)
- pattern: 'kafka\.(?:(producer|consumer|connect))<type=(producer|consumer|connect)-metrics, client-id=([^>]+)><>(connection-.+|io-.+|network-.+|select-.+|send-.+|receive-.+|reauthentication-.+)'
name: 'kafka_${1}_${4}'
labels:
client_id: '$3'
type: GAUGE
- Startup parameters
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7072:/opt/jmx_exporter/producer.yml"
/opt/kafka/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic xxxx
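The console producer above is just a convenient traffic generator. For a standalone producer application, the same agent can be attached to the JVM directly (MyProducerApp and my-producer.jar below are placeholders for your own application):

java -javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7072:/opt/jmx_exporter/producer.yml \
  -cp "libs/*:my-producer.jar" MyProducerApp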
Consumer Metrics¶
- Create the Consumer metrics configuration file
consumer.yml
lowercaseOutputName: true
rules:
# consumer-coordinator-metrics
- pattern: 'kafka\.consumer<type=consumer-coordinator-metrics, client-id=([^>]+)><>([^:,\s]+).*'
name: 'kafka_consumer_coordinator_metrics_$2'
labels:
client_id: "$1"
type: GAUGE
- pattern: 'kafka\.consumer<type=consumer-metrics, client-id=([^>]+)><>([^:,\s]+).*'
name: 'kafka_consumer_metrics_$2'
labels:
client_id: "$1"
- Startup parameters
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7073:/opt/jmx_exporter/consumer.yml"
/opt/kafka/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic xxxx
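With the consumer running, its metrics should be exposed on port 7073 (as configured above):

curl -s http://127.0.0.1:7073/metrics | grep '^kafka_consumer' | head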
Streams Metrics¶
- Create the Streams metrics configuration file
stream.yml
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
# Kafka Streams application metrics (attribute names without special characters)
- pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+)$'
name: kafka_streams_$2
labels:
client_id: "$1"
# Handle attribute names containing special characters
- pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+):(.+)$'
name: kafka_streams_$2_$3
labels:
client_id: "$1"
# Processor node metrics
- pattern: 'kafka.streams<type=stream-processor-node-metrics, client-id=(.+), task-id=(.+), processor-node-id=(.+)><>(.+)'
name: kafka_streams_processor_$4
labels:
client_id: "$1"
task_id: "$2"
processor_node_id: "$3"
# Task metrics
- pattern: 'kafka.streams<type=stream-task-metrics, client-id=(.+), task-id=(.+)><>(.+)'
name: kafka_streams_task_$3
labels:
client_id: "$1"
task_id: "$2"
# Thread metrics
- pattern: 'kafka.streams<type=stream-thread-metrics, client-id=(.+), thread-id=(.+)><>(.+)'
name: kafka_streams_thread_$3
labels:
client_id: "$1"
thread_id: "$2"
# JVM metrics
- pattern: 'java.lang<type=Memory><>(.+)'
name: jvm_memory_$1
- pattern: 'java.lang<type=GarbageCollector, name=(.+)><>(\w+)'
name: jvm_gc_$2
labels:
gc: "$1"
# JVM thread metrics
- pattern: 'java.lang<type=Threading><>(.+)'
name: jvm_threads_$1
# Default rule
- pattern: '(.*)'
- Startup parameters
export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9996 \
-Dcom.sun.management.jmxremote.rmi.port=9996 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=127.0.0.1"
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7075:/opt/jmx_exporter/stream.yml"
java $KAFKA_HEAP_OPTS $KAFKA_JMX_OPTS $KAFKA_OPTS -cp "libs/*:my-streams.jar" WordCountDemo
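Once the Streams application is running, its metrics should be exposed on port 7075 (as configured above):

curl -s http://127.0.0.1:7075/metrics | grep '^kafka_streams' | head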
Connect Metrics¶
- Create the Connect metrics configuration file
connect.yml
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
# 1) connect-worker-metrics (global)
- pattern: 'kafka\.connect<type=connect-worker-metrics><>([^:]+)'
name: 'kafka_connect_worker_$1'
type: GAUGE
# 2) connect-worker-metrics, connector=xxx
- pattern: 'kafka\.connect<type=connect-worker-metrics, connector=([^>]+)><>([^:]+)'
name: 'kafka_connect_worker_$2'
labels:
connector: "$1"
type: GAUGE
# 3) connect-worker-rebalance-metrics
- pattern: 'kafka\.connect<type=connect-worker-rebalance-metrics><>([^:]+)'
name: 'kafka_connect_worker_rebalance_$1'
type: GAUGE
# 4) connector-task-metrics
- pattern: 'kafka\.connect<type=connector-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'
name: 'kafka_connect_task_$3'
labels:
connector: "$1"
task_id: "$2"
type: GAUGE
# 5) sink-task-metrics
- pattern: 'kafka\.connect<type=sink-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'
name: 'kafka_connect_sink_task_$3'
labels:
connector: "$1"
task_id: "$2"
- Startup parameters
export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Dcom.sun.management.jmxremote.port=9995 \
-Dcom.sun.management.jmxremote.rmi.port=9995 \
-Djava.rmi.server.hostname=127.0.0.1"
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7074:/opt/jmx_exporter/connect.yml"
# Start Kafka Connect
/opt/kafka/kafka/bin/connect-distributed.sh /opt/kafka/kafka/config/connect-distributed.properties
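After Connect starts, the worker metrics should be exposed on port 7074 (as configured above):

curl -s http://127.0.0.1:7074/metrics | grep '^kafka_connect' | head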
Once each process has started successfully, the scraped monitoring data can be viewed with curl http://IP:PORT/metrics.
Configure DataKit¶
- Go to the conf.d/samples directory under the DataKit installation directory, copy prom.conf.sample and rename it kafka.conf
cp prom.conf.sample kafka.conf
- Adjust kafka.conf
[[inputs.prom]]
## Exporter URLs.
urls = ["http://127.0.0.1:7071/metrics","http://127.0.0.1:7072/metrics","http://127.0.0.1:7073/metrics","http://127.0.0.1:7074/metrics","http://127.0.0.1:7075/metrics"]
## Collector alias.
source = "kafka"
## Prioritized over 'measurement_name' configuration.
[[inputs.prom.measurements]]
prefix = "kafka_controller_"
name = "kafka_controller"
[[inputs.prom.measurements]]
prefix = "kafka_network_"
name = "kafka_network"
[[inputs.prom.measurements]]
prefix = "kafka_log_"
name = "kafka_log"
[[inputs.prom.measurements]]
prefix = "kafka_server_"
name = "kafka_server"
[[inputs.prom.measurements]]
prefix = "kafka_connect_"
name = "kafka_connect"
[[inputs.prom.measurements]]
prefix = "kafka_stream_"
name = "kafka_stream"
- Restart DataKit
Run the following command so the new kafka.conf takes effect.
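For a host-installed DataKit the restart is typically done through DataKit's own service command, or through systemd, depending on how DataKit is managed on your host:

datakit service -R
# or
systemctl restart datakit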
Metrics¶
The following describes some of the Kafka 4 metrics; for the full list, refer to the Kafka metrics reference.
kafka_server Metric Set¶

| Metric | Description | Unit |
| --- | --- | --- |
| Fetch_queue_size | Fetch queue size | count |
| Produce_queue_size | Produce queue size | count |
| Request_queue_size | Request queue size | count |
| broker_topic_metrics_BytesInPerSec | Byte-in rate from clients | bytes/s |
| broker_topic_metrics_BytesOutPerSec | Byte-out rate to clients | bytes/s |
| broker_topic_metrics_FailedProduceRequestsPerSec | Failed produce request rate | count/s |
| broker_topic_metrics_FetchMessageConversionsPerSec | Fetch message conversion rate | count/s |
| broker_topic_metrics_MessagesInPerSec | Incoming message rate | count/s |
| broker_topic_metrics_ProduceMessageConversionsPerSec | Produce message conversion rate | count/s |
| broker_topic_metrics_TotalFetchRequestsPerSec | Fetch request rate (from clients or followers) | count/s |
| broker_topic_metrics_TotalProduceRequestsPerSec | Produce request rate | count/s |
| socket_server_metrics_connection_count | SocketServer connection count | count |
| socket_server_metrics_connection_close_total | SocketServer closed connection count | count |
| socket_server_metrics_incoming_byte_rate | SocketServer incoming byte rate | bytes/s |
kafka_network Metric Set¶

| Metric | Description | Unit |
| --- | --- | --- |
| request_metrics_RequestBytes_request_AddOffsetsToTxn | Size of AddOffsetsToTxn requests | bytes |
| request_metrics_RequestBytes_request_Fetch | Size of Fetch requests | bytes |
| request_metrics_RequestBytes_request_FetchConsumer | Size of FetchConsumer requests | bytes |
| request_metrics_RequestBytes_request_FetchFollower | Size of FetchFollower requests | bytes |
| request_metrics_TotalTimeMs_request_CreateTopics | Total time of CreateTopics requests | ms |
| request_metrics_TotalTimeMs_request_CreatePartitions | Total time of CreatePartitions requests | ms |
| request_metrics_RequestQueueTimeMs_request_CreateTopics | Time CreateTopics requests wait in the request queue | ms |
| request_metrics_RequestQueueTimeMs_request_CreatePartitions | Time CreatePartitions requests wait in the request queue | ms |
| request_metrics_RequestQueueTimeMs_request_Produce | Time Produce requests wait in the request queue | ms |
| request_metrics_ResponseSendTimeMs_request_CreateTopics | Time to send CreateTopics responses | ms |
| request_metrics_ResponseSendTimeMs_request_CreatePartitions | Time to send CreatePartitions responses | ms |
kafka_controller Metric Set¶

| Metric | Description | Unit |
| --- | --- | --- |
| ActiveBrokerCount | Number of active brokers | count |
| ActiveControllerCount | Number of active controllers | count |
| GlobalPartitionCount | Number of partitions | count |
| GlobalTopicCount | Number of topics | count |
| OfflinePartitionsCount | Number of offline partitions | count |
| PreferredReplicaImbalanceCount | Number of partitions whose leader is not the preferred replica | count |
| TimedOutBrokerHeartbeatCount | Number of broker heartbeat timeouts | count |
| LastAppliedRecordLagMs | Lag of the last applied metadata record | ms |
| LastAppliedRecordOffset | Offset of the last applied metadata record | - |
| MetadataErrorCount | Metadata error count | count |
| NewActiveControllersCount | Number of new controller elections | count |
kafka_producer Metric Set¶

| Metric | Description | Unit |
| --- | --- | --- |
| producer_metrics_batch_split_rate | Batch split rate | count/s |
| producer_metrics_buffer_available_bytes | Total unused buffer memory | bytes |
| producer_metrics_buffer_exhausted_rate | Average records dropped per second due to buffer exhaustion | count/s |
| producer_metrics_buffer_total_bytes | Total buffer memory | bytes |
| producer_metrics_bufferpool_wait_ratio | Buffer pool wait ratio | % |
| producer_metrics_bufferpool_wait_time_ns_total | Total buffer pool wait time | ns |
| producer_metrics_connection_close_rate | Connection close rate | count/s |
| producer_metrics_connection_count | Connection count | count |
| producer_metrics_flush_time_ns_total | Total flush time | ns |
| producer_metrics_incoming_byte_rate | Incoming byte rate | bytes/s |
| producer_metrics_outgoing_byte_rate | Outgoing byte rate | bytes/s |
| producer_metrics_request_rate | Request rate | count/s |
| producer_metrics_request_size_avg | Average request size | bytes |
kafka_consumer Metric Set¶

| Metric | Description | Unit |
| --- | --- | --- |
| consumer_coordinator_metrics_failed_rebalance_total | Total failed rebalances | count |
| consumer_coordinator_metrics_heartbeat_rate | Average heartbeats per second | count/s |
| consumer_coordinator_metrics_heartbeat_response_time_max | Maximum heartbeat response time | ms |
| consumer_coordinator_metrics_join_rate | Group join rate | count/s |
| consumer_coordinator_metrics_join_total | Total group joins | count |
| consumer_coordinator_metrics_last_rebalance_seconds_ago | Seconds since the last rebalance | s |
| consumer_coordinator_metrics_rebalance_latency_total | Total rebalance latency | ms |
| consumer_fetch_manager_metrics_bytes_consumed_rate | Bytes consumed per second | bytes/s |
| consumer_fetch_manager_metrics_fetch_latency_avg | Average fetch request latency | ms |
| consumer_metrics_connection_count | Connection count | count |
| consumer_metrics_connection_close_rate | Connection close rate | count/s |
| consumer_metrics_incoming_byte_rate | Incoming byte rate | bytes/s |
| consumer_metrics_outgoing_byte_rate | Outgoing byte rate | bytes/s |
| consumer_metrics_select_rate | Select rate | count/s |
| consumer_metrics_last_poll_seconds_ago | Seconds since the last poll() call | s |
kafka_connect Metric Set¶

| Metric | Description | Unit |
| --- | --- | --- |
| worker_connector_count | Number of connectors | count |
| worker_task_startup_attempts_total | Task startup attempts | count |
| worker_connector_startup_attempts_total | Connector startup attempts | count |
| worker_task_startup_failure_total | Task startup failures | count |
| worker_connector_startup_failure_percentage | Connector startup failure percentage | % |
| worker_rebalance_completed_rebalances_total | Total completed rebalances | count |
| worker_task_startup_failure_percentage | Task startup failure percentage | % |
| worker_rebalance_time_since_last_rebalance_ms | Time since the last rebalance | ms |
kafka_stream Metric Set¶

| Metric | Description | Unit |
| --- | --- | --- |
| stream_thread_metrics_thread_start_time | Thread start time | ms (timestamp) |
| stream_thread_metrics_task_created_total | Total tasks created | count |
| stream_state_metrics_block_cache_capacity | Block cache capacity | bytes |
| stream_state_metrics_all_rate | Rate of all operations | count/s |
| stream_state_metrics_block_cache_usage | Block cache usage | % |
| stream_state_metrics_bytes_read_compaction_rate | Bytes read during compaction per second | bytes/s |
| stream_state_metrics_bytes_written_compaction_rate | Bytes written during compaction per second | bytes/s |
| stream_state_metrics_block_cache_index_hit_ratio | Block cache index hit ratio | % |
| stream_state_metrics_block_cache_data_hit_ratio | Block cache data hit ratio | % |
| stream_state_metrics_block_cache_filter_hit_ratio | Block cache filter hit ratio | % |
| stream_state_metrics_bytes_written_rate | Bytes written per second | bytes/s |
| stream_state_metrics_bytes_read_rate | Bytes read per second | bytes/s |
| stream_task_metrics_process_rate | Records processed per second | count/s |
| stream_task_metrics_enforced_processing_rate | Enforced processings per second | count/s |
| stream_task_metrics_active_process_ratio | Active processing ratio | % |
| stream_thread_metrics_commit_rate | Commit rate | count/s |
| stream_thread_metrics_poll_latency_avg | Average poll latency | ms |
| stream_thread_metrics_poll_rate | Poll rate | count/s |
| stream_thread_metrics_blocked_time_ns_total | Total blocked time | ns |
| stream_topic_metrics_bytes_consumed_total | Total bytes consumed | bytes |
| stream_topic_metrics_bytes_produced_total | Total bytes produced | bytes |
| stream_topic_metrics_records_consumed_total | Total records consumed by source processor nodes | count |
| stream_topic_metrics_records_produced_total | Total records produced by sink processor nodes | count |