Instrument Kafka message queues

The New Relic Java agent automatically collects data from the Kafka Java clients library. Because Kafka is a high-performance messaging system that generates a large amount of data, you can customize the agent for your app's specific throughput and use cases.

This document explains how to collect and view three types of Kafka data:

  • Kafka metrics
  • Kafka events
  • Kafka distributed traces

Kafka instrumentation is available in Java agent versions 4.12.0 or higher. For supported Kafka client versions, see Java compatibility and requirements.

For the New Relic Infrastructure Kafka integration, see Kafka monitoring integration.

View Kafka metrics

After installation, the agent automatically reports rich Kafka metrics with information about messaging rates, latency, lag, and more. The Java agent collects all Kafka consumer and producer metrics (but not Kafka Connect or Kafka Streams metrics).

To view these metrics, create a custom dashboard:

  1. Go to the New Relic metric explorer.
  2. Use the metric explorer to locate your metrics. You can find Kafka metrics in this metric folder:

    MessageBroker/Kafka/Internal/KafkaMetricName

    For example, the request-rate metric is located at:

    MessageBroker/Kafka/Internal/consumer-metrics/request-rate

    For a full list of Kafka consumer and producer metrics, see the Kafka documentation.

  3. Add the metrics you want to monitor to a dashboard by clicking Add to dashboard.
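
Alternatively, you can query metric timeslice data directly with NRQL instead of browsing the metric explorer. A minimal sketch, assuming an app named MyJavaApp (substitute your own app name and metric path):

    SELECT average(newrelic.timeslice.value) FROM Metric WHERE appName = 'MyJavaApp' AND metricTimesliceName = 'MessageBroker/Kafka/Internal/consumer-metrics/request-rate' TIMESERIES SINCE 30 minutes ago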

Enable Kafka event collection

You can configure the agent to collect event data instead of metric timeslice data (for the difference between metric timeslice and event data, see data collection). This allows you to use NRQL to filter and facet the default Kafka metrics. When enabled, the agent collects one Kafka event every 30 seconds. This event contains all of the data from Kafka consumer and producer metrics captured since the previous event.

The agent records up to 2000 events per harvest cycle, though you can change this value with max_samples_stored. Kafka event data is included in this pool. If you use the recordCustomEvent() API call to send custom events to New Relic and you send more than 2000 events, the agent will discard some Kafka or custom events.
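
For example, to raise the event limit, you can set max_samples_stored in your newrelic.yml config file. A minimal sketch, assuming the custom_insights_events stanza; the value 5000 is illustrative:

    custom_insights_events:
      max_samples_stored: 5000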

To enable Kafka event collection:

  1. Add the kafka.metrics.as_events.enabled element to your newrelic.yml config file:

    kafka.metrics.as_events.enabled: true
  2. Restart your JVM.
  3. Use the event explorer to view your Kafka events, located in the KafkaMetrics event type. Or, use NRQL to query your events directly. For example:

    SELECT average(`producer-metrics.record-send-rate`) FROM KafkaMetrics SINCE 30 minutes ago TIMESERIES
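
    Because these are ordinary events, other NRQL clauses work as well. For example, a sketch that compares send rates across apps, assuming the standard appName attribute is present on your events:

    SELECT average(`producer-metrics.record-send-rate`) FROM KafkaMetrics FACET appName SINCE 1 hour ago TIMESERIES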

Enable Kafka distributed traces

The Java agent can also collect distributed traces from Kafka clients. To collect distributed traces, you'll first need to enable the feature. Then, call the Java agent API to instrument transactions on both the producer and consumer side. The agent will still report metric or event data from Kafka with distributed tracing enabled.

Distributed tracing is only available for Kafka clients 0.11.0.0 or higher.

Test Kafka distributed traces in a dev environment before you enable them in production. The instrumentation adds a 150 to 200 byte payload to the headers of each message. If your Kafka messages are very small, distributed traces can add significant processing and storage overhead. This additional payload could cause Kafka to drop messages that exceed your Kafka message size limit.

If you have not enabled distributed tracing for your app before, read the distributed tracing transition guide before you enable it.

To collect distributed traces from Kafka:

1. Enable distributed tracing in the config file

To enable Kafka distributed tracing, enable two settings in your newrelic.yml config file:

  • Set the distributed_tracing element to true:

    distributed_tracing:
      enabled: true
    
  • Enable the Kafka-specific distributed tracing features by adding the following to your config file:

    class_transformer:
      kafka-clients-spans:
        enabled: true
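
Taken together, both settings nest under the common stanza of a standard newrelic.yml. A minimal sketch:

    common: &default_settings
      distributed_tracing:
        enabled: true
      class_transformer:
        kafka-clients-spans:
          enabled: true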
2. Instrument the Kafka producer

To instrument your Kafka producer, you'll need to start a transaction before any calls to Producer.send(ProducerRecord<K, V> record). To do this, add the Java agent @Trace(dispatcher = true) annotation to the method that makes the call.

For example:

import com.newrelic.api.agent.Trace;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Starting the transaction here lets the agent attach the trace header to the outgoing record.
@Trace(dispatcher = true)
public static void createAndSend(KafkaProducer<String, String> producer) {
    ProducerRecord<String, String> data = new ProducerRecord<>("topic", "key", "value");
    producer.send(data);
}
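
For context, here's a hypothetical caller that builds a producer and invokes the instrumented method above. The broker address is an illustrative assumption:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // try-with-resources closes the producer and flushes any pending sends
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            createAndSend(producer); // the @Trace-annotated method from the example above
        }
    }
}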
3. Instrument the Kafka consumer

To instrument your Kafka consumer, you'll need to start a transaction when the message is being processed. The agent stores the distributed tracing payload header under the newrelic key. Retrieve the header, then call the New Relic transaction API to accept the payload.

For example:

import java.nio.charset.StandardCharsets;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

@Trace(dispatcher = true)
private static void processMessage(ConsumerRecord<String, String> rec) {
    // The agent stores the trace payload in the "newrelic" message header.
    Iterable<Header> headers = rec.headers().headers("newrelic");
    for (Header header : headers) {
        NewRelic.getAgent().getTransaction().acceptDistributedTracePayload(new String(header.value(), StandardCharsets.UTF_8));
    }
}
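
To show where processMessage fits, here's a hypothetical poll loop that drives it. The broker address, group ID, and topic are illustrative assumptions:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker address
        props.put("group.id", "example-group");           // illustrative consumer group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic")); // same illustrative topic as the producer example
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> rec : records) {
                    processMessage(rec); // the @Trace-annotated method from the example above
                }
            }
        }
    }
}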
