• /
  • EnglishEspañolFrançais日本語한국어Português
  • Se connecterDémarrer

Cette traduction automatique est fournie pour votre commodité.

En cas d'incohérence entre la version anglaise et la version traduite, la version anglaise prévaudra. Veuillez visiter cette page pour plus d'informations.

Créer un problème

Monitorer Kafka auto-hébergé avec OpenTelemetry

Monitorez votre cluster Apache Kafka auto-hébergé en installant le collecteur OpenTelemetry directement sur les hôtes Linux.

Avant de commencer

Assurez-vous d'avoir :

  • Un compte New Relic avec un

  • OpenJDK installé sur l'hôte de monitoring

  • JMX activé sur les brokers Kafka (généralement sur le port 9999)

  • Accès réseau du collecteur aux brokers Kafka :

    • Port du serveur Bootstrap (généralement 9092)
    • Port JMX (généralement 9999)

Étape 1 : Installer OpenTelemetry Collector

Téléchargez et installez le binaire OpenTelemetry Collector Contrib pour le système d'exploitation de votre hôte à partir des versions d'OpenTelemetry Collector.

Étape 2 : Télécharger le scraper JMX

Le collecteur JMX collecte des métriques détaillées à partir des MBeans du broker Kafka :

bash
$
# Create directory in user home (no sudo needed)
$
mkdir -p ~/opentelemetry
$
curl -L -o ~/opentelemetry/opentelemetry-jmx-scraper.jar \
>
https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v1.52.0/opentelemetry-jmx-scraper.jar

Important

Compatibilité des versions: Ce guide utilise JMX Scraper 1.52.0. Les anciennes versions d'OpenTelemetry Collector peuvent ne pas inclure le hachage de ce scraper dans leur liste de compatibilité. Pour de meilleurs résultats, utilisez la dernière version d'OpenTelemetry Collector, qui inclut la prise en charge de cette version de JMX Scraper.

Étape 3 : Créer la configuration des métriques personnalisées JMX

Créez un fichier de configuration JMX personnalisé pour collecter des métriques Kafka supplémentaires non incluses dans le système cible par défaut.

Créez le fichier ~/opentelemetry/kafka-jmx-config.yaml avec la configuration suivante :

---
rules:
# Per-topic custom metrics using custom MBean commands
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages in per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics using controller-based MBeans
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
# Broker uptime metric using JVM Runtime
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
# Leader count per broker
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: current heap usage
type: gauge
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute) - alert if > CPU count
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC (shows retained memory baseline)

Conseil

Personnaliser la collecte de métriques: Vous pouvez scraper des métriques Kafka supplémentaires en ajoutant des règles MBean personnalisées au fichier kafka-jmx-config.yaml :

Étape 4 : Créer la configuration du collecteur

Créez la configuration principale du collecteur OpenTelemetry à ~/opentelemetry/config.yaml.

receivers:
# Kafka metrics receiver for cluster-level metrics
kafkametrics:
brokers:
- ${env:KAFKA_BROKER_ADDRESS}
protocol_version: 2.8.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
topic_match: ".*"
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
# JMX receiver for broker-specific metrics
jmx/kafka_broker-1:
jar_path: ${env:HOME}/opentelemetry/opentelemetry-jmx-scraper.jar
endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
target_system: kafka
collection_interval: 30s
jmx_configs: ${env:HOME}/opentelemetry/kafka-jmx-config.yaml
resource_attributes:
broker.id: "1"
broker.endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
processors:
batch/aggregation:
send_batch_size: 1024
timeout: 30s
resourcedetection:
detectors: [env, ec2, system]
system:
resource_attributes:
host.name:
enabled: true
host.id:
enabled: true
resource:
attributes:
- action: insert
key: kafka.cluster.name
value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id:
metric_statements:
- context: resource
statements:
- delete_key(attributes, "broker.id")
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
transform/des_units:
metric_statements:
- context: metric
statements:
- set(description, "") where description != ""
- set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [ topic ]
aggregation_type: sum
exporters:
otlp/newrelic:
endpoint: https://otlp.nr-data.net:4317
headers:
api-key: ${env:NEW_RELIC_LICENSE_KEY}
compression: gzip
timeout: 30s
service:
pipelines:
metrics/brokers-cluster-topics:
receivers: [jmx/kafka_broker-1, kafkametrics]
processors: [resourcedetection, resource, filter/exclude_cluster_metrics, transform/des_units, cumulativetodelta, metricstransform/kafka_topic_sum_aggregation, batch/aggregation]
exporters: [otlp/newrelic]
metrics/jmx-cluster:
receivers: [jmx/kafka_broker-1]
processors: [resourcedetection, resource, filter/include_cluster_metrics, transform/remove_broker_id, transform/des_units, cumulativetodelta, batch/aggregation]
exporters: [otlp/newrelic]

Notes de configuration :

Important

Pour plusieurs brokers, ajoutez des récepteurs JMX supplémentaires avec différents points de terminaison et ID de broker pour monitorer chaque broker de votre cluster.

Étape 5 : Définir les variables d'environnement

Définissez les variables d'environnement requises :

bash
$
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
$
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
$
export KAFKA_BROKER_ADDRESS="localhost:9092"
$
export KAFKA_BROKER_JMX_ADDRESS="localhost:9999"

Remplacer :

  • YOUR_LICENSE_KEY avec votre clé de licence New Relic
  • my-kafka-cluster avec un nom unique pour votre cluster Kafka
  • localhost:9092 avec l'adresse de votre serveur d'amorçage Kafka
  • localhost:9999 avec votre point de terminaison JMX du broker Kafka

Étape 6 : Démarrer le collecteur

Exécutez le collecteur directement (pas besoin de sudo) :

bash
$
# Start the collector with your config
$
otelcol-contrib --config ~/opentelemetry/config.yaml

Le collecteur commencera à envoyer des métriques Kafka à New Relic en quelques minutes.

Créez un service systemd pour une exécution persistante (nécessite sudo pour une configuration unique) :

bash
$
# Create systemd service file
$
sudo tee /etc/systemd/system/otelcol-contrib.service > /dev/null <<EOF
$
[Unit]
$
Description=OpenTelemetry Collector for Kafka
$
After=network.target
$
$
[Service]
$
Type=simple
$
User=$USER
$
WorkingDirectory=$HOME/opentelemetry
$
ExecStart=/usr/local/bin/otelcol-contrib --config $HOME/opentelemetry/config.yaml
$
Restart=on-failure
$
Environment="NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY"
$
Environment="KAFKA_CLUSTER_NAME=my-kafka-cluster"
$
Environment="KAFKA_BROKER_ADDRESS=localhost:9092"
$
Environment="KAFKA_BROKER_JMX_ADDRESS=localhost:9999"
$
$
[Install]
$
WantedBy=multi-user.target
$
EOF

Remplacez YOUR_LICENSE_KEY et les autres valeurs, puis activez et démarrez le service :

bash
$
sudo systemctl daemon-reload
$
sudo systemctl enable otelcol-contrib
$
sudo systemctl start otelcol-contrib
$
sudo systemctl status otelcol-contrib

Étape 7 : (Facultatif) Instrumenter les applications producteur ou consommateur

Pour collecter la télémétrie au niveau de l'application à partir de vos applications producteur et consommateur Kafka, utilisez l'Agent Java OpenTelemetry:

  1. Téléchargez l'agent Java :

    bash
    $
    mkdir -p ~/otel-java
    $
    curl -L -o ~/otel-java/opentelemetry-javaagent.jar \
    >
    https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
  2. Démarrez votre application avec l'agent :

    bash
    $
    java \
    >
    -javaagent:~/otel-java/opentelemetry-javaagent.jar \
    >
    -Dotel.service.name="kafka-producer-1" \
    >
    -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
    >
    -Dotel.exporter.otlp.endpoint=https://otlp.nr-data.net:4317 \
    >
    -Dotel.exporter.otlp.protocol="grpc" \
    >
    -Dotel.metrics.exporter="otlp" \
    >
    -Dotel.traces.exporter="otlp" \
    >
    -Dotel.logs.exporter="otlp" \
    >
    -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
    >
    -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
    >
    -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
    >
    -Dotel.instrumentation.kafka.enabled="true" \
    >
    -jar your-kafka-application.jar

Remplacer :

  • kafka-producer-1 avec un nom unique pour votre application producteur ou consommateur
  • my-kafka-cluster avec le même nom de cluster utilisé dans votre configuration de collecteur
  • https://otlp.nr-data.net:4317 avec votre point de terminaison OTLP New Relic (utilisez https://otlp.eu01.nr-data.net:4317 pour la région UE). Pour les autres points de terminaison et options de configuration, consultez Configurez votre point de terminaison OTLP.

L'agent Java fournit l'instrumentation Kafka prête à l'emploi sans aucune modification de code, capturant :

  • Latences des requêtes
  • Métriques de débit
  • Taux d'erreur
  • traces distribuées

Pour une configuration avancée, consultez la documentation d'instrumentation Kafka.

Étape 6 : (Facultatif) Transférer les logs du broker Kafka

Pour collecter les logs du broker Kafka à partir de vos hôtes et les envoyer à New Relic, configurez le récepteur de logs de fichiers dans votre OpenTelemetry Collector.

Trouvez vos données

Après quelques minutes, vos métriques Kafka devraient apparaître dans New Relic. Consultez Trouver vos données pour obtenir des instructions détaillées sur l'exploration de vos métriques Kafka dans différentes vues de l'interface utilisateur New Relic.

Vous pouvez également interroger vos données avec NRQL :

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'

Dépannage

Prochaines étapes

Droits d'auteur © 2026 New Relic Inc.

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.