
Kafka 4

Collecting Kafka 4 Metrics

Configuration

Prerequisites

Download the JMX Exporter

Download: https://github.com/prometheus/jmx_exporter/releases/tag/1.3.0

Configure the JMX rules and startup parameters

Note: collecting Producer, Consumer, Streams, and Connect metrics requires a separate process for each. When starting each process, substitute the corresponding YAML file and startup script; the examples below can be used as a reference.

KRaft Metrics
  • Create the KRaft metrics configuration file kafka.yml
# ------------------------------------------------------------
# Kafka 4 Prometheus JMX Exporter Configuration
# ------------------------------------------------------------
lowercaseOutputName: false
lowercaseOutputLabelNames: true
cacheRules: true
rules:

# 1. Broker / Topic / Partition Metrics
  - pattern: kafka.server<type=BrokerTopicMetrics, name=(BytesInPerSec|BytesOutPerSec|MessagesInPerSec|TotalFetchRequestsPerSec|ProduceRequestsPerSec|FailedProduceRequestsPerSec|TotalProduceRequestsPerSec|ReassignmentBytesInPerSec|ReassignmentBytesOutPerSec|ProduceMessageConversionsPerSec|FetchMessageConversionsPerSec)(?:, topic=([-\.\w]*))?><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_server_broker_topic_metrics_$1
    type: GAUGE
    labels:
      topic: "$2"

# 2. Request / Network Metrics
  - pattern: kafka.network<type=RequestMetrics, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_network_request_metrics_$1
    type: GAUGE

# 3. Socket Server Metrics
  - pattern: kafka.network<type=SocketServer, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate|Value)
    name: kafka_network_socket_server_metrics_$1
    type: GAUGE

# 4. Log / Segment / Cleaner Metrics
  - pattern: kafka.log<type=LogFlushStats, name=(.+)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_log_$1_$2
    type: GAUGE

# 5. Controller (KRaft) Metrics
  - pattern: kafka.controller<type=KafkaController, name=(.+)><>(Count|Value)
    name: kafka_controller_$1
    type: GAUGE

# 6. Group / Coordinator Metrics
  - pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(.+)><>(Count|Value)
    name: kafka_coordinator_group_metadata_manager_$1
    type: GAUGE

# 7. KRaft Specific Metrics
  - pattern: kafka.controller<type=KafkaController, name=(LeaderElectionSuccessRate|LeaderElectionLatencyMs)><>(Count|Value)
    name: kafka_controller_$1
    type: GAUGE

# 8. New Generation Consumer Rebalance Protocol Metrics
  - pattern: kafka.coordinator.group<type=GroupMetadataManager, name=(RebalanceTimeMs|RebalanceFrequency)><>(Count|Value)
    name: kafka_coordinator_group_metadata_manager_$1
    type: GAUGE

# 9. Queue Metrics
  - pattern: kafka.server<type=Queue, name=(QueueSize|QueueConsumerRate)><>(Count|Value)
    name: kafka_server_queue_$1
    type: GAUGE

# 10. Client Metrics
  - pattern: kafka.network<type=RequestMetrics, name=(ClientConnections|ClientRequestRate|ClientResponseTime)><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_network_request_metrics_$1
    type: GAUGE

# 11. Log Flush Rate and Time
  - pattern: kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)
    name: kafka_log_log_flush_rate_and_time_ms
    type: GAUGE
  • Startup parameters
export KAFKA_HEAP_OPTS="-Xms1g -Xmx1g"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9999 \
  -Dcom.sun.management.jmxremote.rmi.port=9999 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Djava.rmi.server.hostname=127.0.0.1"
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties
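Each rule above is a regular expression matched against the flattened MBean name, with `$n` capture references building the metric name and labels. A quick way to sanity-check a pattern before restarting the broker is to replay it in Python (the sketch below uses an abbreviated version of rule #1 and an illustrative sample MBean string):

```python
import re

# Abbreviated version of rule #1 above; jmx_exporter matches rules against
# strings of the form "domain<key=value, ...><attribute path>attribute".
pattern = (
    r"kafka.server<type=BrokerTopicMetrics, "
    r"name=(BytesInPerSec|BytesOutPerSec|MessagesInPerSec)"
    r"(?:, topic=([-\.\w]*))?><>"
    r"(Count|OneMinuteRate|FiveMinuteRate|FifteenMinuteRate|MeanRate)"
)

# Illustrative MBean + attribute, as the exporter would flatten it
mbean = ("kafka.server<type=BrokerTopicMetrics, "
         "name=BytesInPerSec, topic=orders><>OneMinuteRate")

m = re.match(pattern, mbean)
name = "kafka_server_broker_topic_metrics_" + m.group(1)  # name: ..._$1
labels = {"topic": m.group(2)}                            # labels: topic: "$2"
print(name, labels)  # prints: kafka_server_broker_topic_metrics_BytesInPerSec {'topic': 'orders'}
```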
Producer Metrics
  • Create the Producer metrics configuration file producer.yml
---
lowercaseOutputName: true
rules:
  # Added: producer-node-metrics
  - pattern: kafka\.producer<type=producer-node-metrics, client-id=([^,]+), node-id=([^>]+)><>([^:]+)
    name: kafka_producer_node_$3
    labels:
      client_id: "$1"
      node_id: "$2"
    type: GAUGE

  - pattern: 'kafka\.producer<type=producer-metrics, client-id=([^>]+)><>([^:,\s]+).*'
    name: 'kafka_producer_metrics_$2'
    labels:
      client_id: "$1"
    type: GAUGE

  # Capture all Selector metrics (new in Kafka 4.0)
  - pattern: 'kafka\.(?:(producer|consumer|connect))<type=(producer|consumer|connect)-metrics, client-id=([^>]+)><>(connection-.+|io-.+|network-.+|select-.+|send-.+|receive-.+|reauthentication-.+)'
    name: 'kafka_${1}_${4}'
    labels:
      client_id: '$3'
    type: GAUGE
  • Startup parameters
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7072:/opt/jmx_exporter/producer.yml"

/opt/kafka/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic xxxx
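The Selector rule in producer.yml uses `${n}`-style references and captures the client type twice; a sketch of how it resolves for one producer MBean (the client-id is illustrative):

```python
import re

# The Selector rule from producer.yml above: group 1 = client type,
# group 3 = client-id, group 4 = attribute name.
pattern = (
    r"kafka\.(?:(producer|consumer|connect))"
    r"<type=(producer|consumer|connect)-metrics, client-id=([^>]+)><>"
    r"(connection-.+|io-.+|network-.+|select-.+|send-.+|"
    r"receive-.+|reauthentication-.+)"
)

mbean = "kafka.producer<type=producer-metrics, client-id=console-producer><>io-wait-ratio"
m = re.match(pattern, mbean)
name = f"kafka_{m.group(1)}_{m.group(4)}"  # name: 'kafka_${1}_${4}'
# jmx_exporter then sanitizes the name (dashes become underscores)
print(name, {"client_id": m.group(3)})
```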
Consumer Metrics
  • Create the Consumer metrics configuration file consumer.yml
lowercaseOutputName: true
rules:
  # consumer-coordinator-metrics
  - pattern: 'kafka\.consumer<type=consumer-coordinator-metrics, client-id=([^>]+)><>([^:,\s]+).*'
    name: 'kafka_consumer_coordinator_metrics_$2'
    labels:
      client_id: "$1"
    type: GAUGE

  - pattern: 'kafka\.consumer<type=consumer-metrics, client-id=([^>]+)><>([^:,\s]+).*'
    name: 'kafka_consumer_metrics_$2'
    labels:
      client_id: "$1"
    type: GAUGE
  • Startup parameters
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7073:/opt/jmx_exporter/consumer.yml"

/opt/kafka/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic xxxx
Streams Metrics
  • Create the Streams metrics configuration file stream.yml
lowercaseOutputName: true
lowercaseOutputLabelNames: true

rules:
  # Kafka Streams application metrics (attribute names without special characters)
  - pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+)$'
    name: kafka_streams_$2
    labels:
      client_id: "$1"

  # Handle attribute names containing special characters
  - pattern: 'kafka.streams<type=stream-metrics, client-id=(.+)><>([a-zA-Z0-9\-]+):(.+)$'
    name: kafka_streams_$2_$3
    labels:
      client_id: "$1"

  # Processor node metrics
  - pattern: 'kafka.streams<type=stream-processor-node-metrics, client-id=(.+), task-id=(.+), processor-node-id=(.+)><>(.+)'
    name: kafka_streams_processor_$4
    labels:
      client_id: "$1"
      task_id: "$2"
      processor_node_id: "$3"

  # Task metrics
  - pattern: 'kafka.streams<type=stream-task-metrics, client-id=(.+), task-id=(.+)><>(.+)'
    name: kafka_streams_task_$3
    labels:
      client_id: "$1"
      task_id: "$2"

  # Thread metrics
  - pattern: 'kafka.streams<type=stream-thread-metrics, client-id=(.+), thread-id=(.+)><>(.+)'
    name: kafka_streams_thread_$3
    labels:
      client_id: "$1"
      thread_id: "$2"

  # JVM metrics
  - pattern: 'java.lang<type=Memory><>(.+)'
    name: jvm_memory_$1

  - pattern: 'java.lang<type=GarbageCollector, name=(.+)><>(\w+)'
    name: jvm_gc_$2
    labels:
      gc: "$1"

  # JVM threading metrics
  - pattern: 'java.lang<type=Threading><>(.+)'
    name: jvm_threads_$1

  # Catch-all default rule
  - pattern: '(.*)'
  • Startup parameters
export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.port=9996 \
  -Dcom.sun.management.jmxremote.rmi.port=9996 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Djava.rmi.server.hostname=127.0.0.1"
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7075:/opt/jmx_exporter/stream.yml"

java $KAFKA_HEAP_OPTS $KAFKA_JMX_OPTS $KAFKA_OPTS -cp "libs/*:my-streams.jar" WordCountDemo
Connect Metrics
  • Create the Connect metrics configuration file connect.yml
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
  # 1) connect-worker-metrics (global)
  - pattern: 'kafka\.connect<type=connect-worker-metrics><>([^:]+)'
    name: 'kafka_connect_worker_$1'
    type: GAUGE

  # 2) connect-worker-metrics, connector=xxx
  - pattern: 'kafka\.connect<type=connect-worker-metrics, connector=([^>]+)><>([^:]+)'
    name: 'kafka_connect_worker_$2'
    labels:
      connector: "$1"
    type: GAUGE

  # 3) connect-worker-rebalance-metrics
  - pattern: 'kafka\.connect<type=connect-worker-rebalance-metrics><>([^:]+)'
    name: 'kafka_connect_worker_rebalance_$1'
    type: GAUGE

  # 4) connector-task-metrics
  - pattern: 'kafka\.connect<type=connector-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'
    name: 'kafka_connect_task_$3'
    labels:
      connector: "$1"
      task_id: "$2"
    type: GAUGE

  # 5) sink-task-metrics
  - pattern: 'kafka\.connect<type=sink-task-metrics, connector=([^>]+), task=([^>]+)><>([^:]+)'
    name: 'kafka_connect_sink_task_$3'
    labels:
      connector: "$1"
      task_id: "$2"
    type: GAUGE
  • Startup parameters
export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Dcom.sun.management.jmxremote.port=9995 \
  -Dcom.sun.management.jmxremote.rmi.port=9995 \
  -Djava.rmi.server.hostname=127.0.0.1"
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7074:/opt/jmx_exporter/connect.yml"

# Start Kafka Connect
/opt/kafka/kafka/bin/connect-distributed.sh /opt/kafka/kafka/config/connect-distributed.properties

Once the processes are up, you can inspect the collected metrics with curl http://IP:PORT/metrics.
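The endpoint serves Prometheus text exposition format. A minimal sketch of pulling out just the Kafka series from a response body (the sample payload and `kafka_series` helper are illustrative, not part of the exporter):

```python
# In practice the text would come from e.g.
#   urllib.request.urlopen("http://127.0.0.1:7071/metrics").read().decode()
sample = """\
# HELP kafka_controller_ActiveControllerCount Attribute exposed for management
# TYPE kafka_controller_ActiveControllerCount gauge
kafka_controller_ActiveControllerCount 1.0
jvm_threads_ThreadCount 42.0
kafka_server_broker_topic_metrics_MessagesInPerSec{topic="orders"} 17.5
"""

def kafka_series(text):
    """Return {metric-with-labels: value} for kafka_* series only."""
    out = {}
    for line in text.splitlines():
        if line.startswith("kafka_"):          # skip comments and non-Kafka series
            key, value = line.rsplit(" ", 1)   # value is the last token
            out[key] = float(value)
    return out

print(kafka_series(sample))
```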

Configure DataKit

  • Go to the conf.d/samples directory under the DataKit installation directory, then copy prom.conf.sample and rename it kafka.conf

cp prom.conf.sample kafka.conf

  • Edit kafka.conf
[[inputs.prom]]
  ## Exporter URLs.
  urls = ["http://127.0.0.1:7071/metrics","http://127.0.0.1:7072/metrics","http://127.0.0.1:7073/metrics","http://127.0.0.1:7074/metrics","http://127.0.0.1:7075/metrics"]

  ## Collector alias.
  source = "kafka"

  ## Takes priority over the 'measurement_name' configuration.
  [[inputs.prom.measurements]]
    prefix = "kafka_controller_"
    name = "kafka_controller"

  [[inputs.prom.measurements]]
    prefix = "kafka_network_"
    name = "kafka_network"

  [[inputs.prom.measurements]]
    prefix = "kafka_log_"
    name = "kafka_log"

  [[inputs.prom.measurements]]
    prefix = "kafka_server_"
    name = "kafka_server"

  [[inputs.prom.measurements]]
    prefix = "kafka_connect_"
    name = "kafka_connect"

  [[inputs.prom.measurements]]
    prefix = "kafka_stream_"
    name = "kafka_stream"
  • Restart DataKit

Run the following command:

datakit service -R
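The `[[inputs.prom.measurements]]` blocks above route each scraped metric into a measurement by name prefix, with the remainder becoming the field name. Roughly (a sketch of the mapping the config implies, not DataKit's actual implementation):

```python
# Prefix-to-measurement mapping, mirroring the kafka.conf blocks above.
PREFIXES = {
    "kafka_controller_": "kafka_controller",
    "kafka_network_": "kafka_network",
    "kafka_log_": "kafka_log",
    "kafka_server_": "kafka_server",
    "kafka_connect_": "kafka_connect",
    "kafka_stream_": "kafka_stream",
}

def route(metric):
    """Return (measurement, field) for a scraped metric name."""
    for prefix, measurement in PREFIXES.items():
        if metric.startswith(prefix):
            return measurement, metric[len(prefix):]
    # No prefix matched: DataKit falls back to its default naming;
    # here we return the `source` value with the name unchanged.
    return "kafka", metric

print(route("kafka_controller_ActiveControllerCount"))  # ('kafka_controller', 'ActiveControllerCount')
```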

Metrics

The following describes some of the Kafka 4 metrics; for the full list, see the Kafka metrics reference.

kafka_server metric set

| Metric | Description | Unit |
| --- | --- | --- |
| Fetch_queue_size | Fetch queue size | count |
| Produce_queue_size | Produce queue size | count |
| Request_queue_size | Request queue size | count |
| broker_topic_metrics_BytesInPerSec | Incoming byte rate from clients | bytes/s |
| broker_topic_metrics_BytesOutPerSec | Outgoing byte rate to clients | bytes/s |
| broker_topic_metrics_FailedProduceRequestsPerSec | Failed produce request rate | count/s |
| broker_topic_metrics_FetchMessageConversionsPerSec | Fetch message conversion rate | count/s |
| broker_topic_metrics_MessagesInPerSec | Incoming message rate | count/s |
| broker_topic_metrics_ProduceMessageConversionsPerSec | Produce message conversion rate | count/s |
| broker_topic_metrics_TotalFetchRequestsPerSec | Fetch request rate (from clients or followers) | count/s |
| broker_topic_metrics_TotalProduceRequestsPerSec | Produce request rate | count/s |
| socket_server_metrics_connection_count | Number of SocketServer connections | count |
| socket_server_metrics_connection_close_total | Number of SocketServer connections closed | count |
| socket_server_metrics_incoming_byte_rate | SocketServer incoming byte rate | bytes/s |

kafka_network metric set

| Metric | Description | Unit |
| --- | --- | --- |
| request_metrics_RequestBytes_request_AddOffsetsToTxn | AddOffsetsToTxn request size | bytes |
| request_metrics_RequestBytes_request_Fetch | Fetch request size | bytes |
| request_metrics_RequestBytes_request_FetchConsumer | FetchConsumer request size | bytes |
| request_metrics_RequestBytes_request_FetchFollower | FetchFollower request size | bytes |
| request_metrics_TotalTimeMs_request_CreateTopics | Total time of CreateTopics requests | ms |
| request_metrics_TotalTimeMs_request_CreatePartitions | Total time of CreatePartitions requests | ms |
| request_metrics_RequestQueueTimeMs_request_CreateTopics | Time CreateTopics requests wait in the request queue | ms |
| request_metrics_RequestQueueTimeMs_request_CreatePartitions | Time CreatePartitions requests wait in the request queue | ms |
| request_metrics_RequestQueueTimeMs_request_Produce | Time Produce requests wait in the request queue | ms |
| request_metrics_ResponseSendTimeMs_request_CreateTopics | Time spent sending the CreateTopics response | ms |
| request_metrics_ResponseSendTimeMs_request_CreatePartitions | Time spent sending the CreatePartitions response | ms |

kafka_controller metric set

| Metric | Description | Unit |
| --- | --- | --- |
| ActiveBrokerCount | Number of active brokers | count |
| ActiveControllerCount | Number of active controllers | count |
| GlobalPartitionCount | Number of partitions | count |
| GlobalTopicCount | Number of topics | count |
| OfflinePartitionsCount | Number of offline partitions | count |
| PreferredReplicaImbalanceCount | Number of partitions whose leader is not the preferred replica | count |
| TimedOutBrokerHeartbeatCount | Number of timed-out broker heartbeats | count |
| LastAppliedRecordLagMs | Lag behind the last applied metadata record | ms |
| LastAppliedRecordOffset | Offset of the last applied metadata record | - |
| MetadataErrorCount | Metadata error count | count |
| NewActiveControllersCount | Number of controller elections | count |

kafka_producer metric set

| Metric | Description | Unit |
| --- | --- | --- |
| producer_metrics_batch_split_rate | Batch split rate | count/s |
| producer_metrics_buffer_available_bytes | Total unused buffer memory | bytes |
| producer_metrics_buffer_exhausted_rate | Average record sends per second dropped due to buffer exhaustion | count/s |
| producer_metrics_buffer_total_bytes | Total buffer size | bytes |
| producer_metrics_bufferpool_wait_ratio | Buffer pool wait ratio | % |
| producer_metrics_bufferpool_wait_time_ns_total | Total buffer pool wait time | ns |
| producer_metrics_connection_close_rate | Connection close rate | count/s |
| producer_metrics_connection_count | Number of current connections | count |
| producer_metrics_flush_time_ns_total | Total flush time | ns |
| producer_metrics_incoming_byte_rate | Incoming byte rate | bytes/s |
| producer_metrics_outgoing_byte_rate | Outgoing byte rate | bytes/s |
| producer_metrics_request_rate | Request rate | count/s |
| producer_metrics_request_size_avg | Average request size | bytes |

kafka_consumer metric set

| Metric | Description | Unit |
| --- | --- | --- |
| consumer_coordinator_metrics_failed_rebalance_total | Total failed rebalances | count |
| consumer_coordinator_metrics_heartbeat_rate | Average heartbeats per second | count/s |
| consumer_coordinator_metrics_heartbeat_response_time_max | Maximum heartbeat response time | ms |
| consumer_coordinator_metrics_join_rate | Group join rate | count/s |
| consumer_coordinator_metrics_join_total | Total group joins | count |
| consumer_coordinator_metrics_last_rebalance_seconds_ago | Seconds since the last rebalance event | s |
| consumer_coordinator_metrics_rebalance_latency_total | Total rebalance latency | ms |
| consumer_fetch_manager_metrics_bytes_consumed_rate | Bytes consumed per second | bytes/s |
| consumer_fetch_manager_metrics_fetch_latency_avg | Average fetch request latency | ms |
| consumer_metrics_connection_count | Number of connections | count |
| consumer_metrics_connection_close_rate | Connection close rate | count/s |
| consumer_metrics_incoming_byte_rate | Incoming byte rate | bytes/s |
| consumer_metrics_outgoing_byte_rate | Outgoing byte rate | bytes/s |
| consumer_metrics_select_rate | Select rate | count/s |
| consumer_metrics_last_poll_seconds_ago | Seconds since the last poll | s |

kafka_connect metric set

| Metric | Description | Unit |
| --- | --- | --- |
| worker_connector_count | Number of connectors | count |
| worker_connector_startup_attempts_total | Total connector startup attempts | count |
| worker_connector_startup_failure_percentage | Connector startup failure percentage | % |
| worker_task_startup_attempts_total | Total task startup attempts | count |
| worker_task_startup_failure_total | Total task startup failures | count |
| worker_task_startup_failure_percentage | Task startup failure percentage | % |
| worker_rebalance_completed_rebalances_total | Total completed rebalances | count |
| worker_rebalance_time_since_last_rebalance_ms | Time since the last rebalance | ms |

kafka_stream metric set

| Metric | Description | Unit |
| --- | --- | --- |
| stream_thread_metrics_thread_start_time | Thread start time (timestamp) | ms |
| stream_thread_metrics_task_created_total | Total tasks created | count |
| stream_state_metrics_block_cache_capacity | Block cache capacity | bytes |
| stream_state_metrics_all_rate | Rate of all operations | count/s |
| stream_state_metrics_block_cache_usage | Block cache memory usage | bytes |
| stream_state_metrics_bytes_read_compaction_rate | Bytes read per second during compaction | bytes/s |
| stream_state_metrics_bytes_written_compaction_rate | Bytes written per second during compaction | bytes/s |
| stream_state_metrics_block_cache_index_hit_ratio | Block cache index hit ratio | % |
| stream_state_metrics_block_cache_data_hit_ratio | Block cache data hit ratio | % |
| stream_state_metrics_block_cache_filter_hit_ratio | Block cache filter hit ratio | % |
| stream_state_metrics_bytes_written_rate | Byte write rate | bytes/s |
| stream_state_metrics_bytes_read_rate | Byte read rate | bytes/s |
| stream_task_metrics_process_rate | Records processed per second | count/s |
| stream_task_metrics_enforced_processing_rate | Enforced processings per second | count/s |
| stream_task_metrics_active_process_ratio | Active processing ratio | % |
| stream_thread_metrics_commit_rate | Commit rate | count/s |
| stream_thread_metrics_poll_latency_avg | Average poll latency | ms |
| stream_thread_metrics_poll_rate | Poll rate | count/s |
| stream_thread_metrics_blocked_time_ns_total | Total blocked time | ns |
| stream_topic_metrics_bytes_consumed_total | Total bytes consumed | bytes |
| stream_topic_metrics_bytes_produced_total | Total bytes produced | bytes |
| stream_topic_metrics_records_consumed_total | Total records consumed by source processor nodes | count |
| stream_topic_metrics_records_produced_total | Total records produced by sink processor nodes | count |
