Monitor your Kafka cluster running on Kubernetes with the Strimzi operator by deploying the OpenTelemetry Collector. The collector automatically discovers Kafka broker pods and collects comprehensive metrics.
Before you begin
Ensure you have:
- A New Relic account with a license key
- Kubernetes cluster with kubectl access
- Kafka deployed via Strimzi operator with JMX enabled
Enable JMX in Strimzi Kafka
Ensure your Kafka cluster has JMX enabled in the Strimzi Kafka resource:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    jmxOptions: {} # Enables JMX with default settings
    # ...other broker configuration
```
Step 1: Create namespace
Create a dedicated namespace for the OpenTelemetry Collector (or use your existing Kafka namespace):
```bash
kubectl create namespace kafka
```
Step 2: Create secret with license key
Store your New Relic license key as a Kubernetes secret:
```bash
kubectl create secret generic nr-license-key \
  --from-literal=NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY \
  -n kafka
```
Replace YOUR_LICENSE_KEY with your actual New Relic license key.
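Optionally, confirm the secret exists before moving on:

```bash
kubectl get secret nr-license-key -n kafka
```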
Step 3: Deploy OpenTelemetry Collector
3.1 Build custom collector image
Create a custom OpenTelemetry Collector image with Java runtime and JMX scraper.
Important
Version Compatibility: This guide uses JMX Scraper 1.52.0 and OpenTelemetry Collector 0.143.1. Older collector versions may not include this scraper's hash in their compatibility list. For best results, use the latest versions as shown in this guide.
Target Architecture: Refer to the OpenTelemetry Collector releases page to find the correct binary for your system architecture (e.g., linux_amd64, linux_arm64, darwin_amd64). Update the TARGETARCH variable in the Dockerfile accordingly.
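For example, if your cluster nodes run on ARM, you could override the default architecture when you build the image in the step below by passing a `--build-arg` flag (the image tag here is the same illustrative one used later in this guide):

```bash
docker build --build-arg TARGETARCH=linux_arm64 -t your-registry/otel-collector-kafka:latest .
```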
Save as Dockerfile:
```dockerfile
# Multi-stage build for OpenTelemetry Collector with Java support for JMX receiver
# This image bundles the OTEL Collector with Java 17 runtime and JMX scraper JAR
FROM alpine:latest as prep

# OpenTelemetry Collector binary
ARG OTEL_VERSION=0.143.1
ARG TARGETARCH=linux_amd64
ADD "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_${TARGETARCH}.tar.gz" /otelcontribcol
RUN tar -zxvf /otelcontribcol

# JMX Scraper JAR (for JMX receiver with YAML-based configuration)
ARG JMX_SCRAPER_JAR_VERSION=1.52.0
ADD https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v${JMX_SCRAPER_JAR_VERSION}/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar

# Set permissions for nonroot user (uid 65532)
ARG USER_UID=65532
RUN chown ${USER_UID} /opt/opentelemetry-jmx-scraper.jar

# Final minimal image with Java runtime
FROM openjdk:17-jre-slim

COPY --from=prep /opt/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar
COPY --from=prep /otelcol-contrib /otelcol-contrib

EXPOSE 4317 4318 8888
ENTRYPOINT ["/otelcol-contrib"]
CMD ["--config", "/conf/otel-agent-config.yaml"]
```
Build and push the image:
```bash
docker build -t your-registry/otel-collector-kafka:latest .
docker push your-registry/otel-collector-kafka:latest
```
3.2 Create JMX custom metrics ConfigMap
First, create a ConfigMap with custom JMX metrics configuration. Save as jmx-kafka-config.yaml:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: jmx-kafka-config
  namespace: kafka
data:
  jmx-kafka-config.yaml: |
    ---
    rules:
      # Per-topic custom metrics using custom MBean commands
      - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
        metricAttribute:
          topic: param(topic)
        mapping:
          Count:
            metric: kafka.prod.msg.count
            type: counter
            desc: The number of messages in per topic
            unit: "{message}"

      - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
        metricAttribute:
          topic: param(topic)
          direction: const(in)
        mapping:
          Count:
            metric: kafka.topic.io
            type: counter
            desc: The bytes received or sent per topic
            unit: By

      - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
        metricAttribute:
          topic: param(topic)
          direction: const(out)
        mapping:
          Count:
            metric: kafka.topic.io
            type: counter
            desc: The bytes received or sent per topic
            unit: By

      # Cluster-level metrics using controller-based MBeans
      - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
        mapping:
          Value:
            metric: kafka.cluster.topic.count
            type: gauge
            desc: The total number of global topics in the cluster
            unit: "{topic}"

      - bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
        mapping:
          Value:
            metric: kafka.cluster.partition.count
            type: gauge
            desc: The total number of global partitions in the cluster
            unit: "{partition}"

      - bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
        mapping:
          Value:
            metric: kafka.broker.fenced.count
            type: gauge
            desc: The number of fenced brokers in the cluster
            unit: "{broker}"

      - bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
        mapping:
          Value:
            metric: kafka.partition.non_preferred_leader
            type: gauge
            desc: The count of topic partitions for which the leader is not the preferred leader
            unit: "{partition}"

      # Broker-level metrics using ReplicaManager MBeans
      - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
        mapping:
          Value:
            metric: kafka.partition.under_min_isr
            type: gauge
            desc: The number of partitions where the number of in-sync replicas is less than the minimum
            unit: "{partition}"

      # Broker uptime metric using JVM Runtime
      - bean: java.lang:type=Runtime
        mapping:
          Uptime:
            metric: kafka.broker.uptime
            type: gauge
            desc: Broker uptime in milliseconds
            unit: ms

      # Leader count per broker
      - bean: kafka.server:type=ReplicaManager,name=LeaderCount
        mapping:
          Value:
            metric: kafka.broker.leader.count
            type: gauge
            desc: Number of partitions for which this broker is the leader
            unit: "{partition}"

      # JVM metrics
      - bean: java.lang:type=GarbageCollector,name=*
        mapping:
          CollectionCount:
            metric: jvm.gc.collections.count
            type: counter
            unit: "{collection}"
            desc: total number of collections that have occurred
            metricAttribute:
              name: param(name)
          CollectionTime:
            metric: jvm.gc.collections.elapsed
            type: counter
            unit: ms
            desc: the approximate accumulated collection elapsed time in milliseconds
            metricAttribute:
              name: param(name)

      - bean: java.lang:type=Memory
        unit: By
        prefix: jvm.memory.
        dropNegativeValues: true
        mapping:
          HeapMemoryUsage.committed:
            metric: heap.committed
            desc: current heap usage
            type: gauge
          HeapMemoryUsage.max:
            metric: heap.max
            desc: current heap usage
            type: gauge
          HeapMemoryUsage.used:
            metric: heap.used
            desc: current heap usage
            type: gauge

      - bean: java.lang:type=Threading
        mapping:
          ThreadCount:
            metric: jvm.thread.count
            type: gauge
            unit: "{thread}"
            desc: Total thread count (Kafka typical range 100-300 threads)

      - bean: java.lang:type=OperatingSystem
        prefix: jvm.
        dropNegativeValues: true
        mapping:
          SystemLoadAverage:
            metric: system.cpu.load_1m
            type: gauge
            unit: "{run_queue_item}"
            desc: System load average (1 minute) - alert if > CPU count
          AvailableProcessors:
            metric: cpu.count
            type: gauge
            unit: "{cpu}"
            desc: Number of processors available
          ProcessCpuLoad:
            metric: cpu.recent_utilization
            type: gauge
            unit: '1'
            desc: Recent CPU utilization for JVM process (0.0 to 1.0)
          SystemCpuLoad:
            metric: system.cpu.utilization
            type: gauge
            unit: '1'
            desc: Recent CPU utilization for whole system (0.0 to 1.0)
          OpenFileDescriptorCount:
            metric: file_descriptor.count
            type: gauge
            unit: "{file_descriptor}"
            desc: Number of open file descriptors - alert if > 80% of ulimit

      - bean: java.lang:type=ClassLoading
        mapping:
          LoadedClassCount:
            metric: jvm.class.count
            type: gauge
            unit: "{class}"
            desc: Currently loaded class count

      - bean: java.lang:type=MemoryPool,name=*
        type: gauge
        unit: By
        metricAttribute:
          name: param(name)
        mapping:
          Usage.used:
            metric: jvm.memory.pool.used
            desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
          Usage.max:
            metric: jvm.memory.pool.max
            desc: Maximum memory pool size
          CollectionUsage.used:
            metric: jvm.memory.pool.used_after_last_gc
            desc: Memory used after last GC (shows retained memory baseline)
```
Tip
Customize metrics collection: You can scrape additional Kafka metrics by adding custom MBean rules to the jmx-kafka-config.yaml file (an illustrative rule is sketched after this list):
- Learn the basic syntax for JMX metrics rules
- Find available MBean names in the Kafka monitoring documentation
This allows you to collect any JMX metric exposed by Kafka brokers based on your specific monitoring needs.
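As a sketch of what an additional rule might look like, the example below exposes Kafka's `kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions` MBean as a per-broker gauge; the metric name is chosen for this example only. Append it under `rules:` in jmx-kafka-config.yaml:

```yaml
# Illustrative extra rule: under-replicated partitions per broker
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
  mapping:
    Value:
      metric: kafka.partition.under_replicated # example metric name
      type: gauge
      desc: The number of under-replicated partitions on this broker
      unit: "{partition}"
```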
Apply the JMX ConfigMap:
```bash
kubectl apply -f jmx-kafka-config.yaml
```
3.3 Create collector ConfigMap
Create a ConfigMap with the OpenTelemetry Collector configuration. Save as otel-kafka-config.yaml:
```yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: otel-collector-config
  namespace: kafka
  labels:
    app: otel-collector
data:
  otel-collector-config.yaml: |
    receivers:
      # Kafka cluster-level metrics (runs once per OTEL collector)
      kafkametrics/cluster:
        brokers:
          - "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
        protocol_version: 2.8.0
        scrapers:
          - brokers
          - topics
          - consumers
        collection_interval: 30s
        metrics:
          kafka.topic.min_insync_replicas:
            enabled: true
          kafka.topic.replication_factor:
            enabled: true
          kafka.partition.replicas:
            enabled: false
          kafka.partition.oldest_offset:
            enabled: false
          kafka.partition.current_offset:
            enabled: false

      # Receiver creator for dynamic per-broker JMX receivers
      receiver_creator:
        watch_observers: [k8s_observer]
        receivers:
          # JMX receiver template (created per discovered broker pod)
          jmx:
            rule: type == "pod" && labels["strimzi.io/kind"] == "Kafka" && labels["strimzi.io/cluster"] == "my-cluster" && labels["strimzi.io/name"] == "my-cluster-kafka"
            config:
              endpoint: 'service:jmx:rmi:///jndi/rmi://`endpoint`:9999/jmxrmi'
              jar_path: /opt/opentelemetry-jmx-scraper.jar
              target_system: kafka
              jmx_configs: /conf-jmx/jmx-kafka-config.yaml
              collection_interval: 30s
              # Set dynamic resource attributes from discovered pod
              resource_attributes:
                broker.endpoint: '`endpoint`'

    exporters:
      otlp:
        endpoint: https://otlp.nr-data.net:4317
        tls:
          insecure: false
        sending_queue:
          num_consumers: 12
          queue_size: 5000
        retry_on_failure:
          enabled: true
        headers:
          api-key: ${NEW_RELIC_LICENSE_KEY}

    processors:
      # Batch processor for efficiency
      batch/aggregation:
        send_batch_size: 1024
        timeout: 30s

      # Memory limiter to prevent OOM
      memory_limiter:
        limit_percentage: 80
        spike_limit_percentage: 30
        check_interval: 1s

      # Detect system resources
      resourcedetection:
        detectors: [env, docker, system]
        timeout: 5s
        override: false

      # Add Kafka cluster metadata
      resource/kafka_metadata:
        attributes:
          - key: kafka.cluster.name
            value: my-cluster
            action: upsert

      # Extract Kubernetes attributes
      k8sattributes:
        auth_type: serviceAccount
        passthrough: false
        extract:
          metadata:
            - k8s.pod.name
            - k8s.pod.uid
            - k8s.namespace.name
            - k8s.node.name
          labels:
            - tag_name: strimzi.cluster
              key: strimzi.io/cluster
              from: pod
            - tag_name: strimzi.kind
              key: strimzi.io/kind
              from: pod

      # Transform metrics for New Relic UI
      transform:
        metric_statements:
          - context: metric
            statements:
              # Clean up descriptions and units
              - set(description, "") where description != ""
              - set(unit, "") where unit != ""
          - context: resource
            statements:
              # Extract broker.id from k8s.pod.name: my-cluster-kafka-0 -> 0 (supports multi-digit)
              - set(attributes["broker.id"], ExtractPatterns(attributes["k8s.pod.name"], ".*-(?P<broker_id>\\d+)$")["broker_id"]) where attributes["k8s.pod.name"] != nil

      # Remove broker.id for cluster-level metrics
      transform/remove_broker_id:
        metric_statements:
          - context: resource
            statements:
              - delete_key(attributes, "broker.id")
              - delete_key(attributes, "broker.endpoint")
              - delete_key(attributes, "k8s.pod.name")

      # Topic sum aggregation for replicas_in_sync metrics
      metricstransform/kafka_topic_sum_aggregation:
        transforms:
          - include: kafka.partition.replicas_in_sync
            action: insert
            new_name: kafka.partition.replicas_in_sync.total
            operations:
              - action: aggregate_labels
                label_set: [ topic ]
                aggregation_type: sum

      # Filter to include only cluster-level metrics
      filter/include_cluster_metrics:
        metrics:
          include:
            match_type: regexp
            metric_names:
              - "kafka\\.partition\\.offline"
              - "kafka\\.(leader|unclean)\\.election\\.rate"
              - "kafka\\.partition\\.non_preferred_leader"
              - "kafka\\.broker\\.fenced\\.count"
              - "kafka\\.cluster\\.partition\\.count"
              - "kafka\\.cluster\\.topic\\.count"

      # Filter to exclude cluster-level metrics from broker pipeline
      filter/exclude_cluster_metrics:
        metrics:
          exclude:
            match_type: regexp
            metric_names:
              - "kafka\\.partition\\.offline"
              - "kafka\\.(leader|unclean)\\.election\\.rate"
              - "kafka\\.partition\\.non_preferred_leader"
              - "kafka\\.broker\\.fenced\\.count"
              - "kafka\\.cluster\\.partition\\.count"
              - "kafka\\.cluster\\.topic\\.count"

      # Convert cumulative metrics to delta for New Relic
      cumulativetodelta:

    extensions:
      # K8s observer extension
      k8s_observer:
        auth_type: serviceAccount
        observe_pods: true
        observe_nodes: false

    service:
      extensions: [k8s_observer]

      pipelines:
        # Per-broker metrics pipeline (with broker.id)
        metrics/broker:
          receivers:
            - receiver_creator
            - kafkametrics/cluster
          processors:
            - memory_limiter
            - resourcedetection
            - resource/kafka_metadata
            - k8sattributes
            - filter/exclude_cluster_metrics
            - transform
            - metricstransform/kafka_topic_sum_aggregation
            - cumulativetodelta
            - batch/aggregation
          exporters: [otlp]

        # Cluster-level metrics pipeline (without broker.id, aggregated)
        metrics/cluster:
          receivers:
            - receiver_creator
          processors:
            - memory_limiter
            - resourcedetection
            - resource/kafka_metadata
            - k8sattributes
            - filter/include_cluster_metrics
            - transform/remove_broker_id
            - metricstransform/kafka_topic_sum_aggregation
            - cumulativetodelta
            - batch/aggregation
          exporters: [otlp]
```
Configuration notes:
- Replace `my-cluster-kafka-bootstrap` with your Strimzi Kafka service name (a lookup command is shown after this list)
- Replace `my-cluster` in the `rule` and `kafka.cluster.name` with your cluster name
- Update the namespace if different from `kafka`
- OTLP endpoint: Uses `https://otlp.nr-data.net:4317` (US region) or `https://otlp.eu01.nr-data.net:4317` (EU region). See Configure your OTLP endpoint for other regions
- The `receiver_creator` automatically discovers Kafka broker pods using Strimzi labels
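If you're unsure of the bootstrap service name, you can list the services Strimzi created for your cluster (assuming the cluster and namespace names used in this guide):

```bash
kubectl get svc -n kafka -l strimzi.io/cluster=my-cluster
```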
Apply the ConfigMap:
```bash
kubectl apply -f otel-kafka-config.yaml
```
3.4 Deploy the collector
Create the deployment. Save as otel-collector-deployment.yaml:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: otel-collector
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: otel-collector
  template:
    metadata:
      labels:
        app: otel-collector
    spec:
      serviceAccountName: otel-collector
      containers:
        - name: otel-collector
          image: your-registry/otel-collector-kafka:latest
          env:
            - name: NEW_RELIC_LICENSE_KEY
              valueFrom:
                secretKeyRef:
                  name: nr-license-key
                  key: NEW_RELIC_LICENSE_KEY
          resources:
            limits:
              cpu: "1"
              memory: "2Gi"
            requests:
              cpu: "500m"
              memory: "1Gi"
          volumeMounts:
            - name: vol-kafka-test-cluster
              mountPath: /conf
            - name: jmx-config
              mountPath: /conf-jmx
          ports:
            - containerPort: 4317 # OTLP gRPC
            - containerPort: 4318 # OTLP HTTP
            - containerPort: 8888 # Metrics
      volumes:
        - name: vol-kafka-test-cluster
          configMap:
            name: otel-collector-config
            items:
              - key: otel-collector-config.yaml
                path: otel-agent-config.yaml
        - name: jmx-config
          configMap:
            name: jmx-kafka-config
            items:
              - key: jmx-kafka-config.yaml
                path: jmx-kafka-config.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: otel-collector
  namespace: kafka
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: otel-collector
rules:
  - apiGroups: [""]
    resources: ["pods", "nodes"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: otel-collector
subjects:
  - kind: ServiceAccount
    name: otel-collector
    namespace: kafka
roleRef:
  kind: ClusterRole
  name: otel-collector
  apiGroup: rbac.authorization.k8s.io
```
Resource configuration:
- The resource limits above are suitable for medium-sized Kafka clusters (5-10 brokers, 20-100 topics)
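For larger clusters you may need to raise these limits. One way to adjust them without editing the manifest is `kubectl set resources`; the values below are purely illustrative and should be tuned to your workload:

```bash
kubectl set resources deployment/otel-collector -n kafka \
  --containers=otel-collector \
  --limits=cpu=2,memory=4Gi --requests=cpu=1,memory=2Gi
```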
Apply the deployment:
```bash
kubectl apply -f otel-collector-deployment.yaml
```
Verify the collector is running:
```bash
kubectl get pods -n kafka -l app=otel-collector
kubectl logs -n kafka -l app=otel-collector -f
```
Step 4: (Optional) Instrument producer or consumer applications
To collect application-level telemetry from Kafka producer and consumer applications running in Kubernetes, instrument them with the OpenTelemetry Java Agent.
Instrument your Kafka application
To instrument your Kafka producer or consumer applications, add the OpenTelemetry Java Agent to your existing deployment:
Download the Java agent: Add an init container to download the agent JAR:
```yaml
initContainers:
  - name: download-otel-agent
    image: busybox:latest
    command:
      - sh
      - -c
      - |
        wget -O /otel/opentelemetry-javaagent.jar \
          https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
    volumeMounts:
      - name: otel-agent
        mountPath: /otel
```
Configure the Java agent: Add environment variables to your application container:
```yaml
env:
  - name: JAVA_TOOL_OPTIONS
    value: >-
      -javaagent:/otel/opentelemetry-javaagent.jar
      -Dotel.service.name="kafka-producer"
      -Dotel.resource.attributes="kafka.cluster.name=my-cluster"
      -Dotel.exporter.otlp.endpoint="http://localhost:4317"
      -Dotel.exporter.otlp.protocol="grpc"
      -Dotel.metrics.exporter="otlp"
      -Dotel.traces.exporter="otlp"
      -Dotel.logs.exporter="otlp"
      -Dotel.instrumentation.kafka.experimental-span-attributes="true"
      -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true"
      -Dotel.instrumentation.kafka.producer-propagation.enabled="true"
      -Dotel.instrumentation.kafka.enabled="true"
volumeMounts:
  - name: otel-agent
    mountPath: /otel
```
Add the volume: Include the volume definition:
```yaml
volumes:
  - name: otel-agent
    emptyDir: {}
```
Replace:
- `kafka-producer` with a unique name for your application
- `my-cluster` with your Kafka cluster name
Tip
The configuration above sends telemetry to an OpenTelemetry Collector running on localhost:4317. Deploy your own collector with this configuration:
```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"

exporters:
  otlp/newrelic:
    endpoint: https://otlp.nr-data.net:4317
    headers:
      api-key: "${NEW_RELIC_LICENSE_KEY}"
    compression: gzip
    timeout: 30s

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [otlp/newrelic]
    metrics:
      receivers: [otlp]
      exporters: [otlp/newrelic]
    logs:
      receivers: [otlp]
      exporters: [otlp/newrelic]
```
This allows you to customize processing, add filters, or route to multiple backends. For other endpoint configurations, see Configure your OTLP endpoint.
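For instance, a minimal extension of the config above might add a batch processor and mirror traces to the collector's debug exporter for local troubleshooting (the metrics and logs pipelines can be extended the same way); this is a sketch, not a recommended production setup:

```yaml
processors:
  batch:
    timeout: 10s

exporters:
  debug:
    verbosity: basic

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp/newrelic, debug]
```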
The Java Agent provides out-of-the-box Kafka instrumentation with zero code changes, capturing:
- Request latencies
- Throughput metrics
- Error rates
- Distributed traces
For advanced configuration, see the Kafka instrumentation documentation.
Step 5: (Optional) Forward Kafka broker logs
To collect Kafka broker logs from your Kubernetes pods and send them to New Relic, configure the file log receiver in your OpenTelemetry Collector.
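A minimal sketch of that receiver is shown below. It assumes the collector runs as a DaemonSet with the node's /var/log/pods directory mounted into the container, and that the namespace, cluster name, and broker container name match this guide; adjust the glob to your environment:

```yaml
receivers:
  filelog/kafka:
    include:
      - /var/log/pods/kafka_my-cluster-kafka-*/kafka/*.log
    start_at: end

service:
  pipelines:
    logs:
      receivers: [filelog/kafka]
      processors: [batch/aggregation]
      exporters: [otlp]
```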
Find your data
After a few minutes, your Kafka metrics should appear in New Relic. See Find your data for detailed instructions on exploring your Kafka metrics across different views in the New Relic UI.
You can also query your data with NRQL:
```sql
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-cluster'
```
Troubleshooting
Next steps
- Explore Kafka metrics - View the complete metrics reference
- Create custom dashboards - Build visualizations for your Kafka data
- Set up alerts - Monitor critical metrics like consumer lag and under-replicated partitions