この機械翻訳は、参考として提供されています。

英語版と翻訳版に矛盾がある場合は、英語版が優先されます。詳細については、このページを参照してください。

問題を作成する

dbt Cloud with Airflow および Snowflake インテグレーション

dbt CloudインテグレーションとAirflowは、 dbt Cloudジョブとリソースの状態を監視し、実行、モデル、テストが失敗した場合などの問題を特定するのに役立ちます。

このインテグレーションはApache Airflowで実行され、構成されている場合は失敗したテストに対してSnowflakeをクエリします。

前提条件

  • API が有効になっており、 Snowflakeをデータベースとして使用している dbt Cloud アカウント。
  • dbt Cloudアカウントが実行されるSnowflakeアカウントへのアクセス。
  • 既存のAirflow環境バージョン 2.8.1 以上、またはDocker Composeを実行する機能。

統合をインストールする

次のいずれかの方法で、Airflow を使用してNew Relic dbt Cloud インテグレーションをインストールできます。

  • 既存の Airflow 環境にインストールします。 本番環境におすすめです。
  • docker Compose を使用してインストールします。 これは迅速な POC に適しています。

タブをクリックして、ニーズに最も適したオプションを選択します。

Snowflake プロバイダーがあることを確認し、次のコマンドを実行してnewrelic-dbt-cloud-integrationリポジトリのクローンを作成します。

bash
$
pip install apache-airflow-providers-snowflake>=3.0.0
bash
$
git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git

airflow/dagsの内容を Airflow dags フォルダのルートにコピーします

DAG に必要な 5 つの Airflow 接続を作成します。 次の表には、接続名とそれを設定するための情報が記載されています。 これらすべての型はhttpであることに注意してください。

接続名

説明

タイプ

ホストとパスワード

dbt_cloud_admin_api

SimpleHttpHookを使用して dbt Cloud 管理 API に接続できます

http

ホスト: https://cloud.getdbt.com/api/v2/accounts/ACCOUNT\_ID/ ( ACCOUNT_IDを dbt Cloud アカウント ID に置き換えます)

パスワード: dbt Cloud APIパスワード (プロファイル設定) またはサービス アカウント

dbt_cloud_discovery_api

dbt検出APIに接続できます

http

ホスト: https://metadata.cloud.getdbt.com/graphql

パスワード: DBT クラウド サービス アカウント

nr_insights_insert

New Relicにカスタムイベントをアップロードできます

http

ホスト: https://insights-collector.newrelic.com/v1/accounts/ACCOUNT\_ID/events ( ACCOUNT_IDをアカウント ID に置き換えてください)

パスワード: NR インサイトにAPIキーを挿入

nr_insights_query

New Relicカスタムイベントをクエリできます

http

ホスト: https://insights-api.newrelic.com/v1/accounts/ACCOUNT\_ID/query ( ACCOUNT_IDをアカウント ID に置き換えてください)

パスワード: NR インサイト作成APIキー

上記の 4 つを設定したら、Snowflake 接続を設定する必要があります。 Snowflake を使用すると、失敗したテスト行をクエリできます。 スノーフレーク接続を構成する方法は多数あります。 秘密鍵ペアを使用して設定するには、次の属性を入力します。

  • Type: スノーフレーク
  • Login: Snowflakeのユーザー名
  • Account: Snowflakeアカウント
  • Warehouse: Snowflakeの倉庫
  • Role: あなたの Snowflake の役割。 失敗したテスト行をすべて取得するには、ロールに dbt Cloud で使用されるすべての DB へのアクセス権が必要です。
  • Private Key Text: この接続に使用される完全な秘密鍵。
  • Password: 秘密鍵が暗号化されている場合のパスフレーズ。 暗号化されていない場合は空白になります。

new_relic_data_pipeline_observability_get_dbt_run_metadata2 DAG を有効にしてセットアップを完了します。

newrelic-dbt-cloud-integrationリポジトリをクローンするには、次のコマンドを実行します。

bash
$
git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git

次に、Airflow ディレクトリにcdします。

bash
$
cd newrelic-dbt-cloud-integration/airflow

次に、次のコマンドを実行してdocker compose を初期化して実行します。

bash
$
docker-compose up airflow-init
bash
$
docker-compose up

Airflow UIリリース: http://localhost:8080

DAG に必要な 5 つの Airflow 接続を作成します。 次の表には、接続名とそれを設定するための情報が記載されています。 これらすべての型はhttpであることに注意してください。

接続名

説明

タイプ

ホストとパスワード

dbt_cloud_admin_api

SimpleHttpHookを使用して dbt Cloud 管理 API に接続できます

http

ホスト: https://cloud.getdbt.com/api/v2/accounts/ACCOUNT\_ID/ ( ACCOUNT_IDを dbt Cloud アカウント ID に置き換えます)

パスワード: dbt Cloud APIパスワード (プロファイル設定) またはサービス アカウント

dbt_cloud_discovery_api

dbt検出APIに接続できます

http

ホスト: https://metadata.cloud.getdbt.com/graphql

パスワード: DBT クラウド サービス アカウント

nr_insights_insert

New Relicにカスタムイベントをアップロードできます

http

ホスト: https://insights-collector.newrelic.com/v1/accounts/ACCOUNT\_ID/events ( ACCOUNT_IDをアカウント ID に置き換えてください)

パスワード: NR インサイトにAPIキーを挿入

nr_insights_query

New Relicカスタムイベントをクエリできます

http

ホスト: https://insights-api.newrelic.com/v1/accounts/ACCOUNT\_ID/query ( ACCOUNT_IDをアカウント ID に置き換えてください)

パスワード: NR インサイト作成APIキー

上記の 4 つを設定したら、Snowflake 接続を設定する必要があります。 Snowflake を使用すると、失敗したテスト行をクエリできます。 スノーフレーク接続を構成する方法は多数あります。 秘密鍵ペアを使用して設定するには、次の属性を入力します。

  • Type: スノーフレーク
  • Login: Snowflakeのユーザー名
  • Account: Snowflakeアカウント
  • Warehouse: Snowflakeの倉庫
  • Role: あなたの Snowflake の役割。 失敗したテスト行をすべて取得するには、ロールに dbt Cloud で使用されるすべての DB へのアクセス権が必要です。
  • Private Key Text: この接続に使用される完全な秘密鍵。
  • Password: 秘密鍵が暗号化されている場合のパスフレーズ。 暗号化されていない場合は空白になります。

new_relic_data_pipeline_observability_get_dbt_run_metadata2 DAG を有効にしてセットアップを完了します。

データを検索する

このインテグレーションは、3 つのカスタム イベントを作成し、 New Relicに報告します。

DAG設定

接続:

この DAG は、設定なしでそのまま実行することを目的としています。 同時に、貴社では接続に対して独自の命名規則を持っている可能性があることを認識しています。 そのため、 dag_config.yml内にはさまざまな接続の名前を設定できる簡単な構成があります。

connections:
dbt_cloud_admin_api: dbt_cloud_admin_api
dbt_cloud_discovery_api: dbt_cloud_discovery_api
nr_insights_query: nr_insights_query
nr_insights_insert: nr_insights_insert
snowflake_api: SNOWFLAKE

ランニングチーム:

dbt ジョブは異なるチームによって所有されている可能性がありますが、dbt Cloud 内にこれを設定する場所はありません。 Python コードを使用してチームを動的に設定できます。 独自のコードを記述するには、 airflow/dags/nr_utils/nr_utils.pyを変更し、必要なロジックをget_team_from_run()に配置します。 その関数に渡される実行データは、次の属性にアクセスできます。

  • プロジェクト名
  • 環境名
  • 実行用の dbt Cloud v2 APIにリストされているすべてのフィールド。 すべての属性の先頭に「run_」が付きます

関数の例を次に示します。

def get_team_from_run(run: dict) -> str:
team = 'Data Engineering'
if run['project_id'] == '11111' and run['environment_id'] in ['55555', '33333']:
team = 'Platform'
if re.match(r'Catch-all', run['job_name']):
team = 'Project Catch All'
return team

Dbtプロジェクトの設定

Dbt プロジェクト内では、メタ構成を使用して追加のチームおよびテスト固有の設定を設定できます。

  • Team: ジョブを所有するのはrun_team determinesですが、テストやモデルなどの失敗したリソースに関する集計通知を上流または下流のチームが受け取る必要がある場合があります。 チームを設定するとそれが実現しやすくなります。
  • alert_failed_test_rows: Trueに設定すると、失敗したテスト行が有効になり、失敗したテストのクエリが実行され、最初の10列までがNew Relicに送信されます。
  • failed_test_rows_limit: New Relic に送信する失敗したテスト行の最大数。 不当な量のデータが New Relic に送信される状況を防ぐために、100 行というハードコードされた制限を設けています。
  • slack_mentions: Slack アラートを有効にすると、このフィールドでメッセージに誰を記載するかを設定できます。

dbt_project.ymlでこれを設定すると、チームが「データ エンジニアリング」に設定され、失敗したテスト行が有効になります。

models:
dbt_fake_company:
+meta:
nr_config:
team: 'Data Engineering'
alert_failed_test_rows: False
failed_test_rows_limit: 5
slack_mentions: '@channel, @business_users'

リソースに、メッセージと呼ばれる別の属性を追加できます。 次の設定では、特定の失敗したテストについてパートナー ビジネス チームがアラートを受け取ることができます。 さらに、失敗したテスト行自体にアラートを設定することもできます。

models:
- name: important_business_model
tests:
- some_custom_test:
config:
meta:
nr_config:
team: 'Upstream Business Team'
alert_failed_test_rows: true
failed_test_rows_limit: 10
slack_mentions: '@channel, @business_user1, @engineer1'
message: 'Important business process produced invalid data. Please check X tool'

トラブルシューティング

異なるバージョンの Airflow と異なるバージョンのプロバイダーを組み合わせると、重大な変更が発生する可能性があります。 場合によっては、Airflow 環境の特定のバージョンに合わせてコードを変更する必要があることがあります。 既知の問題はGithub リポジトリで追跡しています。