Monitor your self-managed Apache Kafka cluster running on Kubernetes by deploying the OpenTelemetry Collector to gather and forward metrics to New Relic.
Architecture
New Relic supports two approaches for monitoring self-managed Kubernetes Kafka: the OpenTelemetry Java agent or the Prometheus JMX Exporter. The following diagrams illustrate the data flow for each approach.

Installation steps
Follow these steps to set up comprehensive Kafka monitoring by installing the OpenTelemetry Java agent on your brokers and deploying a collector to gather and send metrics and logs to New Relic.
Before you begin
Ensure you have:
- A New Relic account with a
- Kubernetes cluster with
kubectlaccess - Kafka deployed as a StatefulSet
- Ability to modify and redeploy the Kafka StatefulSet
Deploy OpenTelemetry Collector
Deploy the OpenTelemetry collector in your cluster. This step also creates the kafka-jmx-config ConfigMap that defines which JMX metrics the Java agent collects from each broker pod. The collector must be running before you restart the Kafka brokers in the next step.
Step 1. Create New Relic credentials secret
팁
For other endpoint configurations, see Configure your OTLP endpoint.
Step 2. Create values.yaml with collector configuration
Both NRDOT and OpenTelemetry collectors use identical configuration. Choose your preferred collector image:
For advanced configuration options, see:
Step 3. Install OpenTelemetry Collector with Helm
$helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts$helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \> --install \> --namespace newrelic \> --create-namespace \> -f values.yamlStep 4. Verify the deployment
$# Check pod status$kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector$
$# View logs to verify metrics are being received from broker pods$kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50Step 1. Create New Relic credentials secret
팁
For other endpoint configurations, see Configure your OTLP endpoint.
Step 2. Create manifest files
Both NRDOT and OpenTelemetry collectors use identical configuration. Only the container image differs. Both also require the kafka-jmx-config ConfigMap applied to your Kafka namespace.
Create kafka-jmx-config.yaml - JMX metrics configuration for the Java agent (apply to your Kafka namespace):
apiVersion: v1kind: ConfigMapmetadata: name: kafka-jmx-config namespace: kafka # TODO: Replace with your Kafka namespacedata: kafka-jmx-config.yaml: | --- rules: # Per-topic custom metrics - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
- bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
- bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0)
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec mapping: Count: metric: kafka.message.count type: counter desc: The number of messages received by the broker unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.count type: &type counter desc: &desc The number of requests received by the broker unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.failed type: &type counter desc: &desc The number of requests to the broker resulting in a failure unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower metricAttribute: type: param(request) unit: ms mapping: 99thPercentile: metric: kafka.request.time.99p type: gauge desc: The 99th percentile time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize mapping: Value: metric: kafka.request.queue type: gauge desc: Size of the request queue unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec metricAttribute: direction: const(in) mapping: Count: metric: &metric kafka.network.io type: &type counter desc: &desc The bytes received or sent by the broker unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec metricAttribute: direction: const(out) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch metricAttribute: type: param(delayedOperation) mapping: Value: metric: kafka.purgatory.size type: gauge desc: The number of requests waiting in purgatory unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount mapping: Value: metric: kafka.partition.count type: gauge desc: The number of partitions on the broker unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount mapping: Value: metric: kafka.partition.offline type: gauge desc: The number of partitions offline unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions mapping: Value: metric: kafka.partition.under_replicated type: gauge desc: The number of under replicated partitions unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec metricAttribute: operation: const(shrink) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec metricAttribute: operation: const(expand) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica mapping: Value: metric: kafka.max.lag type: gauge desc: The max lag in messages between follower and leader replicas unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount mapping: Value: metric: kafka.controller.active.count type: gauge desc: Number of active controllers in the cluster unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs mapping: Count: metric: kafka.leader.election.rate type: counter desc: The leader election count unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec mapping: Count: metric: kafka.unclean.election.rate type: counter desc: Unclean leader election count unit: "{election}"
# ── Additional metrics — remove this section to reduce data ingest ───────────
- beans: - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower metricAttribute: type: param(request) unit: ms mapping: Count: metric: kafka.request.time.total type: counter desc: The total time the broker has taken to service requests 50thPercentile: metric: kafka.request.time.50p type: gauge desc: The 50th percentile time the broker has taken to service requests Mean: metric: kafka.request.time.avg type: gauge desc: The average time the broker has taken to service requests
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs unit: ms type: gauge prefix: kafka.logs.flush. mapping: Count: metric: count unit: '{flush}' type: counter desc: Log flush count 50thPercentile: metric: time.50p desc: Log flush time - 50th percentile 99thPercentile: metric: time.99p desc: Log flush time - 99th percentile
- bean: java.lang:type=GarbageCollector,name=* mapping: CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
- bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: Committed heap memory type: gauge
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors
- bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GCStep 3. Deploy the manifests
$# Create namespace if it doesn't exist$kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -$
$# Apply JMX ConfigMap to the Kafka namespace$kubectl apply -f kafka-jmx-config.yaml$
$# Apply collector ConfigMap$kubectl apply -f collector-configmap.yaml$
$# Apply Deployment and Service$kubectl apply -f collector-deployment.yamlStep 4. Verify the deployment
$# Check pod status$kubectl get pods -n newrelic -l app=otel-collector$
$# View logs to verify metrics are being received from broker pods$kubectl logs -n newrelic -l app=otel-collector --tail=50Configure Kafka StatefulSet for the Java agent
Now that the collector is running, patch your Kafka StatefulSet to add an init container that downloads the OpenTelemetry Java agent JAR, then attach it to the Kafka broker JVM via KAFKA_OPTS.
Add the following sections to your existing Kafka StatefulSet manifest:
spec: template: spec: # 1. Init container: downloads OTel Java agent JAR before Kafka starts initContainers: - name: download-otel-agent image: busybox:latest command: - sh - -c - | wget -O /otel-agent/opentelemetry-javaagent.jar \ https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-agent mountPath: /otel-agent
containers: - name: kafka # TODO: Replace with your Kafka container name # 2. Attach OTel Java agent to the Kafka broker JVM env: - name: KAFKA_OPTS value: >- -javaagent:/otel-agent/opentelemetry-javaagent.jar -Dotel.jmx.enabled=true -Dotel.jmx.config=/jmx-config/kafka-jmx-config.yaml -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.runtime-telemetry.enabled=false -Dotel.metric.export.interval=30000 volumeMounts: - name: otel-agent mountPath: /otel-agent - name: jmx-config mountPath: /jmx-config
# 3. Volumes: emptyDir for JAR, ConfigMap for JMX rules volumes: - name: otel-agent emptyDir: {} - name: jmx-config configMap: name: kafka-jmx-config # Deployed with the collector in the previous step팁
The kafka-jmx-config ConfigMap was deployed with the collector in the previous step. The otel.exporter.otlp.endpoint value http://otel-collector.newrelic.svc.cluster.local:4317 assumes the collector is deployed in the newrelic namespace with service name otel-collector. Update it to match your actual collector service DNS if different.
Apply your updated StatefulSet and wait for pods to roll:
$kubectl apply -f kafka-statefulset.yaml$kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespace(Optional) Instrument producer or consumer applications
중요
Language support: Currently, only Java applications are supported for Kafka client instrumentation using the OpenTelemetry Java agent.
To collect application-level telemetry from your Kafka producer and consumer applications running in Kubernetes, add the OpenTelemetry Java agent to those application pods.
Add an init container and environment variables to your application's deployment:
apiVersion: apps/v1kind: Deploymentmetadata: name: kafka-producer-appspec: template: spec: initContainers: - name: download-otel-agent image: busybox:latest command: - sh - -c - wget -O /otel-agent/opentelemetry-javaagent.jar https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-agent mountPath: /otel-agent
containers: - name: app image: your-kafka-app:latest env: - name: JAVA_TOOL_OPTIONS value: >- -javaagent:/otel-agent/opentelemetry-javaagent.jar -Dotel.service.name=order-process-service -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.traces.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.kafka.experimental-span-attributes=true -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true -Dotel.instrumentation.kafka.producer-propagation.enabled=true -Dotel.instrumentation.kafka.enabled=true -Dotel.instrumentation.runtime-telemetry.enabled=false volumeMounts: - name: otel-agent mountPath: /otel-agent
volumes: - name: otel-agent emptyDir: {}Configuration parameters
The following table describes the key configuration parameters:
Parameter | Description |
|---|---|
| Replace with a unique name for your producer or consumer application |
| Replace with the same cluster name used in your broker configuration |
| Replace with the actual DNS name of your collector service ( |
The Java agent provides out-of-the-box Kafka instrumentation with zero code changes, capturing request latencies, throughput metrics, error rates, and distributed traces. For advanced configuration, see the Kafka instrumentation documentation.
Follow these steps to set up comprehensive Kafka monitoring by installing the Prometheus JMX Exporter on your broker pods and deploying a collector to gather and send metrics to New Relic.
Before you begin
Ensure you have:
- A New Relic account with a
- Kubernetes cluster with
kubectlaccess - Kafka deployed as a StatefulSet with a headless service (for stable pod DNS names)
- Ability to modify and redeploy the Kafka StatefulSet
Create JMX metrics ConfigMap
Create a ConfigMap containing the JMX Exporter configuration that defines which Kafka metrics to collect. This ConfigMap will be mounted into each Kafka broker pod.
Save as kafka-jmx-config.yaml. Apply it to the namespace where Kafka is deployed:
apiVersion: v1kind: ConfigMapmetadata: name: kafka-jmx-metrics namespace: kafka # TODO: Replace with your Kafka namespacedata: kafka-metrics-config.yml: | startDelaySeconds: 0 lowercaseOutputName: true lowercaseOutputLabelNames: true
rules: # Cluster-level controller metrics - pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value' name: kafka_cluster_topic_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value' name: kafka_cluster_partition_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value' name: kafka_broker_fenced_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value' name: kafka_partition_non_preferred_leader type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value' name: kafka_partition_offline type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value' name: kafka_controller_active_count type: GAUGE
# Broker-level replica metrics - pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value' name: kafka_partition_under_min_isr type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value' name: kafka_broker_leader_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value' name: kafka_partition_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value' name: kafka_partition_under_replicated type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value' name: kafka_max_lag type: GAUGE
# Broker topic metrics (totals) - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count' name: kafka_message_count type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count' name: kafka_request_count type: COUNTER labels: type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count' name: kafka_request_count type: COUNTER labels: type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count' name: kafka_request_failed type: COUNTER labels: type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count' name: kafka_request_failed type: COUNTER labels: type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count' name: kafka_network_io type: COUNTER labels: direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count' name: kafka_network_io type: COUNTER labels: direction: "out"
# Per-topic metrics (only appear after traffic flows) - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count' name: kafka_prod_msg_count type: COUNTER labels: topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count' name: kafka_topic_io type: COUNTER labels: topic: "$1" direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count' name: kafka_topic_io type: COUNTER labels: topic: "$1" direction: "out"
# Request metrics - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile' name: kafka_request_time_99p type: GAUGE labels: type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value' name: kafka_request_queue type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value' name: kafka_purgatory_size type: GAUGE labels: type: "$1"
# Controller stats - pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count' name: kafka_leader_election_rate type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count' name: kafka_unclean_election_rate type: COUNTER
# JVM Garbage Collection - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount' name: jvm_gc_collections_count type: COUNTER labels: name: "$1"
# JVM Memory - pattern: 'java.lang<type=Memory><HeapMemoryUsage>max' name: jvm_memory_heap_max type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used' name: jvm_memory_heap_used type: GAUGE
# JVM Threading and System - pattern: 'java.lang<type=Threading><>ThreadCount' name: jvm_thread_count type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad' name: jvm_system_cpu_utilization type: GAUGE
# Broker uptime - pattern: 'java.lang<type=Runtime><>Uptime' name: kafka_broker_uptime type: GAUGE
# Additional metrics — remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above) - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count' name: kafka_request_time_total type: COUNTER labels: type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile' name: kafka_request_time_50p type: GAUGE labels: type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean' name: kafka_request_time_avg type: GAUGE labels: type: "$1"
# Log flush metrics - pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count' name: kafka_logs_flush_count type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile' name: kafka_logs_flush_time_50p type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile' name: kafka_logs_flush_time_99p type: GAUGE
# JVM GC elapsed time - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime' name: jvm_gc_collections_elapsed type: COUNTER labels: name: "$1"
# JVM Memory heap committed - pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed' name: jvm_memory_heap_committed type: GAUGE
# JVM class loading - pattern: 'java.lang<type=ClassLoading><>LoadedClassCount' name: jvm_class_count type: GAUGE
# Additional JVM OS metrics - pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage' name: jvm_system_cpu_load_1m type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors' name: jvm_cpu_count type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad' name: jvm_cpu_recent_utilization type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount' name: jvm_file_descriptor_count type: GAUGE
# JVM Memory Pool - pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used' name: jvm_memory_pool_used type: GAUGE labels: name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max' name: jvm_memory_pool_max type: GAUGE labels: name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used' name: jvm_memory_pool_used_after_last_gc type: GAUGE labels: name: "$1"팁
Customize metrics: You can add or modify patterns by referencing the Prometheus JMX Exporter examples and Kafka MBean documentation.
Apply the ConfigMap:
$kubectl apply -f kafka-jmx-config.yamlConfigure Kafka StatefulSet for JMX Exporter
Patch your Kafka StatefulSet to add an init container that downloads the Prometheus JMX Exporter JAR, then attach it to the Kafka broker JVM via KAFKA_OPTS.
Step 1. Add the following sections to your existing Kafka StatefulSet manifest:
spec: template: spec: # 1. Init container: downloads JMX Exporter JAR before Kafka starts initContainers: - name: download-jmx-exporter image: busybox:latest command: - sh - -c - | # Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases. JMX_EXPORTER_VERSION="1.5.0" wget -O /prometheus-jmx/jmx_prometheus_javaagent.jar \ "https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar" volumeMounts: - name: prometheus-jmx mountPath: /prometheus-jmx
containers: - name: kafka # TODO: Replace with your Kafka container name # 2. Attach JMX Exporter as Java agent on port 9404 env: - name: KAFKA_OPTS value: "-javaagent:/prometheus-jmx/jmx_prometheus_javaagent.jar=9404:/jmx-config/kafka-metrics-config.yml" # 3. Expose port 9404 for Prometheus scraping ports: - name: jmx-metrics containerPort: 9404 protocol: TCP volumeMounts: - name: prometheus-jmx mountPath: /prometheus-jmx - name: jmx-config mountPath: /jmx-config
# 4. Volumes: emptyDir for JAR, ConfigMap for metrics config volumes: - name: prometheus-jmx emptyDir: {} - name: jmx-config configMap: name: kafka-jmx-metrics # Must match the ConfigMap name from Step 2Step 2. Apply your updated StatefulSet and wait for pods to roll:
$kubectl apply -f kafka-statefulset.yaml$kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespaceStep 3. After the rollout completes, verify that metrics are exposed on each broker pod:
$# Replace kafka-0 and kafka with your pod name and namespace$kubectl exec -n kafka kafka-0 -- curl -s http://localhost:9404/metrics | grep kafka_ | head -20중요
Multi-broker clusters: The init container and KAFKA_OPTS configuration applies to all pods in the StatefulSet automatically. Verify each broker pod exposes metrics after the rollout.
Deploy OpenTelemetry Collector
Deploy the OpenTelemetry Collector in your cluster. The collector scrapes Kafka broker pods using static DNS targets and listens on port 4317 for OTLP data from instrumented applications.
The Helm installation method is the recommended approach for deploying OpenTelemetry Collector in Kubernetes.
Step 1. Create New Relic credentials secret
팁
For other endpoint configurations, see Configure your OTLP endpoint.
Step 2. Create values.yaml with collector configuration
Both NRDOT and OpenTelemetry collectors use identical configuration. Choose your preferred collector image:
For advanced configuration options, refer to these receiver documentation pages:
Step 3. Install OpenTelemetry Collector with Helm
$helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts$helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \> --install \> --namespace newrelic \> --create-namespace \> -f values.yamlStep 4. Verify the deployment:
$# Check pod status$kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector$
$# View logs to verify metrics collection$kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50You should see logs indicating successful scraping from Kafka broker pods on port 9404.
The manifest installation method provides direct control over Kubernetes resources without using Helm.
Step 1. Create New Relic credentials secret
팁
For other endpoint configurations, see Configure your OTLP endpoint.
Step 2. Create manifest files
Both NRDOT and OpenTelemetry collectors use identical configuration. Only the container image differs.
For advanced configuration options, refer to these receiver documentation pages:
Step 3. Deploy the manifests
$# Create namespace if it doesn't exist$kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -$
$# Apply ConfigMap$kubectl apply -f collector-configmap.yaml$
$# Apply Deployment (includes ServiceAccount)$kubectl apply -f collector-deployment.yamlStep 4. Verify the deployment:
$# Check pod status$kubectl get pods -n newrelic -l app=otel-collector$
$# View logs to verify metrics collection$kubectl logs -n newrelic -l app=otel-collector --tail=50You should see logs indicating successful scraping from Kafka broker pods on port 9404.
(Optional) Instrument producer or consumer applications
중요
Language support: Java applications support out-of-the-box Kafka client instrumentation using the OpenTelemetry Java agent.
To collect application-level telemetry from your Kafka producer and consumer applications, use the OpenTelemetry Java agent with an init container:
apiVersion: apps/v1kind: Deploymentmetadata: name: kafka-producer-appspec: template: spec: initContainers: - name: download-java-agent image: busybox:latest command: - sh - -c - | wget -O /otel-auto-instrumentation/opentelemetry-javaagent.jar \ https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-auto-instrumentation mountPath: /otel-auto-instrumentation
containers: - name: app image: your-kafka-app:latest env: - name: JAVA_TOOL_OPTIONS value: >- -javaagent:/otel-auto-instrumentation/opentelemetry-javaagent.jar -Dotel.service.name=my-kafka-app -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.traces.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.kafka.experimental-span-attributes=true -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true -Dotel.instrumentation.kafka.producer-propagation.enabled=true -Dotel.instrumentation.kafka.enabled=true -Dotel.instrumentation.runtime-telemetry.enabled=false volumeMounts: - name: otel-auto-instrumentation mountPath: /otel-auto-instrumentation
volumes: - name: otel-auto-instrumentation emptyDir: {}Configuration parameters
The following table describes the key configuration parameters:
| Parameter | Description |
|---|---|
service.name | Replace my-kafka-app with a unique name for your producer or consumer application |
kafka.cluster.name | Replace my-kafka-cluster with the same cluster name used in your collector configuration |
otlp.endpoint | The endpoint http://otel-collector.newrelic.svc.cluster.local:4317 assumes the collector is deployed in the newrelic namespace as otel-collector |
The Java agent provides out-of-the-box Kafka instrumentation with zero code changes, capturing request latencies, throughput metrics, error rates, and distributed traces. For advanced configuration, see the Kafka instrumentation documentation.
(Optional) Forward Kafka broker logs
To collect Kafka broker logs and send them to New Relic, add a filelog receiver to your collector configuration.
Find your data
After a few minutes, your Kafka data should appear in New Relic. See Find your data for detailed instructions on exploring your Kafka data across different views in the New Relic UI.
The following table summarizes where each signal type is stored. Replace my-kafka-cluster with your KAFKA_CLUSTER_NAME value in all queries below:
| Signal | Event type | What's included |
|---|---|---|
| Metrics | Metric | Broker, topic, partition, consumer group, and JVM metrics |
| Logs | Log | Logs from producer and consumer applications (via OTel Java agent) and broker logs collected via the Java agent |
| Traces | Span | Producer and consumer spans, including per-message publish and receive operations across topics |
Metrics
Broker, topic, partition, consumer group, and JVM metrics are stored in the Metric event type:
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoLogs
Logs from producer and consumer applications instrumented with the OpenTelemetry Java agent, and broker logs collected via the Java agent on the broker, are stored in the Log event type:
FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoTraces
Producer and consumer spans, including per-message publish and receive operations across topics, are stored in the Span event type:
FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoExample
A complete working example with Kafka StatefulSet manifests, Helm values, OTel Collector configuration, and sample producer/consumer applications is available in the New Relic OpenTelemetry Examples repository.
Troubleshooting
Next steps
- Explore Kafka metrics - View the complete metrics reference
- Create custom dashboards - Build visualizations for your Kafka data
- Set up alerts - Monitor critical metrics like consumer lag and under-replicated partitions
Related resources
- Self-hosted Kafka - Kafka monitoring for self-hosted (non-Kubernetes) environments
- Kubernetes Strimzi - Kafka monitoring for Strimzi-managed Kafka on Kubernetes
- OpenTelemetry Java agent - Official documentation for the OTel Java agent
- Prometheus JMX Exporter - Java agent that exposes JMX metrics in Prometheus format
- Prometheus receiver - OTel Collector receiver for scraping Prometheus metrics endpoints
- kafkametrics receiver - Consumer lag and topic metrics receiver documentation