• /
  • EnglishEspañolFrançais日本語한국어Português
  • Se connecterDémarrer

Cette traduction automatique est fournie pour votre commodité.

En cas d'incohérence entre la version anglaise et la version traduite, la version anglaise prévaudra. Veuillez visiter cette page pour plus d'informations.

Créer un problème

Monitorer Kafka sur Kubernetes (Strimzi) avec OpenTelemetry

Monitorez votre cluster Kafka fonctionnant sur Kubernetes avec l'opérateur Strimzi en déployant l'OpenTelemetry Collector. Le collecteur découvre automatiquement les pods de broker Kafka et collecte des métriques complètes.

Avant de commencer

Assurez-vous d'avoir :

Activer JMX dans Strimzi Kafka

Assurez-vous que votre cluster Kafka a JMX activé dans la ressource Strimzi Kafka :

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
namespace: kafka
spec:
kafka:
jmxOptions: {} # Enables JMX with default settings
# ...other broker configuration

Étape 1 : Créer un espace de noms

Créez un espace de noms dédié pour le collecteur OpenTelemetry (ou utilisez votre espace de noms Kafka existant) :

bash
$
kubectl create namespace kafka

Étape 2 : Créer un secret avec la clé de licence

Stockez votre clé de licence New Relic en tant que secret Kubernetes :

bash
$
kubectl create secret generic nr-license-key \
>
--from-literal=NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY \
>
-n kafka

Remplacez YOUR_LICENSE_KEY par votre clé de licence New Relic réelle.

Étape 3 : Déployer le collecteur OpenTelemetry

3.1 Construire une image de collecteur personnalisée

Créez une image personnalisée du collecteur OpenTelemetry avec le runtime Java et le scraper JMX.

Important

Compatibilité des versions: Ce guide utilise JMX Scraper 1.52.0 et OpenTelemetry Collector 0.143.1. Les anciennes versions du collecteur peuvent ne pas inclure le hachage de ce scraper dans leur liste de compatibilité. Pour de meilleurs résultats, utilisez les dernières versions comme indiqué dans ce guide.

Architecture cible: Reportez-vous à la page OpenTelemetry Collector releases pour trouver le binaire correct pour l'architecture de votre système (par exemple, linux_amd64, linux_arm64, darwin_amd64). Mettez à jour la variable TARGETARCH dans le Dockerfile en conséquence.

Enregistrer sous Dockerfile:

# Multi-stage build for OpenTelemetry Collector with Java support for JMX receiver
# This image bundles the OTEL Collector with Java 17 runtime and JMX scraper JAR
FROM alpine:latest as prep
# OpenTelemetry Collector Binary
ARG OTEL_VERSION=0.143.1
ARG TARGETARCH=linux_amd64
ADD "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_${TARGETARCH}.tar.gz" /otelcontribcol
RUN tar -zxvf /otelcontribcol
# JMX Scraper JAR (for JMX receiver with YAML-based configuration)
ARG JMX_SCRAPER_JAR_VERSION=1.52.0
ADD https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v${JMX_SCRAPER_JAR_VERSION}/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar
# Set permissions for nonroot user (uid 65532)
ARG USER_UID=65532
RUN chown ${USER_UID} /opt/opentelemetry-jmx-scraper.jar
# Final minimal image with Java runtime
FROM openjdk:17-jre-slim
COPY --from=prep /opt/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar
COPY --from=prep /otelcol-contrib /otelcol-contrib
EXPOSE 4317 4318 8888
ENTRYPOINT ["/otelcol-contrib"]
CMD ["--config", "/conf/otel-agent-config.yaml"]

Construire et pousser l'image :

bash
$
docker build -t your-registry/otel-collector-kafka:latest .
$
docker push your-registry/otel-collector-kafka:latest

3.2 Créer une ConfigMap de métriques personnalisées JMX

Tout d'abord, créez un ConfigMap avec la configuration des métriques JMX personnalisées. Enregistrer sous jmx-kafka-config.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
name: jmx-kafka-config
namespace: kafka
data:
jmx-kafka-config.yaml: |
---
rules:
# Per-topic custom metrics using custom MBean commands
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages in per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics using controller-based MBeans
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
# Broker uptime metric using JVM Runtime
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
# Leader count per broker
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: current heap usage
type: gauge
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute) - alert if > CPU count
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC (shows retained memory baseline)

Conseil

Personnaliser la collecte de métriques: Vous pouvez scraper des métriques Kafka supplémentaires en ajoutant des règles MBean personnalisées au fichier kafka-jmx-config.yaml :

Appliquer le ConfigMap JMX :

bash
$
kubectl apply -f jmx-kafka-config.yaml

3.3 Créer ConfigMap du collecteur

Créez un ConfigMap avec la configuration OpenTelemetry Collector. Enregistrer sous otel-kafka-config.yaml:

---
apiVersion: v1
kind: ConfigMap
metadata:
name: otel-collector-config
namespace: kafka
labels:
app: otel-collector
data:
otel-collector-config.yaml: |
receivers:
# Kafka cluster-level metrics (runs once per OTEL collector)
kafkametrics/cluster:
brokers:
- "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
protocol_version: 2.8.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
# Receiver creator for dynamic per-broker JMX receivers
receiver_creator:
watch_observers: [k8s_observer]
receivers:
# JMX receiver template (created per discovered broker pod)
jmx:
rule: type == "pod" && labels["strimzi.io/kind"] == "Kafka" && labels["strimzi.io/cluster"] == "my-cluster" && labels["strimzi.io/name"] == "my-cluster-kafka"
config:
endpoint: 'service:jmx:rmi:///jndi/rmi://`endpoint`:9999/jmxrmi'
jar_path: /opt/opentelemetry-jmx-scraper.jar
target_system: kafka
jmx_configs: /conf-jmx/jmx-kafka-config.yaml
collection_interval: 30s
# Set dynamic resource attributes from discovered pod
resource_attributes:
broker.endpoint: '`endpoint`'
exporters:
otlp:
endpoint: https://otlp.nr-data.net:4317
tls:
insecure: false
sending_queue:
num_consumers: 12
queue_size: 5000
retry_on_failure:
enabled: true
headers:
api-key: ${NEW_RELIC_LICENSE_KEY}
processors:
# Batch processor for efficiency
batch/aggregation:
send_batch_size: 1024
timeout: 30s
# Memory limiter to prevent OOM
memory_limiter:
limit_percentage: 80
spike_limit_percentage: 30
check_interval: 1s
# Detect system resources
resourcedetection:
detectors: [env, docker, system]
timeout: 5s
override: false
# Add Kafka cluster metadata
resource/kafka_metadata:
attributes:
- key: kafka.cluster.name
value: my-cluster
action: upsert
# Extract Kubernetes attributes
k8sattributes:
auth_type: serviceAccount
passthrough: false
extract:
metadata:
- k8s.pod.name
- k8s.pod.uid
- k8s.namespace.name
- k8s.node.name
labels:
- tag_name: strimzi.cluster
key: strimzi.io/cluster
from: pod
- tag_name: strimzi.kind
key: strimzi.io/kind
from: pod
# Transform metrics for New Relic UI
transform:
metric_statements:
- context: metric
statements:
# Clean up descriptions and units
- set(description, "") where description != ""
- set(unit, "") where unit != ""
- context: resource
statements:
# Extract broker.id from k8s.pod.name: my-cluster-kafka-0 -> 0 (supports multi-digit)
- set(attributes["broker.id"], ExtractPatterns(attributes["k8s.pod.name"], ".*-(?P<broker_id>\\d+)$")["broker_id"]) where attributes["k8s.pod.name"] != nil
# Remove broker.id for cluster-level metrics
transform/remove_broker_id:
metric_statements:
- context: resource
statements:
- delete_key(attributes, "broker.id")
- delete_key(attributes, "broker.endpoint")
- delete_key(attributes, "k8s.pod.name")
# Topic sum aggregation for replicas_in_sync
metricstransform/kafka_topic_sum_aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [ topic ]
aggregation_type: sum
# Filter to include only cluster-level metrics
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
# Filter to exclude cluster-level metrics from broker pipeline
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
# Convert cumulative metrics to delta for New Relic
cumulativetodelta:
extensions:
# K8s observer extension
k8s_observer:
auth_type: serviceAccount
observe_pods: true
observe_nodes: false
service:
extensions: [k8s_observer]
pipelines:
# Per-broker metrics pipeline (with broker.id)
metrics/broker:
receivers:
- receiver_creator
- kafkametrics/cluster
processors:
- memory_limiter
- resourcedetection
- resource/kafka_metadata
- k8sattributes
- filter/exclude_cluster_metrics
- transform
- metricstransform/kafka_topic_sum_aggregation
- cumulativetodelta
- batch/aggregation
exporters: [otlp]
# Cluster-level metrics pipeline (without broker.id, aggregated)
metrics/cluster:
receivers:
- receiver_creator
processors:
- memory_limiter
- resourcedetection
- resource/kafka_metadata
- k8sattributes
- filter/include_cluster_metrics
- transform/remove_broker_id
- metricstransform/kafka_topic_sum_aggregation
- cumulativetodelta
- batch/aggregation
exporters: [otlp]

Notes de configuration :

  • Remplacez my-cluster-kafka-bootstrap par le nom de votre service Kafka Strimzi
  • Remplacez my-cluster dans rule et kafka.cluster.name par le nom de votre cluster
  • Mettez à jour l'espace de noms s'il est différent de kafka
  • Point de terminaison OTLP: utilise https://otlp.nr-data.net:4317 (région US) ou https://otlp.eu01.nr-data.net:4317 (région UE). Consultez Configurez votre point de terminaison OTLP pour d'autres régions
  • Le receiver_creator découvre automatiquement les pods de broker Kafka à l'aide des étiquettes Strimzi

Appliquer la ConfigMap :

bash
$
kubectl apply -f otel-kafka-config.yaml

3.4 Déployer le collecteur

Créez le déploiement. Enregistrer sous otel-collector-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
name: otel-collector
namespace: kafka
spec:
replicas: 1
selector:
matchLabels:
app: otel-collector
template:
metadata:
labels:
app: otel-collector
spec:
serviceAccountName: otel-collector
containers:
- name: otel-collector
image: your-registry/otel-collector-kafka:latest
env:
- name: NEW_RELIC_LICENSE_KEY
valueFrom:
secretKeyRef:
name: nr-license-key
key: NEW_RELIC_LICENSE_KEY
resources:
limits:
cpu: "1"
memory: "2Gi"
requests:
cpu: "500m"
memory: "1Gi"
volumeMounts:
- name: vol-kafka-test-cluster
mountPath: /conf
- name: jmx-config
mountPath: /conf-jmx
ports:
- containerPort: 4317 # OTLP gRPC
- containerPort: 4318 # OTLP HTTP
- containerPort: 8888 # Metrics
volumes:
- name: vol-kafka-test-cluster
configMap:
name: otel-collector-config
items:
- key: otel-collector-config.yaml
path: otel-agent-config.yaml
- name: jmx-config
configMap:
name: jmx-kafka-config
items:
- key: jmx-kafka-config.yaml
path: jmx-kafka-config.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: otel-collector
namespace: kafka
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: otel-collector
rules:
- apiGroups: [""]
resources: ["pods", "nodes"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: otel-collector
subjects:
- kind: ServiceAccount
name: otel-collector
namespace: kafka
roleRef:
kind: ClusterRole
name: otel-collector
apiGroup: rbac.authorization.k8s.io

Configuration des ressources :

  • Les limites de ressources ci-dessus conviennent aux clusters Kafka de taille moyenne (5 à 10 brokers, 20 à 100 topics)

Appliquer le déploiement :

bash
$
kubectl apply -f otel-collector-deployment.yaml

Vérifiez que le collecteur est en cours d'exécution :

bash
$
kubectl get pods -n kafka -l app=otel-collector
$
kubectl logs -n kafka -l app=otel-collector -f

Étape 4 : (Facultatif) Instrumenter les applications producteur ou consommateur

Pour collecter la télémétrie au niveau de l'application à partir des applications producteur et consommateur Kafka s'exécutant dans Kubernetes, instrumentez-les avec l'agent Java OpenTelemetry.

Instrumentez votre application Kafka

Pour instrumenter vos applications producteur ou consommateur Kafka, ajoutez l'agent Java OpenTelemetry à votre déploiement existant :

  1. Télécharger l'agent Java: ajoutez un conteneur init pour télécharger le JAR de l'agent :

    initContainers:
    - name: download-otel-agent
    image: busybox:latest
    command:
    - sh
    - -c
    - |
    wget -O /otel/opentelemetry-javaagent.jar \
    https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
    volumeMounts:
    - name: otel-agent
    mountPath: /otel
  2. Configurer l'agent Java: Ajoutez des variables d'environnement à votre conteneur d'application :

    env:
    - name: JAVA_TOOL_OPTIONS
    value: >-
    -javaagent:/otel/opentelemetry-javaagent.jar
    -Dotel.service.name="kafka-producer"
    -Dotel.resource.attributes="kafka.cluster.name=my-cluster"
    -Dotel.exporter.otlp.endpoint="http://localhost:4317"
    -Dotel.exporter.otlp.protocol="grpc"
    -Dotel.metrics.exporter="otlp"
    -Dotel.traces.exporter="otlp"
    -Dotel.logs.exporter="otlp"
    -Dotel.instrumentation.kafka.experimental-span-attributes="true"
    -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true"
    -Dotel.instrumentation.kafka.producer-propagation.enabled="true"
    -Dotel.instrumentation.kafka.enabled="true"
    volumeMounts:
    - name: otel-agent
    mountPath: /otel
  3. Ajouter le volume: Inclure la définition du volume :

    volumes:
    - name: otel-agent
    emptyDir: {}

Remplacer :

  • kafka-producer avec un nom unique pour votre application
  • my-cluster avec le nom de votre cluster Kafka

Conseil

La configuration ci-dessus envoie des données de télémétrie à un collecteur OpenTelemetry s'exécutant sur localhost:4317. Déployez votre propre collecteur avec cette configuration :

receivers:
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
exporters:
otlp/newrelic:
endpoint: https://otlp.nr-data.net:4317
headers:
api-key: "${NEW_RELIC_LICENSE_KEY}"
compression: gzip
timeout: 30s
service:
pipelines:
traces:
receivers: [otlp]
exporters: [otlp/newrelic]
metrics:
receivers: [otlp]
exporters: [otlp/newrelic]
logs:
receivers: [otlp]
exporters: [otlp/newrelic]

Cela vous permet de personnaliser le traitement, d'ajouter des filtres ou d'acheminer vers plusieurs backends. Pour d'autres configurations de point de terminaison, consultez Configurer votre point de terminaison OTLP.

L'agent Java fournit l'instrumentation Kafka prête à l'emploi sans aucune modification de code, capturant :

  • Latences des requêtes
  • Métriques de débit
  • Taux d'erreur
  • traces distribuées

Pour une configuration avancée, consultez la documentation d'instrumentation Kafka.

Étape 5 : (Facultatif) Transférer les logs du broker Kafka

Pour collecter les logs des brokers Kafka à partir de vos pods Kubernetes et les envoyer à New Relic, configurez le récepteur de logs de fichiers dans votre collecteur OpenTelemetry.

Trouvez vos données

Après quelques minutes, vos métriques Kafka devraient apparaître dans New Relic. Consultez Trouver vos données pour obtenir des instructions détaillées sur l'exploration de vos métriques Kafka dans différentes vues de l'interface utilisateur New Relic.

Vous pouvez également interroger vos données avec NRQL :

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'

Dépannage

Prochaines étapes

Droits d'auteur © 2026 New Relic Inc.

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.