Airflow 과의 dbt Cloud 통합은 dbt Cloud 작업 및 리소스의 상태를 모니터링하여 실행, 모델 또는 테스트 실패와 같은 문제를 식별하는 데 도움이 됩니다.
이 통합은 Apache Airflow 에서 실행되며 그렇게 하도록 구성된 경우 실패한 테스트에 대해 Snowflake (를) 쿼리합니다.
전제 조건
API가 활성화되고
Snowflake
데이터베이스로 사용하는 dbt Cloud 계정입니다.
dbt Cloud
계정이 실행되는
Snowflake
계정에 액세스합니다.
기존
Airflow
환경 버전 2.8.1 이상 또는
Docker Compose
실행 기능.
통합 설치
다음 중 한 가지 방법으로 Airflow (를) 사용하여 뉴렐릭 dbt Cloud 통합을 설치할 수 있습니다.
- 기존 Airflow 환경에 설치합니다. 이는 운영 환경에 권장됩니다.
- docker Compose를 사용하여 설치. 이는 빠른 POC에 적합합니다.
해당 탭을 클릭하여 귀하의 요구에 가장 적합한 옵션을 선택하십시오:
Snowflake 공급자가 있는지 확인한 후 다음 명령을 실행하여 newrelic-dbt-cloud-integration
저장소를 복제합니다.
$pip install apache-airflow-providers-snowflake>=3.0.0
$git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git
airflow/dags
의 콘텐츠를 Airflow dags 폴더의 루트에 복사합니다.
DAG에 필요한 5개의 Airflow 연결을 만듭니다. 다음 표에는 연결 이름과 설정 정보가 나와 있습니다.
연결 이름 | 설명 | 유형 | 주인 | 비밀번호 |
---|---|---|---|---|
| 다음을 사용하여 dbt Cloud 관리 API에 연결할 수 있습니다. |
| https://cloud.getdbt.com/api/v2/accounts/ACCOUNT_ID/ ( | |
| dbt 검색 API에 연결할 수 있습니다. |
| ||
| 커스텀 대시보드를 뉴렐릭에 업로드할 수 있습니다. |
| https://insights-collector.newrelic.com/v1/accounts/ACCOUNT_ID/events ( | |
| 쿼리 쿠렐릭 맞춤형 대시보드를 사용할 수 있습니다. |
| https://insights-api.newrelic.com/v1/accounts/ACCOUNT_ID/query ( |
위의 네 가지를 구성한 후에는 Snowflake 연결을 구성해야 합니다. Snowflake를 사용하면 실패한 테스트 행을 쿼리할 수 있습니다. 눈송이 연결을 구성하는 방법에는 여러 가지 가 있습니다. 개인 키 쌍을 사용하여 구성하려면 다음 속성을 입력하십시오.
Type
: 눈송이Login
: 귀하의 Snowflake 사용자 이름Account
: 귀하의 Snowflake 계정Warehouse
: 당신의 눈송이 창고Role
: 귀하의 Snowflake 역할입니다. 실패한 모든 테스트 행을 가져오려면 역할에 dbt Cloud에서 사용되는 모든 DB에 대한 액세스 권한이 있어야 합니다.Private Key Text
: 이 연결에 사용되는 전체 개인 키입니다.Password
: 암호화된 경우 개인 키에 대한 암호 문구입니다. 암호화되지 않은 경우 비어 있습니다.
new_relic_data_pipeline_observability_get_dbt_run_metadata2
DAG를 활성화하여 설정을 완료합니다.
다음 명령어를 실행하여 newrelic-dbt-cloud-integration
저장소를 클론합니다.
$git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git
그런 다음 Airflow 디렉터리로 cd
.
$cd newrelic-dbt-cloud-integration/airflow
그런 다음 다음 명령을 실행하여 docker compose를 초기화하고 실행합니다.
$docker-compose up airflow-init
$docker-compose up
Airflow UI: http://localhost:8080
DAG에 필요한 5개의 Airflow 연결을 만듭니다. 다음 표에는 연결 이름과 설정 정보가 나와 있습니다.
연결 이름 | 설명 | 유형 | 주인 | 비밀번호 |
---|---|---|---|---|
| 다음을 사용하여 dbt Cloud 관리 API에 연결할 수 있습니다. |
| https://cloud.getdbt.com/api/v2/accounts/ACCOUNT_ID/ ( | |
| dbt 검색 API에 연결할 수 있습니다. |
| ||
| 커스텀 대시보드를 뉴렐릭에 업로드할 수 있습니다. |
| https://insights-collector.newrelic.com/v1/accounts/ACCOUNT_ID/events ( | |
| 쿼리 쿠렐릭 맞춤형 대시보드를 사용할 수 있습니다. |
| https://insights-api.newrelic.com/v1/accounts/ACCOUNT_ID/query ( |
위의 네 가지를 구성한 후에는 Snowflake 연결을 구성해야 합니다. Snowflake를 사용하면 실패한 테스트 행을 쿼리할 수 있습니다. 눈송이 연결을 구성하는 방법에는 여러 가지 가 있습니다. 개인 키 쌍을 사용하여 구성하려면 다음 속성을 입력하십시오.
Type
: 눈송이Login
: 귀하의 Snowflake 사용자 이름Account
: 귀하의 Snowflake 계정Warehouse
: 당신의 눈송이 창고Role
: 귀하의 Snowflake 역할입니다. 실패한 모든 테스트 행을 가져오려면 역할에 dbt Cloud에서 사용되는 모든 DB에 대한 액세스 권한이 있어야 합니다.Private Key Text
: 이 연결에 사용되는 전체 개인 키입니다.Password
: 암호화된 경우 개인 키에 대한 암호 문구입니다. 암호화되지 않은 경우 비어 있습니다.
new_relic_data_pipeline_observability_get_dbt_run_metadata2
DAG를 활성화하여 설정을 완료합니다.
데이터 찾기
이 통합은 세 가지 사용자 정의 대시보드를 생성하고 뉴렐릭에 보고합니다.
DAG 설정
사이:
이 DAG는 설정 없이 있는 그대로 실행되도록 만들어졌습니다. 동시에, 귀하의 회사에는 연결에 대한 고유한 명명 규칙이 있을 수 있다는 것을 알고 있습니다. 따라서 dag_config.yml
내부에는 다양한 연결의 이름을 설정할 수 있는 간단한 구성이 있습니다.
connections: dbt_cloud_admin_api: dbt_cloud_admin_api dbt_cloud_discovery_api: dbt_cloud_discovery_api nr_insights_query: nr_insights_query nr_insights_insert: nr_insights_insert snowflake_api: SNOWFLAKE
팀 실행:
dbt 작업은 다른 팀이 소유할 수 있지만 dbt Cloud 내에서는 이를 설정할 수 있는 장소가 없습니다. Python 코드를 사용하여 팀을 동적으로 설정할 수 있습니다. 자신만의 코드를 작성하려면 airflow/dags/nr_utils/nr_utils.py
수정하고 get_team_from_run()
에 필요한 로직을 넣으세요. 해당 함수에 전달된 실행 데이터는 다음 속성에 액세스할 수 있습니다.
- 프로젝트_이름
- 환경_이름
- 실행을 위한 dbt Cloud v2 API 에 나열된 모든 필드입니다. 모든 속성 앞에는 "run_"이 붙습니다.
다음은 예제 함수입니다:
def get_team_from_run(run: dict) -> str: team = 'Data Engineering' if run['project_id'] == '11111' and run['environment_id'] in ['55555', '33333']: team = 'Platform' if re.match(r'Catch-all', run['job_name']): team = 'Project Catch All' return team
Dbt 프로젝트 설정
Dbt 프로젝트 내에서 메타 구성을 사용하여 추가 팀 및 테스트별 설정을 지정할 수 있습니다.
Team
: 작업을 소유한run_team determines
동안 테스트 및 모델과 같은 실패한 리소스에 대한 공지 공지를 받기 위해 업스트림 또는 다운스트림 팀이 필요한 경우가 있습니다. 팀을 구성하는 것은 우리가 그렇게 하는 데 도움이 됩니다.alert_failed_test_rows
:True
로 설정하면 실패한 테스트에 대한 쿼리를 실행하고 최대 처음 10개 열을 뉴렐릭으로 보내는 실패한 테스트 행이 활성화됩니다.failed_test_rows_limit
: 뉴렐릭으로 보낼 실패한 테스트 행의 최대 개수입니다. 뉴렐릭에 부당한 금액을 보내는 상황을 방지하기 위해 행 100개로 하드 코딩된 제한이 있습니다.slack_mentions
: Slack 알림을 활성화한 경우 이 필드를 사용하면 메시지에서 언급할 사람을 설정할 수 있습니다.
dbt_project.yml
에서 이를 설정하면 팀이 '데이터 엔지니어링'으로 설정되고 실패한 테스트 행이 활성화됩니다.
models: dbt_fake_company: +meta: nr_config: team: 'Data Engineering' alert_failed_test_rows: False failed_test_rows_limit: 5 slack_mentions: '@channel, @business_users'
메시지라는 또 다른 속성을 리소스에 추가할 수 있습니다. 다음 설정에서는 실패한 특정 테스트에 대해 파트너 비즈니스 팀에 알림을 보낼 수 있습니다. 또한 실패한 테스트 행 자체에 대한 알림을 설정할 수도 있습니다.
models: - name: important_business_model tests: - some_custom_test: config: meta: nr_config: team: 'Upstream Business Team' alert_failed_test_rows: true failed_test_rows_limit: 10 slack_mentions: '@channel, @business_user1, @engineer1' message: 'Important business process produced invalid data. Please check X tool'
문제점 해결
다양한 버전의 Airflow와 다양한 버전의 공급자를 결합하면 주요 변경 사항이 발생할 수 있습니다. 경우에 따라 Airflow 환경의 특정 버전과 일치하도록 코드를 수정해야 할 수도 있습니다. 우리는 Github 저장소 에서 알려진 문제를 추적합니다.