Kafka monitoring integration

New Relic Infrastructure’s Kafka integration reports data from your Kafka service to New Relic Infrastructure. This document explains how to install and configure the Kafka integration, and describes the data it collects.

Access to this feature depends on your subscription level. Requires Infrastructure Pro.

Features

Apache Kafka is a distributed streaming platform designed for high-volume publish/subscribe messaging and streams. The New Relic Kafka on-host integration reports metrics and configuration data from your Kafka service, including important metrics that provide insight into brokers, producers, consumers, and topics.

Compatibility and requirements

To use the Kafka integration, ensure your system meets these requirements:

  • New Relic Infrastructure installed on host
  • Linux distribution compatible with New Relic Infrastructure
  • Kafka version 0.8 or higher
  • Java 8 or above
  • JMX enabled on all brokers, Java consumers, and Java producers that you want monitored (see the example after this list)
  • Total number of monitored topics must be fewer than 300
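
One common way to enable remote JMX on a broker is to set JMX_PORT before starting it with Kafka's stock scripts. This is a minimal sketch; the port is a placeholder, and the unauthenticated defaults applied by Kafka's start scripts should be hardened for production:

    # Minimal sketch: Kafka's stock start scripts read JMX_PORT and enable
    # remote JMX on that port (unauthenticated, no SSL, by default).
    # The port is a placeholder; secure JMX appropriately in production.
    export JMX_PORT=9999
    bin/kafka-server-start.sh config/server.properties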

Install

On-host integrations do not automatically update. For best results, periodically update the integration package and the Infrastructure agent.

To install the Kafka integration:

  1. Follow the instructions for installing an integration, using the file name nri-kafka.
  2. Via the command line, change directory to the integrations configuration folder:

    cd /etc/newrelic-infra/integrations.d
    
  3. Create a copy of the sample configuration file by running:

    sudo cp kafka-config.yml.sample kafka-config.yml
    
  4. Edit the configuration file kafka-config.yml using the configuration settings described below.

  5. Restart the Infrastructure agent.
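
How you restart the agent depends on your init system; for example, on Linux distributions using systemd:

    sudo systemctl restart newrelic-infra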

Configure

There are several ways to configure the Infrastructure agent to monitor a Kafka environment: you can monitor the entire environment remotely from a single agent, or install the agent on individual nodes and monitor each part of the environment locally.

Use the configuration file (kafka-config.yml) to store required login credentials and configure how data is collected.

Commands

The kafka-config.yml file provides three commands:

  • inventory: collects configuration status
  • metrics: collects performance metrics
  • consumer_offset: collects consumer group offset data
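
Each command runs as its own instance in kafka-config.yml. As a minimal sketch, with arguments omitted for brevity (complete, runnable examples follow below):

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
  - name: kafka-inventory
    command: inventory
  - name: kafka-consumer-offsets
    command: consumer_offset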

Arguments

The configuration file accepts the following arguments. For examples of some typical configurations, see the example configurations.

  • name: The name with which you want to identify the integration in New Relic.
  • zookeeper_hosts: The list of Apache ZooKeeper hosts (in JSON format) to connect to.
  • zookeeper_auth_scheme: The ZooKeeper authentication scheme that is used to connect. Currently, the only supported value is digest. If omitted, no authentication is used.
  • zookeeper_auth_secret: The ZooKeeper authentication secret that is used to connect. Should be of the form username:password. Only required if zookeeper_auth_scheme is specified.
  • zookeeper_path: The ZooKeeper node under which the Kafka configuration resides. Default value is /.

  • default_jmx_host: The default host to collect JMX metrics. If the host field is omitted from a producer or consumer configuration, this value will be used.
  • default_jmx_port: The default port to collect JMX metrics. If the port field is omitted from a producer or consumer configuration, this value will be used.
  • default_jmx_user: The default user that is connecting to the JMX host to collect metrics. This field should only be used if all brokers have a non-default username. If the username field is omitted from a producer or consumer configuration, this value will be used.
  • default_jmx_password: The default password to connect to the JMX host. This field should only be used if all brokers have a non-default password. If the password field is omitted from a producer or consumer configuration, this value will be used.
  • collect_broker_topic_data: Signals if broker and topic metrics are collected. Options are true or false, defaults to true. Should only be set to false when monitoring only producers and consumers, and topic_mode is set to all.
  • producers: Producers to collect. For each producer, a name, hostname, port, username, and password can be provided in JSON form. name is the producer’s name as it appears in Kafka. hostname, port, username, and password are optional and use the defaults if unspecified. See the example after this list.
  • consumers: Consumers to collect. For each consumer a name, hostname, port, username and password can be specified in JSON form. name is the consumer’s name as it appears in Kafka. hostname, port, username and password are optional and will use default if unspecified.
  • consumer_groups: A whitelist of consumer groups (in JSON format) for which to collect offset data.
  • topic_mode: Determines which topics are collected. Options are all, none, or list.
  • collect_topic_size: Whether to collect the topic size metric. Options are true or false; defaults to false. Topic size is a resource-intensive metric to collect.
  • topic_list: Array of topic names to monitor. Only in effect if topic_mode is set to list.
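
For example, a producer that uses the default JMX settings needs only a name, while one with a non-default port overrides just that field. This arguments fragment is illustrative; the hostnames and ports are hypothetical:

arguments:
  default_jmx_host: localhost
  default_jmx_port: 9999
  producers: '[{"name": "my-producer"}, {"name": "other-producer", "host": "other-host.my.localnet", "port": 9988}]'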

Labels

Labels are optional tags which help to identify collection data in Insights. Some examples are included below.

  • env: Label to identify the environment. For example: production.
  • role: Label to identify which role is accessing the data.

For more details on configuration parameters, see the kafka-config.yml.sample configuration file on GitHub.

Example: Single agent deployment

Let's consider an environment with the following structure. For this environment, assume the Infrastructure agent is installed on the ZooKeeper node.

  • Brokers
  • Single ZooKeeper node
  • Single producer:

    • Name: my-producer
    • Host: my-producer.my.localnet
    • JMX Port: 9989
  • Single consumer:

    • Name: my-consumer
    • Host: my-consumer.my.localnet
    • JMX Port: 9987

Example kafka-config.yml file configuration for this environment:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      producers: '[{"name": "my-producer", "host": "my-producer.my.localnet", "port": 9989}]'
      consumers: '[{"name": "my-consumer", "host": "my-consumer.my.localnet", "port": 9987}]'
      topic_mode: list
      collect_topic_size: false
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

  - name: kafka-inventory
    command: inventory
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      topic_mode: list
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

Example: Multiple agent deployment

Let's consider an environment with the following structure. For this environment, assume the Infrastructure agent is installed on the ZooKeeper node, the producer node, and the consumer node.

  • Brokers
  • Single ZooKeeper node
  • Single producer:

    • Name: my-producer
    • Host: my-producer.my.localnet
    • JMX Port: 9989
  • Single consumer:

    • Name: my-consumer
    • Host: my-consumer.my.localnet
    • JMX Port: 9987

Example kafka-config.yml configuration for this environment:

ZooKeeper node configuration:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      topic_mode: list
      collect_topic_size: false
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

  - name: kafka-inventory
    command: inventory
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      topic_mode: list
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

Producer node configuration:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      producers: '[{"name": "my-producer", "host": "my-producer.my.localnet", "port": 9989}]'
      topic_mode: list
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

Consumer node configuration:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      consumers: '[{"name": "my-consumer", "host": "my-consumer.my.localnet", "port": 9987}]'
      topic_mode: list
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

Example: Offset collection

Let's consider an environment with the following structure. For this environment, assume the Infrastructure agent is installed on the ZooKeeper node.

Due to the load that collecting offset data can put on the Kafka environment, collecting offsets is done independently of normal metric and inventory data collection.

  • Brokers
  • Single ZooKeeper node
  • Consumers
  • Consumer Groups
    • consumer_group_1
    • consumer_group_2
  • Topics:

    • topic_1 (5 partitions)

      • consumer_group_1 subscribed
      • consumer_group_2 subscribed
    • topic_2 (3 partitions)

      • consumer_group_2 subscribed

For the above environment, let's say you want to monitor consumer_group_1 and consumer_group_2 offsets. For consumer_group_1, you want to monitor all partitions of topic_1. For consumer_group_2, you want to monitor partitions 1 and 2 of topic_1 and all partitions of topic_2.

Example kafka-config.yml file configuration for this environment:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-consumer-offsets
    command: consumer_offset
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      consumer_groups: '{"consumer_group_1": {"topic_1": []}, "consumer_group_2": {"topic_1": [1,2], "topic_2": []}}'
    labels:
      env: production
      role: kafka

If the JSON object for a consumer group’s topics is empty (ex: "consumer_group": {}), then offsets will be collected for all topics and partitions that the consumer group is subscribed to. Likewise, if the list of partitions for a topic is empty (ex: "consumer_group": {"topic": []}), then offsets for all partitions of that topic will be collected for the consumer group.
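
For instance, both of the following consumer_groups values are valid (the group and topic names are hypothetical):

consumer_groups: '{"my_group": {}}'                # all subscribed topics, all partitions
consumer_groups: '{"my_group": {"my_topic": []}}'  # all partitions of my_topic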

Find and use data

To find your integration data in Infrastructure, go to infrastructure.newrelic.com > Integrations > On-host integrations and look for a Kafka integration.

In New Relic Insights, Kafka data is attached to these event types:

  • KafkaBrokerSample
  • KafkaConsumerSample
  • KafkaProducerSample
  • KafkaTopicSample
  • KafkaOffsetSample

For more on how to find and use your data, see Understand integration data.
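
For example, broker throughput could be charted in Insights with a NRQL query like the following, using an event type and attribute from the Metrics section below:

    SELECT average(`broker.messagesInPerSecond`) FROM KafkaBrokerSample TIMESERIES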

Metrics

The Kafka integration collects the following metric data attributes. Each metric name is prefixed with a category indicator and a period, such as broker. or consumer..

KafkaBrokerSample event

Metric Description
broker.bytesWrittenToTopicPerSecond Number of bytes written to a topic by the broker per second.
broker.IOInPerSecond Network IO into brokers in the cluster in bytes per second.
broker.IOOutPerSecond Network IO out of brokers in the cluster in bytes per second.
broker.logFlushPerSecond Log flush rate.
broker.messagesInPerSecond Incoming messages per second.
follower.requestExpirationPerSecond Rate of request expiration on followers in evictions per second.
net.bytesRejectedPerSecond Rejected bytes per second.
replication.isrExpandsPerSecond Rate of replicas joining the ISR pool.
replication.isrShrinksPerSecond Rate of replicas leaving the ISR pool.
replication.leaderElectionPerSecond Leader election rate.
replication.uncleanLeaderElectionPerSecond Unclean leader election rate.
replication.unreplicatedPartitions Number of unreplicated partitions.
request.avgTimeFetch Average time per fetch request in milliseconds.
request.avgTimeMetadata Average time for metadata request in milliseconds.
request.avgTimeMetadata99Percentile Time for metadata requests for 99th percentile in milliseconds.
request.avgTimeOffset Average time for an offset request in milliseconds.
request.avgTimeOffset99Percentile Time for offset requests for 99th percentile in milliseconds.
request.avgTimeProduceRequest Average time for a produce request in milliseconds.
request.avgTimeUpdateMetadata Average time for a request to update metadata in milliseconds.
request.avgTimeUpdateMetadata99Percentile Time for update metadata requests for 99th percentile in milliseconds.
request.clientFetchesFailedPerSecond Client fetch request failures per second.
request.fetchTime99Percentile Time for fetch requests for 99th percentile in milliseconds.
request.handlerIdle Average fraction of time the request handler threads are idle.
request.produceRequestsFailedPerSecond Failed produce requests per second.
request.produceTime99Percentile Time for produce requests for 99th percentile.

KafkaConsumerSample event

Metric Description
consumer.avgFetchSizeInBytes Average number of bytes fetched per request for a specific topic.
consumer.avgRecordConsumedPerTopic Average number of records in each request for a specific topic.
consumer.avgRecordConsumedPerTopicPerSecond Average number of records consumed for a specific topic, in records per second.
consumer.bytesInPerSecond Consumer bytes per second.
consumer.fetchPerSecond The minimum rate at which the consumer sends fetch requests to a broker, in requests per second.
consumer.maxFetchSizeInBytes Maximum number of bytes fetched per request for a specific topic.
consumer.maxLag Maximum consumer lag.
consumer.messageConsumptionPerSecond Rate of consumer message consumption in messages per second.
consumer.offsetKafkaCommitsPerSecond Rate of offset commits to Kafka in commits per second.
consumer.offsetZooKeeperCommitsPerSecond Rate of offset commits to ZooKeeper in writes per second.
consumer.requestsExpiredPerSecond Rate of delayed consumer request expiration in evictions per second.

KafkaProducerSample event

Metric Description
producer.ageMetadataUsedInMilliseconds Age in seconds of the current producer metadata being used.
producer.availableBufferInBytes Total amount of buffer memory that is not being used in bytes.
producer.avgBytesSentPerRequestInBytes Average number of bytes sent per partition per-request.
producer.avgCompressionRateRecordBatches Average compression rate of record batches.
producer.avgRecordAccumulatorsInMilliseconds Average time in ms record batches spent in the record accumulator.
producer.avgRecordSizeInBytes Average record size in bytes.
producer.avgRecordsSentPerSecond Average number of records sent per second.
producer.avgRecordsSentPerTopicPerSecond Average number of records sent per second for a topic.
producer.AvgRequestLatencyPerSecond Producer average request latency.
producer.avgThrottleTime Average time that a request was throttled by a broker in milliseconds.
producer.bufferMemoryAvailableInBytes Maximum amount of buffer memory the client can use in bytes.
producer.bufferpoolWaitTime Fraction of time an appender waits for space allocation.
producer.bytesOutPerSecond Producer bytes per second out.
producer.compressionRateRecordBatches Average compression rate of record batches for a topic.
producer.iOWaitTime Producer I/O wait time in milliseconds.
producer.maxBytesSentPerRequestInBytes Max number of bytes sent per partition per-request.
producer.maxRecordSizeInBytes Maximum record size in bytes.
producer.maxRequestLatencyInMilliseconds Maximum request latency in milliseconds.
producer.maxThrottleTime Maximum time a request was throttled by a broker in milliseconds.
producer.messageRatePerSecond Producer messages per second.
producer.responsePerSecond Number of producer responses per second.
producer.requestPerSecond Number of producer requests per second.
producer.requestsWaitingResponse Current number of in-flight requests awaiting a response.
producer.threadsWaiting Number of user threads blocked waiting for buffer memory to enqueue their records.

KafkaTopicSample event

Metric Description
topic.diskSize Current topic disk size per broker in bytes.
topic.partitionsWithNonPreferredLeader Number of partitions per topic that are not being led by their preferred replica.
topic.respondMetaData Number of topics responding to metadata requests.
topic.retentionSizeOrTime Whether a partition is retained by time or by both size and time. A value of 0 = time and a value of 1 = both size and time.
topic.underReplicatedPartitions Number of partitions per topic that are under-replicated.

KafkaOffsetSample event

Metric Description
kafka.consumerOffset The offset for a consumer group for a given topic partition.
kafka.highWaterMark The offset for a broker for a given topic partition.
kafka.consumerLag The difference between a broker's high water mark and a consumer group's offset (kafka.highWaterMark - kafka.consumerOffset).
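
For example, the worst-case lag could be tracked in Insights with a NRQL query along these lines:

    SELECT max(`kafka.consumerLag`) FROM KafkaOffsetSample TIMESERIES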

Inventory data

The Kafka integration captures the non-default broker and topic configuration parameters, and collects the topic partition schemes as reported by ZooKeeper. The data is available on the Infrastructure Inventory UI page under the config/kafka source.

Troubleshooting

Troubleshooting tips:

Duplicate data being reported

For agents monitoring producers and/or consumers that have topic_mode set to all, duplicate data may be reported. To stop the duplicate data, ensure that collect_topic_size is set to false.

Integration is logging errors 'zk: node not found'

Ensure that zookeeper_path is set correctly in the configuration file.
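
For example, if your brokers register under a ZooKeeper chroot (a hypothetical /kafka below), point the integration at the same node in the arguments section of kafka-config.yml:

    zookeeper_path: "/kafka"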

For more help

Recommendations for learning more: