New Relic이 기본적으로 지원하지 않는 몇 가지 메시징 프레임워크가 있습니다. API를 사용하여 이러한 프레임워크를 계측하는 데 관심이 있는 경우 따라야 할 몇 가지 지침이 있습니다.
일반적인 메시징
일반적인 경우에는 메시지를 생성하는 프로세스와 메시지를 소비하는 프로세스가 있습니다. 여기에서 할 수 있는 몇 가지 유용한 작업은 다음과 같습니다. 보고하려면 트랜잭션 내에 있어야 합니다.
생산/소비를 외부로 보고
관련 데이터로 MessageProduceParameters 및 MessageConsumeParameters를 만든 다음 reportAsExternal(Params)를 호출하여 호출을 외부로 보고할 수 있습니다.
예를 들어 메시지를 생성할 때 다음과 같습니다.
MessageProduceParameters params = MessageProduceParameters .library("libraryName") .destinationType(NAMED_TOPIC) .destinationName("topicName") .inboundHeaders(null).build();NewRelic.getAgent().getTransaction().getTracedMethod().reportAsExternal(params);
유사한 솔루션이 소비자 측면에서 작동합니다.
현재 트랜잭션에 메시지 메트릭/커스텀 매개변수 추가
메시지에 대해 현재 트랜잭션에 추가하고 싶은 유용한 정보가 있으면 addCustomParameter() API를 호출하여 쉽게 추가할 수 있습니다. 이것은 다음과 같습니다.
NewRelic.addCustomParameter("topic-name", topic);
분산 추적 페이로드 전달
프로세스/서비스 전반에 걸친 작업을 추적하기 위해 메시징 프레임워크는 헤더 또는 다른 메커니즘을 통해 메시지와 함께 메타데이터를 전달할 수 있는 방법이 있어야 합니다. 메시지에 첨부하고 소비자 측에서 읽을 수 있는 생산자 측에서 분산 추적 페이로드를 생성할 수 있습니다. 이 작업을 수행하려면 트랜잭션에 있어야 합니다. 이렇게 하려면 다음과 같은 패턴을 따릅니다.
@Trace(dispatcher = true)public void sendData(Producer producer){ NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers); data = new ProducerRecord<String, String>("topic", "key", "value", headers); producer.send(data);}
그리고 소비자 측면에서:
@Trace(dispatcher = true)public void processData(ConsumerRecord record){ NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Kafka, record.headers());}
이 예에서는 계측할 수 있는 Kafka를 사용합니다. Instrument Kafka 메시지 대기열 은 계측에 대한 정보를 제공합니다.
엣지 케이스
특별한 주의가 필요한 일부 극단적인 경우가 있습니다. 다음은 이러한 문제를 해결하는 가장 좋은 방법에 대한 제안입니다.
분산 추적 페이로드 생성 후 메시지 수신
여기서 문제는 분산 추적 페이로드를 만든 다음 동일한 트랜잭션 내에서 해당 순서로 분산 추적 페이로드를 수락할 수 없다는 것입니다. 즉, 분산 추적에는 두 가지 시도가 있으며 원하는 것을 선택해야 합니다. 첫 번째 페이로드를 생성하지 않고 두 번째 페이로드를 수락하려면 첫 번째 페이로드가 생성되지 않도록 해야 합니다. 사용자 정의 계측을 제거하거나(페이로드를 생성하는 경우) 구성 을 통해 계측을 비활성화할 수 있습니다.
일괄 읽기 메시지
메시지의 일괄 소비의 경우 어떤 분산 추적 페이로드를 수락하는지 문제가 있습니다. 분산 추적 페이로드 중 하나만 연결해도 괜찮다면 배치의 페이로드 중 하나를 수락할 수 있지만 나머지 페이로드는 손실됩니다. 그러나 모든 분산 추적을 보려면 메시지별로 트랜잭션을 만들어야 합니다. 두 번째 솔루션은 약간의 오버헤드를 유발할 수 있으므로 결정을 내릴 때 이를 고려하십시오. 이와 같은 솔루션은 다음과 같습니다.
public void processMessages(ConsumerRecords<String, String> records) { for(ConsumerRecord<String, String> record: records) { processRecord(record); }}
@Trace(dispatcher = true)private void processRecord(ConsumerRecord<String, String> record) { final Iterator<Header> nrHeaders = record.headers().headers("newrelicDTPayload").iterator(); if (nrHeaders.hasNext()) { final Header nrHeader = nrHeaders.next(); final String payload = new String(nrHeader.value(), StandardCharsets.UTF_8); NewRelic.getAgent().getTransaction().acceptDistributedTracePayload(payload); }}
이것은 현재 스레드에서 이미 시작된 트랜잭션이 없는 경우에만 작동합니다. 하나가 있는 경우 배치에서 하나의 페이로드를 수락하는 것이 유일한 솔루션입니다.
메시지 처리 시간 캡처
메시지의 처리 시간을 캡처하려면 처리 작업을 메서드로 이동한 다음 해당 메서드를 추적해야 합니다. 예를 들어:
public void getMessages(){ KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("topic")); while (true) { ConsumerRecords<String, String> recs = consumer.poll(10000); if (recs.count() > 0) { for (ConsumerRecord<String, String> rec : recs) { processRec(rec); } } }}
@Trace(dispatcher = true)private void processRec(ConsumerRecord<String, String> rec) { // some processing}