New Relic がデフォルトでサポートしていないメッセージングフレームワークがあります。これらのフレームワークを私たちのAPIでインストルメントすることに興味がある場合は、以下のガイドラインに従ってください。
典型的なメッセージング
典型的なケースでは、メッセージを生成するプロセスと、そのメッセージを消費するプロセスがあります。ここでできる便利なことを以下に示します。これらが報告されるためには、トランザクション内にいなければならないことに注意してください。
生産/消費を外部として報告
MessageProduceParametersとMessageConsumeParametersに関連するデータを作成し、reportAsExternal(Params)を呼び出して、呼び出しを外部として報告することができます。
例えば、メッセージを作成する場合は次のようになります。
MessageProduceParameters params = MessageProduceParameters .library("libraryName") .destinationType(NAMED_TOPIC) .destinationName("topicName") .inboundHeaders(null).build();NewRelic.getAgent().getTransaction().getTracedMethod().reportAsExternal(params);
同様のソリューションは、消費者側にも有効です。
現在のトランザクションにメッセージメトリクス/カスタムパラメータを追加
メッセージについて、現在のトランザクションに追加したい有用な情報がある場合は、当社の addCustomParameter() API を呼び出すことで簡単に追加できます。これは次のようになります。
NewRelic.addCustomParameter("topic-name", topic);
分散型トレースのペイロードを渡す
プロセスやサービス間の作業を追跡するためには、メッセージングフレームワークは、ヘッダやその他のメカニズムを介してメッセージと共にメタデータを渡す方法を持っていなければなりません。プロデューサ側で分散型トレースペイロードを作成し、メッセージに添付してコンシューマ側で読み取ることができます。これが機能するためには、トランザクション内にいる必要があります。これを行うには、次のようなパターンに従います。
@Trace(dispatcher = true)public void sendData(Producer producer){ NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers); data = new ProducerRecord<String, String>("topic", "key", "value", headers); producer.send(data);}
そして、消費者側では
@Trace(dispatcher = true)public void processData(ConsumerRecord record){ NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Kafka, record.headers());}
この例ではKafkaを使用していますが、Kafkaはインストルメントすることができます。 Instrument Kafka message queues はインストルメントのための情報を提供しています。
エッジケース
いくつかのエッジケースには、特別な注意が必要です。以下では、そのようなケースに対処するための最善の方法を提案します。
分散型トレースのペイロードを作成し、メッセージを受信する
ここで問題となるのは、分散トレースのペイロードを作成した後、同じトランザクション内で分散トレースのペイロードを受け入れるという順序を踏むことができないことです。つまり、分散トレースには2つの試みがあり、どちらを選択するかを選択しなければなりません。最初のペイロードを作成するのではなく、2番目のペイロードを受け入れたい場合は、最初のペイロードが作成されないようにする必要があります。カスタムのインスツルメンテーションを削除するか(それがペイロードを作成している場合)、 の設定でインスツルメンテーションを無効にすることができます。
メッセージの一括読み取り
メッセージを一括して消費する場合、どの分散トレースペイロードを受け入れるかという問題があります。もし、分散トレースのペイロードを1つだけリンクすることに問題がなければ、バッチ内のペイロードのどれか1つを受け入れることができますが、その場合、残りのペイロードは失われてしまいます。しかし、すべての分散トレースを見たい場合は、メッセージごとにトランザクションを作成する必要があります。2つ目の方法はオーバーヘッドが発生する可能性があるので、その点を考慮して決定してください。このようなソリューションは次のようになります。
public void processMessages(ConsumerRecords<String, String> records) { for(ConsumerRecord<String, String> record: records) { processRecord(record); }}
@Trace(dispatcher = true)private void processRecord(ConsumerRecord<String, String> record) { final Iterator<Header> nrHeaders = record.headers().headers("newrelicDTPayload").iterator(); if (nrHeaders.hasNext()) { final Header nrHeader = nrHeaders.next(); final String payload = new String(nrHeader.value(), StandardCharsets.UTF_8); NewRelic.getAgent().getTransaction().acceptDistributedTracePayload(payload); }}
これは、その時点でスレッド上で既に開始されているトランザクションがない場合にのみ機能します。もしある場合は、バッチから1つのペイロードを受け取ることが唯一の解決策となります。
メッセージの処理時間を把握する
メッセージの処理時間をキャプチャしたい場合は、処理作業をメソッドに移動させ、そのメソッドをトレースする必要があります。例えば、以下のようになります。
public void getMessages(){ KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("topic")); while (true) { ConsumerRecords<String, String> recs = consumer.poll(10000); if (recs.count() > 0) { for (ConsumerRecord<String, String> rec : recs) { processRec(rec); } } }}
@Trace(dispatcher = true)private void processRec(ConsumerRecord<String, String> rec) { // some processing}