Kafka monitoring integration

The New Relic Kafka on-host integration reports metrics and configuration data from your Kafka service. We instrument all the key elements of your cluster, including brokers (discovered through either ZooKeeper or a bootstrap broker), producers, consumers, and topics.

Read on to install the Kafka integration, and to see what data it collects. To monitor Kafka with New Relic APM's Java agent, see Instrument Kafka message queues.

New Relic Infrastructure Kafka dashboard: infrastructure.newrelic.com > Third-party services > Kafka dashboard

Compatibility and requirements

Our integration is compatible with Kafka versions 0.8 or higher.

Before installing the integration, make sure that you meet the following requirements:

For Kafka running on Kubernetes, see the Kubernetes requirements.

This integration is released as open source under the MIT license on GitHub.

Install and activate

To install the Kafka integration, choose your setup:

ECS

See Monitor service running on ECS.

Kubernetes

See Monitor service running on Kubernetes.

Linux installation
  1. Follow the instructions for installing an integration, using the file name nri-kafka.
  2. Change the directory to the integrations configuration folder:

    cd /etc/newrelic-infra/integrations.d
    
  3. Copy the sample configuration file:

    sudo cp kafka-config.yml.sample kafka-config.yml
    
  4. Edit the kafka-config.yml file as described in the configuration settings.

  5. Restart the Infrastructure agent.

Windows installation
  1. Download the nri-kafka .MSI installer image from:

    http://download.newrelic.com/infrastructure_agent/windows/integrations/nri-kafka/nri-kafka-amd64.msi

  2. To install from the Windows command prompt, run:
    msiexec.exe /qn /i PATH\TO\nri-kafka-amd64.msi
  3. In the Integrations directory, C:\Program Files\New Relic\newrelic-infra\integrations.d\, create a copy of the sample configuration file by running:

    cp kafka-config.yml.sample kafka-config.yml
  4. Edit the kafka-config.yml configuration as described in the configuration settings.
  5. Restart the Infrastructure agent.

Configure the integration

An integration's YAML-format configuration is where you place required login credentials and configure how data is collected. Which options you change depends on your setup and preferences. You can monitor the entire environment remotely from a single node, or run the integration locally on any node in that environment.

There are several ways to configure the integration, depending on how it was installed.

For examples of typical configurations, see the example configurations.

Commands

The configuration accepts the following commands:

  • inventory: collects configuration status
  • metrics: collects performance metrics
  • consumer_offset: collects consumer group offset data
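
Each command runs as its own instance in kafka-config.yml. As a rough sketch only (the ZooKeeper host and the catch-all consumer group regex below are placeholders), a single file that exercises all three commands might look like this:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'

  - name: kafka-inventory
    command: inventory
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'

  - name: kafka-consumer-offsets
    command: consumer_offset
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      consumer_group_regex: '.*'

Because offset collection can put extra load on the cluster, the consumer_offset instance is usually configured on a single node only (see the offset collection example later in this document).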

Arguments

The configuration accepts the following arguments:

General arguments:

  • cluster_name: user-defined name to uniquely identify the cluster being monitored. Required.
  • kafka_version: the version of the Kafka broker you're connecting to, used for setting optimum API versions. Defaults to 1.0.0. Versions older than 1.0.0 may be missing some features.
  • autodiscover_strategy: the method of discovering brokers. Options are zookeeper or bootstrap. Defaults to zookeeper.

Zookeeper autodiscovery arguments (only relevant when autodiscover_strategy is zookeeper):

  • zookeeper_hosts: the list of Apache ZooKeeper hosts (in JSON format) that the integration should connect to.
  • zookeeper_auth_scheme: the ZooKeeper authentication scheme that is used to connect. Currently, the only supported value is digest. If omitted, no authentication is used.
  • zookeeper_auth_secret: the ZooKeeper authentication secret that is used to connect. Should be of the form username:password. Only required if zookeeper_auth_scheme is specified.
  • zookeeper_path: the ZooKeeper node under which the Kafka configuration resides. Defaults to /.

  • preferred_listener: use a specific listener to connect to a broker. If unset, the first listener that passes a successful test connection is used. Supported values are PLAINTEXT, SASL_PLAINTEXT, SSL, and SASL_SSL. Note: The SASL_* protocols only support Kerberos (GSSAPI) authentication.
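
For example, a metrics instance that discovers brokers through an authenticated ZooKeeper ensemble under a chroot path might look roughly like this fragment of the instances section (the host names, the /kafka path, and the credentials are placeholders):

  - name: kafka-metrics
    command: metrics
    arguments:
      autodiscover_strategy: zookeeper
      zookeeper_hosts: '[{"host": "zookeeper-1.example.com", "port": 2181}, {"host": "zookeeper-2.example.com", "port": 2181}]'
      zookeeper_auth_scheme: digest
      zookeeper_auth_secret: 'username:password'
      zookeeper_path: '/kafka'
      preferred_listener: PLAINTEXT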

Bootstrap broker discovery arguments (only relevant when autodiscover_strategy is bootstrap):

  • bootstrap_broker_host: the host for the bootstrap broker.

  • bootstrap_broker_kafka_port: the Kafka port for the bootstrap broker.

  • bootstrap_broker_kafka_protocol: the protocol to use to connect to the bootstrap broker. Supported values are PLAINTEXT, SASL_PLAINTEXT, SSL, and SASL_SSL. Note: The SASL_* protocols only support Kerberos (GSSAPI) authentication. Default: PLAINTEXT.

  • bootstrap_broker_jmx_port: the JMX port to use for collection.

  • bootstrap_broker_jmx_user: the JMX user to use for collection.

  • bootstrap_broker_jmx_password: the JMX password to use for collection.
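
As a sketch, a metrics instance that connects to the cluster through a single bootstrap broker and reads that broker's JMX metrics might use arguments like these (the host name, ports, and JMX credentials are placeholders):

  - name: kafka-metrics
    command: metrics
    arguments:
      autodiscover_strategy: bootstrap
      bootstrap_broker_host: kafka-broker-1.example.com
      bootstrap_broker_kafka_port: 9092
      bootstrap_broker_kafka_protocol: PLAINTEXT
      bootstrap_broker_jmx_port: 9999
      bootstrap_broker_jmx_user: admin
      bootstrap_broker_jmx_password: password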

Producer and consumer collection:

  • producers: producers to collect. For each producer a name, hostname, port, username, and password can be provided in JSON form. name is the producer’s name as it appears in Kafka. hostname, port, username, and password are optional and use the default if unspecified.
  • consumers: consumers to collect. For each consumer a name, hostname, port, username, and password can be specified in JSON form. name is the consumer’s name as it appears in Kafka. hostname, port, username, and password are optional and use the default if unspecified.

JMX connection options:

  • default_jmx_host: the default host to collect JMX metrics. If the host field is omitted from a producer or consumer configuration, this value will be used.
  • default_jmx_port: the default port to collect JMX metrics. If the port field is omitted from a producer or consumer configuration, this value will be used.
  • default_jmx_user: the default user that is connecting to the JMX host to collect metrics. This field should only be used if all brokers have a non-default username. If the username field is omitted from a producer or consumer configuration, this value will be used.
  • default_jmx_password: the default password to connect to the JMX host. This field should only be used if all brokers have a non-default password. If the password field is omitted from a producer or consumer configuration, this value will be used.
  • key_store: the filepath of the keystore containing the JMX client's SSL certificate.
  • key_store_password: the password for the JMX SSL key store.
  • trust_store: the filepath of the trust keystore containing the JMX server's SSL certificate.
  • trust_store_password: the password for the JMX trust store.
  • timeout: the timeout for individual JMX queries in milliseconds. Default: 10000.
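
For instance, if every producer and consumer exposes JMX on the same SSL-protected port, you could set the connection details once through the defaults and list only the client names. The following is a fragment of an arguments section, not a complete file; all host names, ports, passwords, and file paths are placeholders:

      producers: '[{"name": "my-producer"}]'
      consumers: '[{"name": "my-consumer"}]'
      default_jmx_host: localhost
      default_jmx_port: 9999
      default_jmx_user: admin
      default_jmx_password: password
      key_store: /etc/kafka/ssl/jmx-keystore.jks
      key_store_password: password
      trust_store: /etc/kafka/ssl/jmx-truststore.jks
      trust_store_password: password
      timeout: 10000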

Broker connection options:

  • tls_ca_file: the certificate authority file for SSL and SASL_SSL listeners.
  • tls_cert_file: the client certificate file for SSL and SASL_SSL listeners.
  • tls_key_file: the client key file for SSL and SASL_SSL listeners.
  • tls_insecure_skip_verify: skip verifying the server's certificate chain and host name.
  • sasl_gssapi_realm: the Kerberos realm. Required for SASL_SSL or SASL_PLAINTEXT.
  • sasl_gssapi_service_name: the Kerberos service name. Required for SASL_SSL or SASL_PLAINTEXT.
  • sasl_gssapi_username: the Kerberos username. Required for SASL_SSL or SASL_PLAINTEXT.
  • sasl_gssapi_key_tab_path: the path to the Kerberos keytab. Required for SASL_SSL or SASL_PLAINTEXT.
  • sasl_gssapi_kerberos_config_path: the path to the Kerberos config file. Default: /etc/krb5.conf.
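
For a cluster whose listeners require Kerberos over TLS (SASL_SSL), the relevant arguments might look roughly like this fragment (the realm, principal, and file paths are placeholders; with ZooKeeper discovery you would set preferred_listener: SASL_SSL instead of the bootstrap protocol option):

      bootstrap_broker_kafka_protocol: SASL_SSL
      tls_ca_file: /etc/kafka/ssl/ca.pem
      sasl_gssapi_realm: EXAMPLE.COM
      sasl_gssapi_service_name: kafka
      sasl_gssapi_username: kafka-monitor
      sasl_gssapi_key_tab_path: /etc/krb5/kafka-monitor.keytab
      sasl_gssapi_kerberos_config_path: /etc/krb5.conf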

Collection filtering:

  • collect_broker_topic_data: signals if broker and topic metrics are collected. Options are true or false, defaults to true. Should only be set to false when monitoring only producers and consumers, and topic_mode is set to all.
  • local_only_collection: collect only the metrics related to the configured bootstrap broker. Only used if autodiscover_strategy is bootstrap. Default: false

  • consumer_group_regex: regex pattern that matches the consumer groups to collect offset statistics for. This is limited to collecting statistics for 300 consumer groups. Note: consumer_groups has been deprecated, use this argument instead.
  • topic_mode: determines how many topics we collect. Options are all, none, list, or regex.
  • collect_topic_size: collect the topic size metric. Options are true or false, defaults to false. Topic size is a resource-intensive metric to collect.
  • topic_list: array of topic names to monitor. Only in effect if topic_mode is set to list.
  • topic_regex: regex pattern that matches the topic names to monitor. Only in effect if topic_mode is set to regex.
  • topic_bucket: used to split topic collection across multiple instances. Should be of the form <bucket number>/<number of buckets>. Default: 1/1.
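
For example, to monitor only topics whose names start with orders_ (a placeholder prefix) and split that work across two integration instances, the first instance's arguments could include:

      topic_mode: regex
      topic_regex: 'orders_.*'
      topic_bucket: '1/2'

The second instance would use the same topic_regex with topic_bucket: '2/2'.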

Labels

Labels are optional tags that help you identify and group the collected data. Some examples are included below.

  • env: label to identify the environment. For example: production.
  • role: label to identify which role is accessing the data.

For more details on configuration parameters, see the kafka-config.yml.sample config file on GitHub.

Example: Single agent deployment

Let's consider an environment with the following structure. For this environment, assume the Infrastructure agent is installed on the ZooKeeper node.

  • Brokers
  • Single ZooKeeper node
  • Single producer:

    • Name: my-producer
    • Host: my-producer.my.localnet
    • JMX Port: 9989
  • Single consumer:

    • Name: my-consumer
    • Host: my-consumer.my.localnet
    • JMX Port: 9987

Example kafka-config.yml config file for this environment:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      producers: '[{"name": "my-producer", "host": "my-producer.my.localnet", "port": 9989}]'
      consumers: '[{"name": "my-consumer", "host": "my-consumer.my.localnet", "port": 9987}]'
      topic_mode: list
      collect_topic_size: false
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

  - name: kafka-inventory
    command: inventory
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      topic_mode: regex
      topic_regex: 'topic_[0-9]+'
    labels:
      env: production
      role: kafka

Example: Multiple agent deployment

Let's consider an environment with the following structure. For this environment, assume the Infrastructure agent is installed on the ZooKeeper node, the producer node, and the consumer node.

  • Brokers
  • Single ZooKeeper node
  • Single producer:

    • Name: my-producer
    • Host: my-producer.my.localnet
    • JMX Port: 9989
  • Single consumer:

    • Name: my-consumer
    • Host: my-consumer.my.localnet
    • JMX Port: 9987

Example kafka-config.yml config file for this environment:

ZooKeeper node configuration:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      topic_mode: list
      collect_topic_size: false
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

  - name: kafka-inventory
    command: inventory
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      topic_mode: list
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

Producer node configuration:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      producers: '[{"name": "my-producer", "host": "my-producer.my.localnet", "port": 9989}]'
      topic_mode: list
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

Consumer node configuration:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-metrics
    command: metrics
    arguments:
      consumers: '[{"name": "my-consumer", "host": "my-consumer.my.localnet", "port": 9987}]'
      topic_mode: list
      topic_list: '["topic_1", "topic_2"]'
    labels:
      env: production
      role: kafka

Example: Offset collection

Let's consider an environment with the following structure. For this environment, assume the Infrastructure agent is installed on the ZooKeeper node.

Due to the load that collecting offset data can put on the Kafka environment, offsets are collected independently of normal metric and inventory data collection. We recommend configuring offset collection on only one node.

  • Brokers
  • Single ZooKeeper node
  • Consumers
  • Consumer Groups
    • consumer_group_a1
    • consumer_group_a2
    • consumer_group_b1

For this example environment, if you want to monitor offsets for only consumer_group_a1 and consumer_group_a2, a sample config might look like this:

integration_name: com.newrelic.kafka

instances:
  - name: kafka-consumer-offsets
    command: consumer_offset
    arguments:
      zookeeper_hosts: '[{"host": "localhost", "port": 2181}]'
      consumer_group_regex: 'consumer_group_a.'
    labels:
      env: production
      role: kafka

For more about the general structure of on-host integration configuration, see Configuration.

Find and use data

Data from this service is reported to an integration dashboard.

Kafka data is attached to the following event types:

  • KafkaBrokerSample
  • KafkaConsumerSample
  • KafkaProducerSample
  • KafkaTopicSample
  • KafkaOffsetSample

You can query this data for troubleshooting purposes or to create charts and dashboards.
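
For example, a simple NRQL query along these lines (the metric and event names come from the tables below; the time window is arbitrary) charts incoming broker message throughput over time:

SELECT average(broker.messagesInPerSecond) FROM KafkaBrokerSample TIMESERIES SINCE 30 minutes ago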

For more on how to find and use your data, see Understand integration data.

Metric data

The Kafka integration collects the following metric data attributes. Each metric name is prefixed with a category indicator and a period, such as broker. or consumer..

KafkaBrokerSample event

Metric Description
broker.bytesWrittenToTopicPerSecond Number of bytes written to a topic by the broker per second.
broker.IOInPerSecond Network IO into brokers in the cluster in bytes per second.
broker.IOOutPerSecond Network IO out of brokers in the cluster in bytes per second.
broker.logFlushPerSecond Log flush rate.
broker.messagesInPerSecond Incoming messages per second.
follower.requestExpirationPerSecond Rate of request expiration on followers in evictions per second.
net.bytesRejectedPerSecond Rejected bytes per second.
replication.isrExpandsPerSecond Rate of replicas joining the ISR pool.
replication.isrShrinksPerSecond Rate of replicas leaving the ISR pool.
replication.leaderElectionPerSecond Leader election rate.
replication.uncleanLeaderElectionPerSecond Unclean leader election rate.
replication.unreplicatedPartitions Number of unreplicated partitions.
request.avgTimeFetch Average time per fetch request in milliseconds.
request.avgTimeMetadata Average time for metadata request in milliseconds.
request.avgTimeMetadata99Percentile Time for metadata requests for 99th percentile in milliseconds.
request.avgTimeOffset Average time for an offset request in milliseconds.
request.avgTimeOffset99Percentile Time for offset requests for 99th percentile in milliseconds.
request.avgTimeProduceRequest Average time for a produce request in milliseconds.
request.avgTimeUpdateMetadata Average time for a request to update metadata in milliseconds.
request.avgTimeUpdateMetadata99Percentile Time for update metadata requests for 99th percentile in milliseconds.
request.clientFetchesFailedPerSecond Client fetch request failures per second.
request.fetchTime99Percentile Time for fetch requests for 99th percentile in milliseconds.
request.handlerIdle Average fraction of time the request handler threads are idle.
request.produceRequestsFailedPerSecond Failed produce requests per second.
request.produceTime99Percentile Time for produce requests for 99th percentile.

KafkaConsumerSample event

Metric Description
consumer.avgFetchSizeInBytes Average number of bytes fetched per request for a specific topic.
consumer.avgRecordConsumedPerTopic Average number of records in each request for a specific topic.
consumer.avgRecordConsumedPerTopicPerSecond Average number of records consumed per second for a specific topic in records per second.
consumer.bytesInPerSecond Consumer bytes per second.
consumer.fetchPerSecond The minimum rate at which the consumer sends fetch requests to a broker in requests per second.
consumer.maxFetchSizeInBytes Maximum number of bytes fetched per request for a specific topic.
consumer.maxLag Maximum consumer lag.
consumer.messageConsumptionPerSecond Rate of consumer message consumption in messages per second.
consumer.offsetKafkaCommitsPerSecond Rate of offset commits to Kafka in commits per second.
consumer.offsetZooKeeperCommitsPerSecond Rate of offset commits to ZooKeeper in writes per second.
consumer.requestsExpiredPerSecond Rate of delayed consumer request expiration in evictions per second.

KafkaProducerSample event

Metric Description
producer.ageMetadataUsedInMilliseconds Age in seconds of the current producer metadata being used.
producer.availableBufferInBytes Total amount of buffer memory that is not being used in bytes.
producer.avgBytesSentPerRequestInBytes Average number of bytes sent per partition per-request.
producer.avgCompressionRateRecordBatches Average compression rate of record batches.
producer.avgRecordAccumulatorsInMilliseconds Average time in ms record batches spent in the record accumulator.
producer.avgRecordSizeInBytes Average record size in bytes.
producer.avgRecordsSentPerSecond Average number of records sent per second.
producer.avgRecordsSentPerTopicPerSecond Average number of records sent per second for a topic.
producer.AvgRequestLatencyPerSecond Producer average request latency.
producer.avgThrottleTime Average time that a request was throttled by a broker in milliseconds.
producer.bufferMemoryAvailableInBytes Maximum amount of buffer memory the client can use in bytes.
producer.bufferpoolWaitTime Fraction of time an appender waits for space allocation.
producer.bytesOutPerSecond Producer bytes per second out.
producer.compressionRateRecordBatches Average compression rate of record batches for a topic.
producer.iOWaitTime Producer I/O wait time in milliseconds.
producer.maxBytesSentPerRequestInBytes Max number of bytes sent per partition per-request.
producer.maxRecordSizeInBytes Maximum record size in bytes.
producer.maxRequestLatencyInMilliseconds Maximum request latency in milliseconds.
producer.maxThrottleTime Maximum time a request was throttled by a broker in milliseconds.
producer.messageRatePerSecond Producer messages per second.
producer.responsePerSecond Number of producer responses per second.
producer.requestPerSecond Number of producer requests per second.
producer.requestsWaitingResponse Current number of in-flight requests awaiting a response.
producer.threadsWaiting Number of user threads blocked waiting for buffer memory to enqueue their records.

KafkaTopicSample event

Metric Description
topic.diskSize Current topic disk size per broker in bytes.
topic.partitionsWithNonPreferredLeader Number of partitions per topic that are not being led by their preferred replica.
topic.respondMetaData Number of topics responding to metadata requests.
topic.retentionSizeOrTime Whether a partition is retained by size or both size and time. A value of 0 = time and a value of 1 = both size and time.
topic.underReplicatedPartitions Number of partitions per topic that are under-replicated.

KafkaOffsetSample event

Metric Description
consumer.offset The last consumed offset on a partition by the consumer group.
consumer.lag The difference between a broker's high water mark and the consumer's offset (consumer.hwm - consumer.offset).
consumer.hwm The offset of the last message written to a partition (high water mark).
consumer.totalLag The sum of lags across partitions consumed by a consumer.
consumerGroup.totalLag The sum of lags across all partitions consumed by a consumerGroup.
consumerGroup.maxLag The maximum lag across all partitions consumed by a consumerGroup.

Inventory data

The Kafka integration captures the non-default broker and topic configuration parameters, and collects the topic partition schemes as reported by ZooKeeper. The data is available on the Infrastructure Inventory UI page under the config/kafka source.

Troubleshooting

Troubleshooting tips:

Duplicate data being reported

For agents monitoring producers and/or consumers that have topic_mode set to all, duplicate data may be reported. To stop the duplicate data, make sure the configuration option collect_topic_size is set to false.
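
In kafka-config.yml that corresponds to a fragment like the following in the metrics instance's arguments:

      topic_mode: all
      collect_topic_size: false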

Integration is logging errors 'zk: node not found'

Ensure that zookeeper_path is set correctly in the configuration file.
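
For example, if your brokers register under a ZooKeeper chroot (the /kafka path below is a placeholder), each instance's arguments would need:

      zookeeper_path: '/kafka'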
