Kafka monitoring integration

New Relic Infrastructure’s Kafka integration reports data from Kafka to New Relic Infrastructure. This document explains how to install and configure the Kafka integration and describes the data it collects. To monitor Kafka with New Relic APM's Java agent, see Instrument Kafka message queues.

This integration is released as Open Source under the MIT license on GitHub. A change log is also available there for the latest updates.

Access to this feature depends on your subscription level. Requires Infrastructure Pro.

Features

Apache Kafka is a distributed streaming platform designed for high-volume publish-subscribe messaging and streaming. The New Relic Kafka on-host integration reports metrics and configuration data from your Kafka service, providing insight into brokers, producers, consumers, and topics.

Compatibility and requirements

To use the Kafka integration, ensure your system meets these requirements:

  • New Relic Infrastructure installed on a host
  • Linux distribution compatible with New Relic Infrastructure
  • Kafka version 0.8 or higher
  • Java 8 or higher
  • JMX enabled on all brokers, Java consumers, and Java producers that you want monitored
  • Total number of monitored topics must be fewer than 300

Install

On-host integrations do not automatically update. For best results, periodically update the integration package and the Infrastructure agent.

To install the Kafka integration:

  1. Follow the instructions for installing an integration, using the file name nri-kafka.
  2. Via the command line, change directory to the integrations configuration folder:

    cd /etc/newrelic-infra/integrations.d
    
  3. Create a copy of the sample configuration file by running:

    sudo cp kafka-config.yml.sample kafka-config.yml
    
  4. Edit the configuration file kafka-config.yml using the configuration settings described below.

  5. Restart the Infrastructure agent.

It is also possible to manually install integrations from a tarball file. For more information, see Install manually from a tarball archive.

Configure

There are several ways to configure the Infrastructure agent to monitor a Kafka environment: you can monitor the entire environment remotely from one node, or run the integration on individual nodes in that environment.

Use the configuration file (kafka-config.yml) to store required login credentials and configure how data is collected.

Commands

The kafka-config.yml file provides three commands:

  • inventory: collects configuration status
  • metrics: collects performance metrics
  • consumer_offset: collects consumer group offset data
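
Each command runs as its own instance in kafka-config.yml. As a minimal sketch of how the three commands fit together (the ZooKeeper host and the consumer group pattern here are illustrative, and the arguments are described in the next section):

integration_name: com.newrelic.kafka

instances:
  # Performance metrics from brokers, producers, and consumers
  - name: kafka-metrics
    command: metrics
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'

  # Configuration status
  - name: kafka-inventory
    command: inventory
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'

  # Consumer group offset data; the regex below matches every group
  - name: kafka-consumer-offsets
    command: consumer_offset
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      consumer_group_regex: '.*'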

Arguments

The configuration file accepts the following arguments. For examples of some typical configurations, see the example configurations.

  • cluster_name: A user-defined name to uniquely identify the cluster being monitored. Required.
  • zookeeper_hosts: The list of Apache ZooKeeper hosts (in JSON format) to connect to.
  • zookeeper_auth_scheme: The ZooKeeper authentication scheme used to connect. Currently, the only supported value is digest. If omitted, no authentication is used. For how the authentication arguments fit together, see the sketch after this list.
  • zookeeper_auth_secret: The ZooKeeper authentication secret used to connect. Should be of the form username:password. Only required if zookeeper_auth_scheme is specified.
  • zookeeper_path: The ZooKeeper node under which the Kafka configuration resides. Defaults to /.

  • default_jmx_host: The default host from which to collect JMX metrics. If the host field is omitted from a producer or consumer configuration, this value will be used.
  • default_jmx_port: The default port from which to collect JMX metrics. If the port field is omitted from a producer or consumer configuration, this value will be used.
  • default_jmx_user: The default user for connecting to the JMX host to collect metrics. This field should only be used if all brokers have a non-default username. If the username field is omitted from a producer or consumer configuration, this value will be used.
  • default_jmx_password: The default password for connecting to the JMX host. This field should only be used if all brokers have a non-default password. If the password field is omitted from a producer or consumer configuration, this value will be used.
  • collect_broker_topic_data: Whether broker and topic metrics are collected. Options are true or false; defaults to true. Should only be set to false when monitoring only producers and consumers, and topic_mode is set to all.
  • producers: Producers to collect. For each producer, a name, hostname, port, username, and password can be provided in JSON form. name is the producer’s name as it appears in Kafka. hostname, port, username, and password are optional and use the defaults if unspecified.
  • consumers: Consumers to collect. For each consumer, a name, hostname, port, username, and password can be specified in JSON form. name is the consumer’s name as it appears in Kafka. hostname, port, username, and password are optional and use the defaults if unspecified.
  • consumer_group_regex: A regex pattern that matches the consumer groups to collect offset statistics for. This is limited to collecting statistics for 300 consumer groups. Note: consumer_groups has been deprecated, use this argument instead.
  • consumer_groups: Deprecated - use consumer_group_regex instead. An allow list of the Consumer Groups (in JSON format) in which to collect offset data for.
  • topic_mode: Determines which topics are collected. Options are all, none, list, or regex.
  • collect_topic_size: Whether to collect the topic size metric. Options are true or false; defaults to false. Topic size is a resource-intensive metric to collect.
  • topic_list: Array of topic names to monitor. Only in effect if topic_mode is set to list.
  • topic_regex: A regex pattern that matches the topic names to monitor. Only in effect if topic_mode is set to regex.
  • key_store: The filepath of the keystore containing the JMX client's SSL certificate.
  • key_store_password: The password for the SSL key store.
  • trust_store: The filepath of the trust keystore containing the JMX server's SSL certificate.
  • trust_store_password: The password for the trust store.
  • timeout: The timeout for individual JMX queries in milliseconds. Default: 10000.
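
As a sketch of how the ZooKeeper authentication, SSL, and timeout arguments combine (the cluster name, secret, keystore paths, and passwords below are illustrative placeholders, not defaults):

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      cluster_name: my-cluster
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      # digest is currently the only supported authentication scheme
      zookeeper_auth_scheme: digest
      zookeeper_auth_secret: 'zk_user:zk_password'
      # SSL settings for the JMX connections
      key_store: '/path/to/client.keystore'
      key_store_password: keystore_password
      trust_store: '/path/to/client.truststore'
      trust_store_password: truststore_password
      # Individual JMX queries time out after 10 seconds (the default)
      timeout: 10000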

Labels

Labels are optional tags that help identify collected data in Insights. Some examples are included below.

  • env: Label to identify the environment. For example: production.
  • role: Label to identify which role is accessing the data.

For more details on configuration parameters, see the kafka-config.yml.sample configuration file on GitHub.

Example: Single agent deployment

Let's consider an environment with the following structure. For this environment, assume the Infrastructure agent is installed on the ZooKeeper node.

  • Brokers
  • Single ZooKeeper node
  • Single producer:

    • Name: my-producer
    • Host: my-producer.my.localnet
    • JMX Port: 9989
  • Single consumer:

    • Name: my-consumer
    • Host: my-consumer.my.localnet
    • JMX Port: 9987

Example kafka-config.yml file configuration for this environment:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      producers: '[{"name": "my-producer", "host": "my-producer.my.localnet", "port": 9989}]'
      consumers: '[{"name": "my-consumer", "host": "my-consumer.my.localnet", "port": 9987}]'
      topic_mode: list
      collect_topic_size: false
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

  - name: kafka-inventory
    command: inventory
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      topic_mode: regex
      topic_regex: 'topic_[0-9]+'
    labels:
      env: production
      role: kafka

Example: Multiple agent deployment

Let's consider an environment with the following structure. For this environment, assume the Infrastructure agent is installed on the ZooKeeper node, the producer node, and the consumer node.

  • Brokers
  • Single ZooKeeper node
  • Single producer:

    • Name: my-producer
    • Host: my-producer.my.localnet
    • JMX Port: 9989
  • Single consumer:

    • Name: my-consumer
    • Host: my-consumer.my.localnet
    • JMX Port: 9987

Example kafka-config.yml configuration for this environment:

ZooKeeper node configuration:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      topic_mode: list
      collect_topic_size: false
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

  - name: kafka-inventory
    command: inventory
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      topic_mode: list
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

Producer node configuration:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      producers: '[{"name": "my-producer", "host": "my-producer.my.localnet", "port": 9989}]'
      topic_mode: list
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

Consumer node configuration:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      consumers: '[{"name": "my-consumer", "host": "my-consumer.my.localnet", "port": 9987}]'
      topic_mode: list
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

Example: Offset collection

Let's consider an environment with the following structure. For this environment, assume the Infrastructure agent is installed on the ZooKeeper node.

Due to the load that collecting offset data can put on the Kafka environment, offsets are collected independently of normal metric and inventory data collection. We recommend configuring offset collection on only one node.

  • Brokers
  • Single ZooKeeper node
  • Consumers
  • Consumer Groups
    • consumer_group_1
    • consumer_group_2
  • Topics:

    • topic_1 (5 partitions)

      • consumer_group_1 subscribed
      • consumer_group_2 subscribed
    • topic_2 (3 partitions)

      • consumer_group_2 subscribed

For this example environment, let's say you want to monitor consumer_group_1 and consumer_group_2 offsets.

  • For consumer_group_1, you want to monitor all partitions of topic_1.
  • For consumer_group_2, you want to monitor only the first two partitions of topic_1 and all partitions of topic_2.

Example kafka-config.yml file configuration for this environment:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-consumer-offsets
    command: consumer_offset
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      consumer_groups: '{"consumer_group_1": {"topic_1": []}, "consumer_group_2": {"topic_1": [1,2], "topic_2": []}}'
    labels:
      env: production
      role: kafka

If the list of partitions for a topic is empty (for example: "consumer_group": {"topic": []}), then offsets for all partitions of that topic will be collected for the consumer group.
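
Because consumer_groups is deprecated, the same groups can instead be matched with consumer_group_regex. A sketch of the equivalent selection (the pattern is illustrative; note that a regex selects whole consumer groups, so the per-partition filtering shown above cannot be expressed this way):

integration_name: com.newrelic.kafka

instances:
  - name: kafka-consumer-offsets
    command: consumer_offset
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      # Matches consumer_group_1 and consumer_group_2
      consumer_group_regex: 'consumer_group_[12]'
    labels:
      env: production
      role: kafka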

Find and use data

To find your integration data in Infrastructure, go to infrastructure.newrelic.com > Third-party services and look for the Kafka integration.

In New Relic Insights, Kafka data is attached to these event types:

  • KafkaBrokerSample
  • KafkaConsumerSample
  • KafkaProducerSample
  • KafkaTopicSample
  • KafkaOffsetSample

For more on how to find and use your data, see Understand integration data.

Metrics

The Kafka integration collects the following metric data attributes. Each metric name is prefixed with a category indicator and a period, such as broker. or consumer..

KafkaBrokerSample event

  • broker.bytesWrittenToTopicPerSecond: Number of bytes written to a topic by the broker per second.
  • broker.IOInPerSecond: Network IO into brokers in the cluster in bytes per second.
  • broker.IOOutPerSecond: Network IO out of brokers in the cluster in bytes per second.
  • broker.logFlushPerSecond: Log flush rate.
  • broker.messagesInPerSecond: Incoming messages per second.
  • follower.requestExpirationPerSecond: Rate of request expiration on followers in evictions per second.
  • net.bytesRejectedPerSecond: Rejected bytes per second.
  • replication.isrExpandsPerSecond: Rate of replicas joining the ISR pool.
  • replication.isrShrinksPerSecond: Rate of replicas leaving the ISR pool.
  • replication.leaderElectionPerSecond: Leader election rate.
  • replication.uncleanLeaderElectionPerSecond: Unclean leader election rate.
  • replication.unreplicatedPartitions: Number of unreplicated partitions.
  • request.avgTimeFetch: Average time per fetch request in milliseconds.
  • request.avgTimeMetadata: Average time for a metadata request in milliseconds.
  • request.avgTimeMetadata99Percentile: 99th percentile time for metadata requests in milliseconds.
  • request.avgTimeOffset: Average time for an offset request in milliseconds.
  • request.avgTimeOffset99Percentile: 99th percentile time for offset requests in milliseconds.
  • request.avgTimeProduceRequest: Average time for a produce request in milliseconds.
  • request.avgTimeUpdateMetadata: Average time for a request to update metadata in milliseconds.
  • request.avgTimeUpdateMetadata99Percentile: 99th percentile time for update metadata requests in milliseconds.
  • request.clientFetchesFailedPerSecond: Client fetch request failures per second.
  • request.fetchTime99Percentile: 99th percentile time for fetch requests in milliseconds.
  • request.handlerIdle: Average fraction of time the request handler threads are idle.
  • request.produceRequestsFailedPerSecond: Failed produce requests per second.
  • request.produceTime99Percentile: 99th percentile time for produce requests.

KafkaConsumerSample event

  • consumer.avgFetchSizeInBytes: Average number of bytes fetched per request for a specific topic.
  • consumer.avgRecordConsumedPerTopic: Average number of records in each request for a specific topic.
  • consumer.avgRecordConsumedPerTopicPerSecond: Average number of records consumed for a specific topic, in records per second.
  • consumer.bytesInPerSecond: Consumer bytes per second.
  • consumer.fetchPerSecond: The minimum rate at which the consumer sends fetch requests to a broker, in requests per second.
  • consumer.maxFetchSizeInBytes: Maximum number of bytes fetched per request for a specific topic.
  • consumer.maxLag: Maximum consumer lag.
  • consumer.messageConsumptionPerSecond: Rate of consumer message consumption in messages per second.
  • consumer.offsetKafkaCommitsPerSecond: Rate of offset commits to Kafka in commits per second.
  • consumer.offsetZooKeeperCommitsPerSecond: Rate of offset commits to ZooKeeper in writes per second.
  • consumer.requestsExpiredPerSecond: Rate of delayed consumer request expiration in evictions per second.

KafkaProducerSample event

  • producer.ageMetadataUsedInMilliseconds: Age in seconds of the current producer metadata being used.
  • producer.availableBufferInBytes: Total amount of buffer memory that is not being used, in bytes.
  • producer.avgBytesSentPerRequestInBytes: Average number of bytes sent per partition per request.
  • producer.avgCompressionRateRecordBatches: Average compression rate of record batches.
  • producer.avgRecordAccumulatorsInMilliseconds: Average time in milliseconds that record batches spent in the record accumulator.
  • producer.avgRecordSizeInBytes: Average record size in bytes.
  • producer.avgRecordsSentPerSecond: Average number of records sent per second.
  • producer.avgRecordsSentPerTopicPerSecond: Average number of records sent per second for a topic.
  • producer.AvgRequestLatencyPerSecond: Producer average request latency.
  • producer.avgThrottleTime: Average time that a request was throttled by a broker, in milliseconds.
  • producer.bufferMemoryAvailableInBytes: Maximum amount of buffer memory the client can use, in bytes.
  • producer.bufferpoolWaitTime: Fraction of time an appender waits for space allocation.
  • producer.bytesOutPerSecond: Producer bytes out per second.
  • producer.compressionRateRecordBatches: Average compression rate of record batches for a topic.
  • producer.iOWaitTime: Producer I/O wait time in milliseconds.
  • producer.maxBytesSentPerRequestInBytes: Maximum number of bytes sent per partition per request.
  • producer.maxRecordSizeInBytes: Maximum record size in bytes.
  • producer.maxRequestLatencyInMilliseconds: Maximum request latency in milliseconds.
  • producer.maxThrottleTime: Maximum time a request was throttled by a broker, in milliseconds.
  • producer.messageRatePerSecond: Producer messages per second.
  • producer.responsePerSecond: Number of producer responses per second.
  • producer.requestPerSecond: Number of producer requests per second.
  • producer.requestsWaitingResponse: Current number of in-flight requests awaiting a response.
  • producer.threadsWaiting: Number of user threads blocked waiting for buffer memory to enqueue their records.

KafkaTopicSample event

  • topic.diskSize: Current topic disk size per broker in bytes.
  • topic.partitionsWithNonPreferredLeader: Number of partitions per topic that are not being led by their preferred replica.
  • topic.respondMetaData: Number of topics responding to metadata requests.
  • topic.retentionSizeOrTime: Whether a partition is retained by size, or by both size and time. A value of 0 = time and a value of 1 = both size and time.
  • topic.underReplicatedPartitions: Number of partitions per topic that are under-replicated.

KafkaOffsetSample event

  • consumer.offset: The last consumed offset on a partition by the consumer group.
  • consumer.lag: The difference between a broker's high water mark and the consumer's offset (consumer.hwm - consumer.offset).
  • consumer.hwm: The offset of the last message written to a partition (high water mark).
  • consumer.totalLag: The sum of lags across partitions consumed by a consumer.
  • consumerGroup.totalLag: The sum of lags across all partitions consumed by a consumer group.
  • consumerGroup.maxLag: The maximum lag across all partitions consumed by a consumer group.

Inventory data

The Kafka integration captures the non-default broker and topic configuration parameters, and collects the topic partition schemes as reported by ZooKeeper. The data is available on the Infrastructure Inventory UI page under the config/kafka source.

Troubleshooting

Troubleshooting tips:

Duplicate data being reported

For agents monitoring producers and/or consumers that have topic_mode set to all, duplicate data may be reported. To stop the duplicate data, ensure that collect_broker_topic_data is set to false on those agents.
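
As a minimal sketch, the metrics instance of such a producer-only agent might look like this (the producer details are illustrative):

  - name: kafka-metrics
    command: metrics
    arguments:
      producers: '[{"name": "my-producer", "host": "my-producer.my.localnet", "port": 9989}]'
      topic_mode: all
      # Avoid reporting broker and topic data that another agent already collects
      collect_broker_topic_data: false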

Integration is logging errors 'zk: node not found'

Ensure that zookeeper_path is set correctly in the configuration file.
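
For example, if your Kafka configuration lives under a chroot-style node rather than the ZooKeeper root, point the integration at that node (a minimal sketch; /kafka-root is an illustrative path):

  - name: kafka-metrics
    command: metrics
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      # Must match the node under which the Kafka configuration resides
      zookeeper_path: '/kafka-root'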

For more help

Recommendations for learning more: