dbt Cloud with Airflow and Snowflake integration

Our dbt Cloud integration with Airflow monitors the health of your dbt Cloud jobs and resources, helping you identify problems like when runs, models, or tests fail.

This integration runs on Apache Airflow and queries Snowflake for any failed tests if configured to do so.

Prerequisites

  • dbt Cloud account with APIs enabled and using Snowflake as the database.
  • Access to the Snowflake account where the dbt Cloud account runs.
  • An existing Airflow environment, version 2.8.1 or higher, or the ability to run Docker Compose.

Install the integration

You can install the New Relic dbt Cloud integration with Airflow by either:

  • Installing in your existing Airflow environment. This is recommended for production environments.
  • Installing with Docker Compose. This is suitable for quick POCs.

Follow the steps for the option most suitable for your needs.

Install in your existing Airflow environment

Ensure you have the Snowflake provider installed, and then clone the newrelic-dbt-cloud-integration repository by running these commands:

$ pip install "apache-airflow-providers-snowflake>=3.0.0"
$ git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git

Copy the contents of airflow/dags to the root of your Airflow dags folder.

Create the five Airflow connections needed for the DAG. The following table lists four of them, with the connection name and the information needed to set each one up. For all four, the type is http. The fifth is the Snowflake connection, described after the table.

| Connection name | Description | Type | Host and password |
| --- | --- | --- | --- |
| dbt_cloud_admin_api | Allows you to connect to the dbt Cloud admin API with SimpleHttpHook | http | Host: https://cloud.getdbt.com/api/v2/accounts/ACCOUNT_ID/ (replace ACCOUNT_ID with your dbt Cloud account ID). Password: your dbt Cloud API token (Profile settings) or a service account token |
| dbt_cloud_discovery_api | Allows you to connect to the dbt discovery API | http | Host: https://metadata.cloud.getdbt.com/graphql. Password: dbt Cloud service account token |
| nr_insights_insert | Allows you to upload custom events to New Relic | http | Host: https://insights-collector.newrelic.com/v1/accounts/ACCOUNT_ID/events (replace ACCOUNT_ID with your account ID). Password: your NR insights insert API key |
| nr_insights_query | Allows you to query New Relic custom events | http | Host: https://insights-api.newrelic.com/v1/accounts/ACCOUNT_ID/query (replace ACCOUNT_ID with your account ID). Password: your NR insights query API key |
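The Host values differ only in the account ID they embed, so they are easy to generate rather than type by hand. The following is a minimal sketch, not part of the integration: the helper name build_connection_hosts and the sample account IDs are hypothetical, and the URL templates are copied from the connection table.

```python
# Hypothetical helper: the function name and return layout are illustrative only.
def build_connection_hosts(dbt_account_id: str, nr_account_id: str) -> dict[str, str]:
    """Return the Host value for each of the four http connections."""
    return {
        "dbt_cloud_admin_api": f"https://cloud.getdbt.com/api/v2/accounts/{dbt_account_id}/",
        "dbt_cloud_discovery_api": "https://metadata.cloud.getdbt.com/graphql",
        "nr_insights_insert": f"https://insights-collector.newrelic.com/v1/accounts/{nr_account_id}/events",
        "nr_insights_query": f"https://insights-api.newrelic.com/v1/accounts/{nr_account_id}/query",
    }


if __name__ == "__main__":
    # Sample IDs only; substitute your own dbt Cloud and New Relic account IDs.
    for name, host in build_connection_hosts("12345", "67890").items():
        print(f"{name}: {host}")
```

Note that the dbt Cloud account ID and the New Relic account ID are different values, even though the table uses the same ACCOUNT_ID placeholder for both.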

Once you've configured the four connections above, configure the Snowflake connection. The integration uses it to query Snowflake for failed test rows. There are many ways to configure a Snowflake connection. To configure it using a private key pair, fill in the following attributes:

  • Type: Snowflake
  • Login: Your Snowflake username
  • Account: Your Snowflake account
  • Warehouse: Your Snowflake warehouse
  • Role: Your Snowflake role. The role must have access to all the DBs used in dbt Cloud to get all failed test rows.
  • Private Key Text: The full private key used for this connection.
  • Password: Pass phrase for the private key if it's encrypted. Blank if it's unencrypted.

Complete the setup by enabling the new_relic_data_pipeline_observability_get_dbt_run_metadata2 DAG.

Install with Docker Compose

Run the following command to clone the newrelic-dbt-cloud-integration repository:

$ git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git

Then cd into the Airflow directory:

$ cd newrelic-dbt-cloud-integration/airflow

Then initialize and start Docker Compose by running the following commands:

$ docker-compose up airflow-init
$ docker-compose up

Launch the Airflow UI: http://localhost:8080

Create the five Airflow connections needed for the DAG. The following table lists four of them, with the connection name and the information needed to set each one up. For all four, the type is http. The fifth is the Snowflake connection, described after the table.

| Connection name | Description | Type | Host and password |
| --- | --- | --- | --- |
| dbt_cloud_admin_api | Allows you to connect to the dbt Cloud admin API with SimpleHttpHook | http | Host: https://cloud.getdbt.com/api/v2/accounts/ACCOUNT_ID/ (replace ACCOUNT_ID with your dbt Cloud account ID). Password: your dbt Cloud API token (Profile settings) or a service account token |
| dbt_cloud_discovery_api | Allows you to connect to the dbt discovery API | http | Host: https://metadata.cloud.getdbt.com/graphql. Password: dbt Cloud service account token |
| nr_insights_insert | Allows you to upload custom events to New Relic | http | Host: https://insights-collector.newrelic.com/v1/accounts/ACCOUNT_ID/events (replace ACCOUNT_ID with your account ID). Password: your NR insights insert API key |
| nr_insights_query | Allows you to query New Relic custom events | http | Host: https://insights-api.newrelic.com/v1/accounts/ACCOUNT_ID/query (replace ACCOUNT_ID with your account ID). Password: your NR insights query API key |
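For a quick proof of concept you may prefer to define these connections outside the UI. Airflow documents an environment-variable convention, AIRFLOW_CONN_ followed by the upper-cased connection ID, whose value can be a JSON object. The sketch below only formats such a name and value pair. The helper name connection_env_var is hypothetical, and whether the repository's Compose file passes these variables through to the Airflow containers is an assumption you would need to verify.

```python
import json


# Hypothetical helper, not part of the integration.
def connection_env_var(conn_id: str, host: str, password: str) -> tuple[str, str]:
    """Format one http connection as an Airflow connection environment variable."""
    # Airflow looks up connections under AIRFLOW_CONN_<CONN_ID>, upper-cased.
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    # JSON form of a connection: type, host, and the secret stored as the password.
    value = json.dumps({"conn_type": "http", "host": host, "password": password})
    return name, value


if __name__ == "__main__":
    # Placeholder token only; never commit real tokens to source control.
    name, value = connection_env_var(
        "dbt_cloud_discovery_api",
        "https://metadata.cloud.getdbt.com/graphql",
        "YOUR_SERVICE_ACCOUNT_TOKEN",
    )
    print(f"{name}='{value}'")
```

Connections defined through environment variables are not listed in the Airflow UI, so creating them in the UI as described above remains the simpler way to inspect them.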

Once you've configured the four connections above, configure the Snowflake connection. The integration uses it to query Snowflake for failed test rows. There are many ways to configure a Snowflake connection. To configure it using a private key pair, fill in the following attributes:

  • Type: Snowflake
  • Login: Your Snowflake username
  • Account: Your Snowflake account
  • Warehouse: Your Snowflake warehouse
  • Role: Your Snowflake role. The role must have access to all the DBs used in dbt Cloud to get all failed test rows.
  • Private Key Text: The full private key used for this connection.
  • Password: Pass phrase for the private key if it's encrypted. Blank if it's unencrypted.

Complete the setup by enabling the new_relic_data_pipeline_observability_get_dbt_run_metadata2 DAG.

Find your data

This integration creates and reports three custom events to New Relic:

DAG configurations

Connections:

This DAG is intended to run as-is with no configuration. However, your company may have its own naming conventions for connections, so dag_config.yml contains a simple mapping where you can set the name of each connection.

connections:
  dbt_cloud_admin_api: dbt_cloud_admin_api
  dbt_cloud_discovery_api: dbt_cloud_discovery_api
  nr_insights_query: nr_insights_query
  nr_insights_insert: nr_insights_insert
  snowflake_api: SNOWFLAKE
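To illustrate how such a mapping can be applied, here is a minimal sketch that merges overrides from an already-parsed dag_config.yml over the default names shown above. This is an assumption about the general technique, not the DAG's actual loading code: the names DEFAULT_CONNECTIONS and resolve_connections are hypothetical, and the config is assumed to have been parsed into a dict beforehand.

```python
# Default connection names, copied from the dag_config.yml example above.
DEFAULT_CONNECTIONS = {
    "dbt_cloud_admin_api": "dbt_cloud_admin_api",
    "dbt_cloud_discovery_api": "dbt_cloud_discovery_api",
    "nr_insights_query": "nr_insights_query",
    "nr_insights_insert": "nr_insights_insert",
    "snowflake_api": "SNOWFLAKE",
}


# Hypothetical helper: shows one way to apply overrides, not the DAG's own logic.
def resolve_connections(config: dict) -> dict:
    """Return connection names, letting the config override the defaults."""
    overrides = (config or {}).get("connections") or {}
    # Reject misspelled keys early instead of silently ignoring them.
    unknown = set(overrides) - set(DEFAULT_CONNECTIONS)
    if unknown:
        raise KeyError(f"Unknown connection keys: {sorted(unknown)}")
    return {**DEFAULT_CONNECTIONS, **overrides}


if __name__ == "__main__":
    # Example: a company that names its Snowflake connection differently.
    parsed = {"connections": {"snowflake_api": "ACME_SNOWFLAKE"}}
    print(resolve_connections(parsed))
```

The key on the left identifies the role the DAG expects, and the value on the right is the connection name as it exists in your Airflow environment.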

Run Team:

dbt jobs might be owned by different teams, yet there is no place to set this within dbt Cloud. We can use Python code to dynamically set the team. To write your own code, modify airflow/dags/nr_utils/nr_utils.py and put any logic needed in get_team_from_run(). The run data passed in to that function has access to the following attributes.

Here is an example function:

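import re

def get_team_from_run(run: dict) -> str:
    # Default owner when no rule below matches.
    team = 'Data Engineering'
    # Route specific project and environment combinations to another team.
    if run['project_id'] == '11111' and run['environment_id'] in ['55555', '33333']:
        team = 'Platform'
    # re.match anchors at the start, so this matches job names beginning with 'Catch-all'.
    if re.match(r'Catch-all', run['job_name']):
        team = 'Project Catch All'
    return team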
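To check your rules before deploying them, you can call the function with hand-written run dictionaries. The sketch below repeats the example function so that it runs on its own. The sample runs are made up, and they assume that project_id and environment_id arrive as strings, as the example comparisons imply. If your run data carries numeric IDs, the string comparisons would never match.

```python
import re


def get_team_from_run(run: dict) -> str:
    team = 'Data Engineering'
    if run['project_id'] == '11111' and run['environment_id'] in ['55555', '33333']:
        team = 'Platform'
    if re.match(r'Catch-all', run['job_name']):
        team = 'Project Catch All'
    return team


# Made-up runs that exercise each branch of the example rules.
sample_runs = [
    {'project_id': '11111', 'environment_id': '55555', 'job_name': 'Nightly build'},
    {'project_id': '22222', 'environment_id': '55555', 'job_name': 'Catch-all refresh'},
    {'project_id': '22222', 'environment_id': '99999', 'job_name': 'Hourly Catch-all'},
]

teams = [get_team_from_run(run) for run in sample_runs]
print(teams)  # ['Platform', 'Project Catch All', 'Data Engineering']
```

The third run falls back to the default because re.match only matches at the beginning of the job name. Use re.search instead if the pattern may appear anywhere in the name.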

dbt project configuration

Within the dbt project, we can use the meta config to set an additional team and test-specific settings.

  • team: While run_team determines who owns the jobs, we sometimes need upstream or downstream teams to receive alert notifications on failed resources like tests and models. Setting the team helps us do that.
  • alert_failed_test_rows: Setting this to True enables failed test rows: the integration runs the queries for failed tests and sends up to the first 10 columns of each row to New Relic.
  • failed_test_rows_limit: Maximum number of failed test rows to send to New Relic. There is a hard-coded limit of 100 rows to avoid sending unreasonable amounts of data to New Relic.
  • slack_mentions: If you enable Slack alerts, this field lets you set who should be mentioned in the message.

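The following configuration in dbt_project.yml sets team to 'Data Engineering', limits failed test rows to 5, and sets the Slack mentions. As written, alert_failed_test_rows is False, so failed test rows are not sent. Set it to True to enable them.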

models:
  dbt_fake_company:
    +meta:
      nr_config:
        team: 'Data Engineering'
        alert_failed_test_rows: False
        failed_test_rows_limit: 5
        slack_mentions: '@channel, @business_users'
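The two limits described above, a hard cap of 100 rows and at most the first 10 columns, can be pictured with a short sketch. This illustrates the stated behavior under assumptions and is not the integration's actual code: the function name trim_failed_rows and the representation of rows as dictionaries are hypothetical.

```python
HARD_ROW_LIMIT = 100  # hard-coded row cap described in the text
MAX_COLUMNS = 10      # only the first 10 columns are sent


# Hypothetical helper: illustrates the limits, not the integration's code.
def trim_failed_rows(rows: list[dict], configured_limit: int) -> list[dict]:
    """Apply failed_test_rows_limit, the hard row cap, and the column cap."""
    # The configured limit can lower the cap but never raise it above 100.
    limit = max(0, min(configured_limit, HARD_ROW_LIMIT))
    trimmed = []
    for row in rows[:limit]:
        # Dicts preserve insertion order, so this keeps the first columns.
        trimmed.append(dict(list(row.items())[:MAX_COLUMNS]))
    return trimmed


if __name__ == "__main__":
    # 150 fake failed rows with 12 columns each.
    fake_rows = [{f"col_{i}": i for i in range(12)} for _ in range(150)]
    sent = trim_failed_rows(fake_rows, configured_limit=5)
    print(len(sent), len(sent[0]))
```

With failed_test_rows_limit set to 5, as in the configuration above, only five rows would be sent. A value above 100 would still be capped at 100.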

We can add another attribute called message to resources. In the following configuration, a partner business team can be alerted on specific failed tests. Furthermore, we can set alerts on the failed test rows themselves.

models:
  - name: important_business_model
    tests:
      - some_custom_test:
          config:
            meta:
              nr_config:
                team: 'Upstream Business Team'
                alert_failed_test_rows: true
                failed_test_rows_limit: 10
                slack_mentions: '@channel, @business_user1, @engineer1'
                message: 'Important business process produced invalid data. Please check X tool'

Troubleshooting

Different versions of Airflow combined with different versions of providers can introduce breaking changes. In some cases, you may need to modify code to match the specific versions in your Airflow environment. We track known issues in our GitHub repository.