Monitorez votre cluster Kafka fonctionnant sur Kubernetes avec l'opérateur Strimzi en déployant l'OpenTelemetry Collector. Le collecteur découvre automatiquement les pods de broker Kafka et collecte des métriques complètes.
Avant de commencer
Assurez-vous d'avoir :
- Un compte New Relic avec un
- Cluster Kubernetes avec accès kubectl
- Kafka déployé via l'opérateur Strimzi avec JMX activé
Activer JMX dans Strimzi Kafka
Assurez-vous que votre cluster Kafka a JMX activé dans la ressource Strimzi Kafka :
apiVersion: kafka.strimzi.io/v1beta2kind: Kafkametadata: name: my-cluster namespace: kafkaspec: kafka: jmxOptions: {} # Enables JMX with default settings # ...other broker configurationÉtape 1 : Créer un espace de noms
Créez un espace de noms dédié pour le collecteur OpenTelemetry (ou utilisez votre espace de noms Kafka existant) :
$kubectl create namespace kafkaÉtape 2 : Créer un secret avec la clé de licence
Stockez votre clé de licence New Relic en tant que secret Kubernetes :
$kubectl create secret generic nr-license-key \> --from-literal=NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY \> -n kafkaRemplacez YOUR_LICENSE_KEY par votre clé de licence New Relic réelle.
Étape 3 : Déployer le collecteur OpenTelemetry
3.1 Construire une image de collecteur personnalisée
Créez une image personnalisée du collecteur OpenTelemetry avec le runtime Java et le scraper JMX.
Important
Compatibilité des versions: Ce guide utilise JMX Scraper 1.52.0 et OpenTelemetry Collector 0.143.1. Les anciennes versions du collecteur peuvent ne pas inclure le hachage de ce scraper dans leur liste de compatibilité. Pour de meilleurs résultats, utilisez les dernières versions comme indiqué dans ce guide.
Architecture cible: Reportez-vous à la page OpenTelemetry Collector releases pour trouver le binaire correct pour l'architecture de votre système (par exemple, linux_amd64, linux_arm64, darwin_amd64). Mettez à jour la variable TARGETARCH dans le Dockerfile en conséquence.
Enregistrer sous Dockerfile:
# Multi-stage build for OpenTelemetry Collector with Java support for JMX receiver# This image bundles the OTEL Collector with Java 17 runtime and JMX scraper JAR
FROM alpine:latest as prep
# OpenTelemetry Collector BinaryARG OTEL_VERSION=0.143.1ARG TARGETARCH=linux_amd64ADD "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_${TARGETARCH}.tar.gz" /otelcontribcolRUN tar -zxvf /otelcontribcol
# JMX Scraper JAR (for JMX receiver with YAML-based configuration)ARG JMX_SCRAPER_JAR_VERSION=1.52.0ADD https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v${JMX_SCRAPER_JAR_VERSION}/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jar
# Set permissions for nonroot user (uid 65532)ARG USER_UID=65532RUN chown ${USER_UID} /opt/opentelemetry-jmx-scraper.jar
# Final minimal image with Java runtimeFROM openjdk:17-jre-slim
COPY /opt/opentelemetry-jmx-scraper.jar /opt/opentelemetry-jmx-scraper.jarCOPY /otelcol-contrib /otelcol-contrib
EXPOSE 4317 4318 8888ENTRYPOINT ["/otelcol-contrib"]CMD ["--config", "/conf/otel-agent-config.yaml"]Construire et pousser l'image :
$docker build -t your-registry/otel-collector-kafka:latest .$docker push your-registry/otel-collector-kafka:latest3.2 Créer une ConfigMap de métriques personnalisées JMX
Tout d'abord, créez un ConfigMap avec la configuration des métriques JMX personnalisées. Enregistrer sous jmx-kafka-config.yaml:
apiVersion: v1kind: ConfigMapmetadata: name: jmx-kafka-config namespace: kafkadata: jmx-kafka-config.yaml: | --- rules: # Per-topic custom metrics using custom MBean commands - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages in per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics using controller-based MBeans - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
# Broker uptime metric using JVM Runtime - bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
# Leader count per broker - bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name) CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: current heap usage type: gauge HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) - alert if > CPU count AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor) Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GC (shows retained memory baseline)Conseil
Personnaliser la collecte de métriques: Vous pouvez scraper des métriques Kafka supplémentaires en ajoutant des règles MBean personnalisées au fichier kafka-jmx-config.yaml :
Apprenez la syntaxe de base des règles de métriques JMX
Trouvez les noms MBean disponibles dans la documentation de monitoring Kafka
Cela vous permet de collecter n'importe quelle métrique JMX exposée par les brokers Kafka en fonction de vos besoins de monitoring spécifiques.
Appliquer le ConfigMap JMX :
$kubectl apply -f jmx-kafka-config.yaml3.3 Créer ConfigMap du collecteur
Créez un ConfigMap avec la configuration OpenTelemetry Collector. Enregistrer sous otel-kafka-config.yaml:
---apiVersion: v1kind: ConfigMapmetadata: name: otel-collector-config namespace: kafka labels: app: otel-collectordata: otel-collector-config.yaml: | receivers: # Kafka cluster-level metrics (runs once per OTEL collector) kafkametrics/cluster: brokers: - "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092" protocol_version: 2.8.0 scrapers: - brokers - topics - consumers collection_interval: 30s metrics: kafka.topic.min_insync_replicas: enabled: true kafka.topic.replication_factor: enabled: true kafka.partition.replicas: enabled: false kafka.partition.oldest_offset: enabled: false kafka.partition.current_offset: enabled: false
# Receiver creator for dynamic per-broker JMX receivers receiver_creator: watch_observers: [k8s_observer] receivers: # JMX receiver template (created per discovered broker pod) jmx: rule: type == "pod" && labels["strimzi.io/kind"] == "Kafka" && labels["strimzi.io/cluster"] == "my-cluster" && labels["strimzi.io/name"] == "my-cluster-kafka" config: endpoint: 'service:jmx:rmi:///jndi/rmi://`endpoint`:9999/jmxrmi' jar_path: /opt/opentelemetry-jmx-scraper.jar target_system: kafka jmx_configs: /conf-jmx/jmx-kafka-config.yaml collection_interval: 30s # Set dynamic resource attributes from discovered pod resource_attributes: broker.endpoint: '`endpoint`'
exporters: otlp: endpoint: https://otlp.nr-data.net:4317 tls: insecure: false sending_queue: num_consumers: 12 queue_size: 5000 retry_on_failure: enabled: true headers: api-key: ${NEW_RELIC_LICENSE_KEY}
processors: # Batch processor for efficiency batch/aggregation: send_batch_size: 1024 timeout: 30s
# Memory limiter to prevent OOM memory_limiter: limit_percentage: 80 spike_limit_percentage: 30 check_interval: 1s
# Detect system resources resourcedetection: detectors: [env, docker, system] timeout: 5s override: false
# Add Kafka cluster metadata resource/kafka_metadata: attributes: - key: kafka.cluster.name value: my-cluster action: upsert
# Extract Kubernetes attributes k8sattributes: auth_type: serviceAccount passthrough: false extract: metadata: - k8s.pod.name - k8s.pod.uid - k8s.namespace.name - k8s.node.name labels: - tag_name: strimzi.cluster key: strimzi.io/cluster from: pod - tag_name: strimzi.kind key: strimzi.io/kind from: pod
# Transform metrics for New Relic UI transform: metric_statements: - context: metric statements: # Clean up descriptions and units - set(description, "") where description != "" - set(unit, "") where unit != ""
- context: resource statements: # Extract broker.id from k8s.pod.name: my-cluster-kafka-0 -> 0 (supports multi-digit) - set(attributes["broker.id"], ExtractPatterns(attributes["k8s.pod.name"], ".*-(?P<broker_id>\\d+)$")["broker_id"]) where attributes["k8s.pod.name"] != nil
# Remove broker.id for cluster-level metrics transform/remove_broker_id: metric_statements: - context: resource statements: - delete_key(attributes, "broker.id") - delete_key(attributes, "broker.endpoint") - delete_key(attributes, "k8s.pod.name")
# Topic sum aggregation for replicas_in_sync metricstransform/kafka_topic_sum_aggregation: transforms: - include: kafka.partition.replicas_in_sync action: insert new_name: kafka.partition.replicas_in_sync.total operations: - action: aggregate_labels label_set: [ topic ] aggregation_type: sum
# Filter to include only cluster-level metrics filter/include_cluster_metrics: metrics: include: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
# Filter to exclude cluster-level metrics from broker pipeline filter/exclude_cluster_metrics: metrics: exclude: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
# Convert cumulative metrics to delta for New Relic cumulativetodelta:
extensions: # K8s observer extension k8s_observer: auth_type: serviceAccount observe_pods: true observe_nodes: false
service: extensions: [k8s_observer]
pipelines: # Per-broker metrics pipeline (with broker.id) metrics/broker: receivers: - receiver_creator - kafkametrics/cluster processors: - memory_limiter - resourcedetection - resource/kafka_metadata - k8sattributes - filter/exclude_cluster_metrics - transform - metricstransform/kafka_topic_sum_aggregation - cumulativetodelta - batch/aggregation exporters: [otlp]
# Cluster-level metrics pipeline (without broker.id, aggregated) metrics/cluster: receivers: - receiver_creator processors: - memory_limiter - resourcedetection - resource/kafka_metadata - k8sattributes - filter/include_cluster_metrics - transform/remove_broker_id - metricstransform/kafka_topic_sum_aggregation - cumulativetodelta - batch/aggregation exporters: [otlp]Notes de configuration :
- Remplacez
my-cluster-kafka-bootstrappar le nom de votre service Kafka Strimzi - Remplacez
my-clusterdansruleetkafka.cluster.namepar le nom de votre cluster - Mettez à jour l'espace de noms s'il est différent de
kafka - Point de terminaison OTLP: utilise
https://otlp.nr-data.net:4317(région US) ouhttps://otlp.eu01.nr-data.net:4317(région UE). Consultez Configurez votre point de terminaison OTLP pour d'autres régions - Le
receiver_creatordécouvre automatiquement les pods de broker Kafka à l'aide des étiquettes Strimzi
Appliquer la ConfigMap :
$kubectl apply -f otel-kafka-config.yaml3.4 Déployer le collecteur
Créez le déploiement. Enregistrer sous otel-collector-deployment.yaml:
apiVersion: apps/v1kind: Deploymentmetadata: name: otel-collector namespace: kafkaspec: replicas: 1 selector: matchLabels: app: otel-collector template: metadata: labels: app: otel-collector spec: serviceAccountName: otel-collector containers: - name: otel-collector image: your-registry/otel-collector-kafka:latest env: - name: NEW_RELIC_LICENSE_KEY valueFrom: secretKeyRef: name: nr-license-key key: NEW_RELIC_LICENSE_KEY resources: limits: cpu: "1" memory: "2Gi" requests: cpu: "500m" memory: "1Gi" volumeMounts: - name: vol-kafka-test-cluster mountPath: /conf - name: jmx-config mountPath: /conf-jmx ports: - containerPort: 4317 # OTLP gRPC - containerPort: 4318 # OTLP HTTP - containerPort: 8888 # Metrics volumes: - name: vol-kafka-test-cluster configMap: name: otel-collector-config items: - key: otel-collector-config.yaml path: otel-agent-config.yaml - name: jmx-config configMap: name: jmx-kafka-config items: - key: jmx-kafka-config.yaml path: jmx-kafka-config.yaml---apiVersion: v1kind: ServiceAccountmetadata: name: otel-collector namespace: kafka---apiVersion: rbac.authorization.k8s.io/v1kind: ClusterRolemetadata: name: otel-collectorrules: - apiGroups: [""] resources: ["pods", "nodes"] verbs: ["get", "list", "watch"]---apiVersion: rbac.authorization.k8s.io/v1kind: ClusterRoleBindingmetadata: name: otel-collectorsubjects: - kind: ServiceAccount name: otel-collector namespace: kafkaroleRef: kind: ClusterRole name: otel-collector apiGroup: rbac.authorization.k8s.ioConfiguration des ressources :
- Les limites de ressources ci-dessus conviennent aux clusters Kafka de taille moyenne (5 à 10 brokers, 20 à 100 topics)
Appliquer le déploiement :
$kubectl apply -f otel-collector-deployment.yamlVérifiez que le collecteur est en cours d'exécution :
$kubectl get pods -n kafka -l app=otel-collector$kubectl logs -n kafka -l app=otel-collector -fÉtape 4 : (Facultatif) Instrumenter les applications producteur ou consommateur
Pour collecter la télémétrie au niveau de l'application à partir des applications producteur et consommateur Kafka s'exécutant dans Kubernetes, instrumentez-les avec l'agent Java OpenTelemetry.
Instrumentez votre application Kafka
Pour instrumenter vos applications producteur ou consommateur Kafka, ajoutez l'agent Java OpenTelemetry à votre déploiement existant :
Télécharger l'agent Java: ajoutez un conteneur init pour télécharger le JAR de l'agent :
initContainers:- name: download-otel-agentimage: busybox:latestcommand:- sh- -c- |wget -O /otel/opentelemetry-javaagent.jar \https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jarvolumeMounts:- name: otel-agentmountPath: /otelConfigurer l'agent Java: Ajoutez des variables d'environnement à votre conteneur d'application :
env:- name: JAVA_TOOL_OPTIONSvalue: >--javaagent:/otel/opentelemetry-javaagent.jar-Dotel.service.name="kafka-producer"-Dotel.resource.attributes="kafka.cluster.name=my-cluster"-Dotel.exporter.otlp.endpoint="http://localhost:4317"-Dotel.exporter.otlp.protocol="grpc"-Dotel.metrics.exporter="otlp"-Dotel.traces.exporter="otlp"-Dotel.logs.exporter="otlp"-Dotel.instrumentation.kafka.experimental-span-attributes="true"-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true"-Dotel.instrumentation.kafka.producer-propagation.enabled="true"-Dotel.instrumentation.kafka.enabled="true"volumeMounts:- name: otel-agentmountPath: /otelAjouter le volume: Inclure la définition du volume :
volumes:- name: otel-agentemptyDir: {}
Remplacer :
kafka-produceravec un nom unique pour votre applicationmy-clusteravec le nom de votre cluster Kafka
Conseil
La configuration ci-dessus envoie des données de télémétrie à un collecteur OpenTelemetry s'exécutant sur localhost:4317. Déployez votre propre collecteur avec cette configuration :
receivers: otlp: protocols: grpc: endpoint: "0.0.0.0:4317"
exporters: otlp/newrelic: endpoint: https://otlp.nr-data.net:4317 headers: api-key: "${NEW_RELIC_LICENSE_KEY}" compression: gzip timeout: 30s
service: pipelines: traces: receivers: [otlp] exporters: [otlp/newrelic] metrics: receivers: [otlp] exporters: [otlp/newrelic] logs: receivers: [otlp] exporters: [otlp/newrelic]Cela vous permet de personnaliser le traitement, d'ajouter des filtres ou d'acheminer vers plusieurs backends. Pour d'autres configurations de point de terminaison, consultez Configurer votre point de terminaison OTLP.
L'agent Java fournit l'instrumentation Kafka prête à l'emploi sans aucune modification de code, capturant :
- Latences des requêtes
- Métriques de débit
- Taux d'erreur
- traces distribuées
Pour une configuration avancée, consultez la documentation d'instrumentation Kafka.
Étape 5 : (Facultatif) Transférer les logs du broker Kafka
Pour collecter les logs des brokers Kafka à partir de vos pods Kubernetes et les envoyer à New Relic, configurez le récepteur de logs de fichiers dans votre collecteur OpenTelemetry.
Trouvez vos données
Après quelques minutes, vos métriques Kafka devraient apparaître dans New Relic. Consultez Trouver vos données pour obtenir des instructions détaillées sur l'exploration de vos métriques Kafka dans différentes vues de l'interface utilisateur New Relic.
Vous pouvez également interroger vos données avec NRQL :
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'Dépannage
Prochaines étapes
- Explorer les métriques Kafka - Afficher la référence complète des métriques
- Créer des dashboards personnalisés - Créez des visualisations pour vos données Kafka
- Configurer les alertes - Monitorer les métriques critiques telles que le retard du consommateur et les partitions sous-répliquées