Our dbt Cloud integration with Airflow monitors the health of your dbt Cloud jobs and resources, helping you identify problems such as failed runs, models, or tests.
The integration runs on Apache Airflow and, if configured to do so, queries Snowflake for any failed tests.
Prerequisites
- dbt Cloud account with APIs enabled and using Snowflake as the database.
- Access to the Snowflake account where the dbt Cloud account runs.
- Existing Airflow environment version 2.8.1 or higher, or the ability to run Docker Compose.
Install the integration
You can install the New Relic dbt Cloud integration with Airflow by either:
- Installing in your existing Airflow environment. This is recommended for production environments.
- Installing with Docker Compose. This is suitable for quick POCs.
Select the option most suitable for your needs by clicking on its tab:
Ensure you have the Snowflake provider installed, then clone the newrelic-dbt-cloud-integration repository by running these commands:
$pip install 'apache-airflow-providers-snowflake>=3.0.0'
$git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git
Copy the contents of airflow/dags to the root of your Airflow dags folder.
Create the five Airflow connections needed for the DAG. The following table provides the connection name and the info to set it up. Note that for all of these, the type is http:

Connection name | Description | Type | Host and password
---|---|---|---
dbt_cloud_admin_api | Allows you to connect to the dbt Cloud admin API | http | Host: https://cloud.getdbt.com/api/v2/accounts/ACCOUNT_ID/ (replace ACCOUNT_ID with your dbt Cloud account ID). Password: your dbt Cloud API token (Profile settings) or a service account token
dbt_cloud_discovery_api | Allows you to connect to the dbt discovery API | http | Host: https://metadata.cloud.getdbt.com/graphql. Password: a dbt Cloud service account token
nr_insights_insert | Allows you to upload custom events to New Relic | http | Host: https://insights-collector.newrelic.com/v1/accounts/ACCOUNT_ID/events (replace ACCOUNT_ID with your New Relic account ID). Password: your New Relic insights insert API key
nr_insights_query | Allows you to query New Relic custom events | http | Host: https://insights-api.newrelic.com/v1/accounts/ACCOUNT_ID/query (replace ACCOUNT_ID with your New Relic account ID). Password: your New Relic insights query API key
Once you've configured the four connections above, configure the Snowflake connection, which allows the integration to query for failed test rows. There are many ways to configure a Snowflake connection. To configure it using a private key pair, fill in the following attributes:

Type
: Snowflake

Login
: Your Snowflake username

Account
: Your Snowflake account

Warehouse
: Your Snowflake warehouse

Role
: Your Snowflake role. The role must have access to all the DBs used in dbt Cloud to get all failed test rows.

Private Key Text
: The full private key used for this connection.

Password
: Pass phrase for the private key if it's encrypted. Leave blank if it's unencrypted.
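Before enabling the DAG, it can help to confirm the connection carries every attribute listed above. A minimal sketch of such a check (the `validate_snowflake_conn` helper is hypothetical, not part of the integration):

```python
# Sketch: check a Snowflake connection dict for the attributes listed above.
# validate_snowflake_conn is a hypothetical helper, not part of the integration.
REQUIRED_FIELDS = ["login", "account", "warehouse", "role", "private_key_text"]

def validate_snowflake_conn(conn: dict) -> list:
    """Return the required fields that are missing or empty.

    Password (the key passphrase) is intentionally not required:
    a blank password means the private key is unencrypted.
    """
    return [f for f in REQUIRED_FIELDS if not conn.get(f)]

conn = {
    "login": "dbt_user",
    "account": "my_account",
    "warehouse": "my_wh",
    "role": "TRANSFORMER",
    "private_key_text": "-----BEGIN PRIVATE KEY-----...",
}
print(validate_snowflake_conn(conn))  # [] -> nothing missing
```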
Complete the setup by enabling the new_relic_data_pipeline_observability_get_dbt_run_metadata2 DAG.
Run the following command to clone the newrelic-dbt-cloud-integration repository:
$git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git
Then cd into the Airflow directory:
$cd newrelic-dbt-cloud-integration/airflow
Then initialize and run Docker Compose by running the following commands:
$docker-compose up airflow-init
$docker-compose up
Launch the Airflow UI: http://localhost:8080
Create the five Airflow connections needed for the DAG. The following table provides the connection name and the info to set it up. Note that for all of these, the type is http:

Connection name | Description | Type | Host and password
---|---|---|---
dbt_cloud_admin_api | Allows you to connect to the dbt Cloud admin API | http | Host: https://cloud.getdbt.com/api/v2/accounts/ACCOUNT_ID/ (replace ACCOUNT_ID with your dbt Cloud account ID). Password: your dbt Cloud API token (Profile settings) or a service account token
dbt_cloud_discovery_api | Allows you to connect to the dbt discovery API | http | Host: https://metadata.cloud.getdbt.com/graphql. Password: a dbt Cloud service account token
nr_insights_insert | Allows you to upload custom events to New Relic | http | Host: https://insights-collector.newrelic.com/v1/accounts/ACCOUNT_ID/events (replace ACCOUNT_ID with your New Relic account ID). Password: your New Relic insights insert API key
nr_insights_query | Allows you to query New Relic custom events | http | Host: https://insights-api.newrelic.com/v1/accounts/ACCOUNT_ID/query (replace ACCOUNT_ID with your New Relic account ID). Password: your New Relic insights query API key
Once you've configured the four connections above, configure the Snowflake connection, which allows the integration to query for failed test rows. There are many ways to configure a Snowflake connection. To configure it using a private key pair, fill in the following attributes:

Type
: Snowflake

Login
: Your Snowflake username

Account
: Your Snowflake account

Warehouse
: Your Snowflake warehouse

Role
: Your Snowflake role. The role must have access to all the DBs used in dbt Cloud to get all failed test rows.

Private Key Text
: The full private key used for this connection.

Password
: Pass phrase for the private key if it's encrypted. Leave blank if it's unencrypted.
Complete the setup by enabling the new_relic_data_pipeline_observability_get_dbt_run_metadata2 DAG.
Find your data
This integration creates and reports three custom events to New Relic.
DAG configurations
Connections:
This DAG is intended to run as-is with no configuration. At the same time, we realize your company may have its own naming conventions for connections. As such, there is a simple config inside dag_config.yml where you can set the name for the various connections:
```yml
connections:
  dbt_cloud_admin_api: dbt_cloud_admin_api
  dbt_cloud_discovery_api: dbt_cloud_discovery_api
  nr_insights_query: nr_insights_query
  nr_insights_insert: nr_insights_insert
  snowflake_api: SNOWFLAKE
```
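For example, if your environment names the Snowflake connection SNOWFLAKE but keeps the defaults elsewhere, each logical name resolves through this mapping. A minimal sketch of that lookup (the `resolve_connection` helper is illustrative; the DAG itself reads dag_config.yml directly):

```python
# Sketch of the connection-name lookup driven by dag_config.yml.
# resolve_connection is illustrative; the DAG reads the YAML file itself.
DEFAULTS = {
    "dbt_cloud_admin_api": "dbt_cloud_admin_api",
    "dbt_cloud_discovery_api": "dbt_cloud_discovery_api",
    "nr_insights_query": "nr_insights_query",
    "nr_insights_insert": "nr_insights_insert",
    "snowflake_api": "snowflake_api",
}

def resolve_connection(logical_name: str, overrides: dict) -> str:
    """Map a logical connection name to the Airflow connection ID to use."""
    return overrides.get(logical_name, DEFAULTS[logical_name])

overrides = {"snowflake_api": "SNOWFLAKE"}  # as in the config above
print(resolve_connection("snowflake_api", overrides))      # SNOWFLAKE
print(resolve_connection("nr_insights_query", overrides))  # nr_insights_query
```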
Run Team:
dbt jobs might be owned by different teams, yet there is no place to set this within dbt Cloud. We can use Python code to dynamically set the team. To write your own code, modify airflow/dags/nr_utils/nr_utils.py and put any logic needed in get_team_from_run(). The run data passed in to that function has access to the following attributes:
- project_name
- environment_name
- All fields listed in the dbt Cloud v2 API for runs. All attributes are prepended with "run_"
Here is an example function:

```python
import re


def get_team_from_run(run: dict) -> str:
    team = 'Data Engineering'
    if run['project_id'] == '11111' and run['environment_id'] in ['55555', '33333']:
        team = 'Platform'
    if re.match(r'Catch-all', run['job_name']):
        team = 'Project Catch All'
    return team
```
dbt project configuration
Within the dbt project, we can use the meta config to set an additional team and test-specific settings.
team
: While run_team determines who owns the jobs, we sometimes need upstream or downstream teams to receive alert notifications on failed resources like tests and models. Setting the team helps us do that.

alert_failed_test_rows
: Setting to True enables failed test rows, where we run the queries for failed tests and send up to the first 10 columns to New Relic.

failed_test_rows_limit
: Maximum number of failed test rows to send to New Relic. We have a hard-coded limit of 100 rows to prevent situations where we send unreasonable amounts of data to New Relic.

slack_mentions
: If you enable Slack alerts, this field allows you to set who should be mentioned in the message.
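Taken together, these settings imply a capping rule: the effective row count is the configured failed_test_rows_limit bounded by the hard-coded maximum of 100, and only the first 10 columns of each row are sent. A minimal sketch of that behavior (the function name is illustrative, not the integration's actual code):

```python
# Sketch of the failed-test-row limits described above.
# trim_failed_test_rows is illustrative, not the integration's actual code.
HARD_ROW_CAP = 100   # hard-coded limit mentioned above
MAX_COLUMNS = 10     # up to the first 10 columns are sent to New Relic

def trim_failed_test_rows(rows: list, configured_limit: int) -> list:
    """Apply the configured row limit (capped at 100), keeping 10 columns per row."""
    limit = min(configured_limit, HARD_ROW_CAP)
    return [row[:MAX_COLUMNS] for row in rows[:limit]]

rows = [list(range(15)) for _ in range(250)]  # 250 rows of 15 columns each
trimmed = trim_failed_test_rows(rows, configured_limit=200)
print(len(trimmed), len(trimmed[0]))  # 100 10
```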
Setting this in dbt_project.yml would set the team to 'Data Engineering' and disable failed test rows:
```yml
models:
  dbt_fake_company:
    +meta:
      nr_config:
        team: 'Data Engineering'
        alert_failed_test_rows: False
        failed_test_rows_limit: 5
        slack_mentions: '@channel, @business_users'
```
We can add another attribute called message to resources. In the following configuration, a partner business team can be alerted on specific failed tests. Furthermore, we can set alerts on the failed test rows themselves.
```yml
models:
  - name: important_business_model
    tests:
      - some_custom_test:
          config:
            meta:
              nr_config:
                team: 'Upstream Business Team'
                alert_failed_test_rows: true
                failed_test_rows_limit: 10
                slack_mentions: '@channel, @business_user1, @engineer1'
                message: 'Important business process produced invalid data. Please check X tool'
```
Troubleshooting
Different versions of Airflow combined with different versions of providers can introduce breaking changes. In some cases, you may need to modify code to match the specific versions in your Airflow environment. We track known issues in our GitHub repository.