Messaging framework instrumentation

New Relic doesn’t support some messaging frameworks out of the box. If you want to instrument one of these frameworks with our API, here are some guidelines to follow.

Typical messaging

In the typical case, one process produces a message and another process consumes it. Below are some useful things you can do in this scenario. Note that you must be within a transaction for these calls to report data.

Report the producing/consuming as external

You can build MessageProduceParameters and MessageConsumeParameters objects with the relevant data, then pass them to reportAsExternal(params) to report the calls as external activity.

For example, when producing a message this would look like:

MessageProduceParameters params = MessageProduceParameters
        .library("libraryName")
        .destinationType(DestinationType.NAMED_TOPIC)
        .destinationName("topicName")
        .outboundHeaders(null)
        .build();
NewRelic.getAgent().getTransaction().getTracedMethod().reportAsExternal(params);

A similar solution would work on the consumer side.
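For illustration, the consumer side might look like the following sketch. It mirrors the producer example using the MessageConsumeParameters builder; the library and destination names are placeholders:

```java
// Report message consumption as an external call. "libraryName" and
// "topicName" are placeholder values for your framework and destination.
MessageConsumeParameters params = MessageConsumeParameters
        .library("libraryName")
        .destinationType(DestinationType.NAMED_TOPIC)
        .destinationName("topicName")
        .inboundHeaders(null)
        .build();
NewRelic.getAgent().getTransaction().getTracedMethod().reportAsExternal(params);
```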

Add message metrics/custom parameters to current transaction

If there is useful information about the message that you want to add to your current transaction, you can do so by calling our addCustomParameter() API:

NewRelic.addCustomParameter("topic-name", topic);
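You can also record a custom metric about the message with recordMetric(). This is a sketch; the metric name and the messageBody variable are hypothetical examples:

```java
// "messageBody" is a hypothetical String holding the message payload.
// Record its size under a Custom/ metric name of your choosing.
NewRelic.recordMetric("Custom/MyMessaging/MessageSize", messageBody.length());
```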

Pass a distributed trace payload

In order to track the work across processes/services, the messaging framework must have a way to pass along metadata with the message, whether it’s via headers or some other mechanism. You can create a distributed trace payload on the producer side that can be attached to the message and read on the consumer side. You must be in a transaction for this to work. To do this you would follow a pattern like this:

@Trace(dispatcher = true)
public void sendData(Producer<String, String> producer) {
    final DistributedTracePayload payload = NewRelic.getAgent().getTransaction().createDistributedTracePayload();
    final Headers headers = new RecordHeaders();
    headers.add(new RecordHeader("newrelic", payload.text().getBytes(StandardCharsets.UTF_8)));
    final ProducerRecord<String, String> data = new ProducerRecord<>("topic", null, "key", "value", headers);
    producer.send(data);
}

And on the consumer side:

@Trace(dispatcher = true)
public void processData(ConsumerRecord<String, String> record) {
    final Iterator<Header> nrHeaders = record.headers().headers("newrelic").iterator();
    if (nrHeaders.hasNext()) {
        final Header nrHeader = nrHeaders.next();
        final String payload = new String(nrHeader.value(), StandardCharsets.UTF_8);
        NewRelic.getAgent().getTransaction().acceptDistributedTracePayload(payload);
    }
}

This example uses Kafka. For more information on instrumenting Kafka, see Instrument Kafka message queues.

Edge cases

There are some edge cases that require extra care. Below are suggestions on the best way to tackle them.

Create a distributed trace payload and then receive a message

The problem here is that you can’t create a distributed trace payload and then accept a distributed trace payload within the same transaction, in that order. When both happen, there are two competing attempts at distributed tracing, and you have to choose which one you want. If you would rather accept the second payload than create the first one, you need to prevent the first payload from being created. You can remove the custom instrumentation (if that’s what’s creating the payload) or disable the instrumentation via our agent configuration.
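Disabling an instrumentation module is done in newrelic.yml under class_transformer. The module name below is an assumption for illustration; replace it with the actual module that is creating the payload:

```yaml
common: &default_settings
  class_transformer:
    # Hypothetical module name; substitute the instrumentation module
    # that creates the unwanted distributed trace payload.
    com.newrelic.instrumentation.my-messaging-framework:
      enabled: false
```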

Batch reading messages

For batch consumption of messages, the problem is deciding which distributed trace payload to accept. If linking just one of the distributed traces is acceptable, you can accept any one of the payloads in the batch, but the remaining payloads will be lost. If you want to see all of the distributed traces, you’ll need to create a transaction per message. The second approach can incur some overhead, so take that into consideration when deciding. It would look like this:

public void processMessages(ConsumerRecords<String, String> records) {
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
    }
}

@Trace(dispatcher = true)
private void processRecord(ConsumerRecord<String, String> record) {
    final Iterator<Header> nrHeaders = record.headers().headers("newrelic").iterator();
    if (nrHeaders.hasNext()) {
        final Header nrHeader = nrHeaders.next();
        final String payload = new String(nrHeader.value(), StandardCharsets.UTF_8);
        NewRelic.getAgent().getTransaction().acceptDistributedTracePayload(payload);
    }
}

This will only work if a transaction hasn’t already been started on the thread at this point. If one has, accepting a single payload from the batch is your only option.
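If creating a transaction per message is too costly, the other approach — one transaction for the whole batch that accepts a single payload — could be sketched like this. The "newrelic" header name is just the example name used on the producer side:

```java
@Trace(dispatcher = true)
public void processBatch(ConsumerRecords<String, String> records) {
    boolean accepted = false;
    for (ConsumerRecord<String, String> record : records) {
        if (!accepted) {
            // Accept the first distributed trace payload found in the batch;
            // the payloads on the remaining messages are dropped.
            final Iterator<Header> nrHeaders = record.headers().headers("newrelic").iterator();
            if (nrHeaders.hasNext()) {
                final String payload = new String(nrHeaders.next().value(), StandardCharsets.UTF_8);
                NewRelic.getAgent().getTransaction().acceptDistributedTracePayload(payload);
                accepted = true;
            }
        }
        processRecord(record);
    }
}
```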

Capturing the processing time of the message

If you would like to capture the processing time of a message then you will need to move the processing work into a method and then trace that method. For example:

public void getMessages() {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("topic"));
    while (true) {
        ConsumerRecords<String, String> recs = consumer.poll(10000);
        for (ConsumerRecord<String, String> rec : recs) {
            processRec(rec);
        }
    }
}

@Trace(dispatcher = true)
private void processRec(ConsumerRecord<String, String> rec) {
    // some processing
}

For more help

See the Guide to using the Java agent API to learn more about what the API offers.
