Nossa dbt Cloud integração com Airflow monitora a integridade de seus dbt Cloud jobs e recursos, ajudando você a identificar problemas como quando execuções, modelos ou testes falham.
Essa integração é executada em Apache Airflow e consulta Snowflake em busca de testes com falha, se configurada para isso.
Pré-requisitos
- Conta dbt Cloud com API habilitada e usando Snowflake como banco de dados.
- Acesso à conta Snowflake onde a conta dbt Cloud é executada.
- Ambiente Airflow existente versão 2.8.1 ou superior ou capacidade de executar Docker Compose.
Instale a integração
Você pode instalar a integração New Relic dbt Cloud com Airflow :
- Instalando em seu ambiente Airflow existente. Isto é recomendado para ambiente de produção.
- Instalando com docker Compose. Isso é adequado para POCs rápidos.
Selecione a opção mais adequada às suas necessidades clicando em sua aba:
Certifique-se de ter o provedor Snowflake e clone o repositório newrelic-dbt-cloud-integration
executando estes comandos:
$pip install apache-airflow-providers-snowflake>=3.0.0
$git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git
Copie o conteúdo de airflow/dags
para a raiz da pasta dags do Airflow
Crie as cinco conexões Airflow necessárias para o DAG. A tabela a seguir fornece o nome da conexão e as informações para configurá-la. Observe que para todos eles, o tipo é http
:
Nome da conexão | Descrição | Tipo | Host e senha | |
---|---|---|---|---|
| Permite que você se conecte à API de administração do dbt Cloud com |
| Host: https://cloud.getdbt.com/api/v2/accounts/ACCOUNT\_ID/ (Substitua Senha: Seu token de API do dbt Cloud (configurações de perfil) ou um token de conta de serviço | |
| Permite que você se conecte à API de descoberta dbt |
| ||
| Permite fazer upload de eventos personalizados para New Relic |
| Host: https://insights-collector.newrelic.com/v1/accounts/ACCOUNT\_ID/events (Substitua Senha: Sua chave de API de inserção de insights do NR | |
| Permite consultar evento New Relic personalizado |
| Host: https://insights-api.newrelic.com/v1/accounts/ACCOUNT\_ID/query (Substitua Senha: Sua chave de API de consulta de insights do NR |
Depois de configurar os quatro acima, você precisa configurar a conexão Snowflake. Snowflake permite que você consulte linhas de teste com falha. Há muitas maneiras de configurar uma conexão em floco de neve. Para configurar usando um par de chaves privadas, preencha o seguinte atributo:
Type
: Floco de neveLogin
: Seu nome de usuário do floco de neveAccount
: Sua conta do floco de neveWarehouse
: Seu armazém SnowflakeRole
: Seu papel de floco de neve. A função deve ter acesso a todos os bancos de dados usados no dbt Cloud para obter todas as linhas de teste com falha.Private Key Text
: a chave privada completa usada para esta conexão.Password
: frase secreta para a chave privada se ela estiver criptografada. Em branco se não estiver criptografado.
Conclua a configuração ativando o new_relic_data_pipeline_observability_get_dbt_run_metadata2
DAG.
Execute o seguinte comando para clonar o repositório newrelic-dbt-cloud-integration
:
$git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git
Em seguida, cd
no diretório do Airflow:
$cd newrelic-dbt-cloud-integration/airflow
Em seguida, inicialize e execute docker compose executando os seguintes comandos:
$docker-compose up airflow-init
$docker-compose up
lançar a interface Airflow: http://localhost:8080
Crie as cinco conexões Airflow necessárias para o DAG. A tabela a seguir fornece o nome da conexão e as informações para configurá-la. Observe que para todos eles, o tipo é http
:
Nome da conexão | Descrição | Tipo | Host e senha | |
---|---|---|---|---|
| Permite que você se conecte à API de administração do dbt Cloud com |
| Host: https://cloud.getdbt.com/api/v2/accounts/ACCOUNT\_ID/ (Substitua Senha: Seu token de API do dbt Cloud (configurações de perfil) ou um token de conta de serviço | |
| Permite que você se conecte à API de descoberta dbt |
| ||
| Permite fazer upload de eventos personalizados para New Relic |
| Host: https://insights-collector.newrelic.com/v1/accounts/ACCOUNT\_ID/events (Substitua Senha: Sua chave de API de inserção de insights do NR | |
| Permite consultar evento New Relic personalizado |
| Host: https://insights-api.newrelic.com/v1/accounts/ACCOUNT\_ID/query (Substitua Senha: Sua chave de API de consulta de insights do NR |
Depois de configurar os quatro acima, você precisa configurar a conexão Snowflake. Snowflake permite que você consulte linhas de teste com falha. Há muitas maneiras de configurar uma conexão em floco de neve. Para configurar usando um par de chaves privadas, preencha o seguinte atributo:
Type
: Floco de neveLogin
: Seu nome de usuário do floco de neveAccount
: Sua conta do floco de neveWarehouse
: Seu armazém SnowflakeRole
: Seu papel de floco de neve. A função deve ter acesso a todos os bancos de dados usados no dbt Cloud para obter todas as linhas de teste com falha.Private Key Text
: a chave privada completa usada para esta conexão.Password
: frase secreta para a chave privada se ela estiver criptografada. Em branco se não estiver criptografado.
Conclua a configuração ativando o new_relic_data_pipeline_observability_get_dbt_run_metadata2
DAG.
Encontre seus dados
Esta integração cria e reporta três eventos personalizados para New Relic:
Configuração DAG
Conexões:
Este DAG foi projetado para ser executado como está, sem configuração. Ao mesmo tempo, sabemos que sua empresa pode ter suas próprias convenções de nomenclatura para conexões. Assim, temos uma configuração simples dentro de dag_config.yml
onde você pode definir o nome das diversas conexões.
connections: dbt_cloud_admin_api: dbt_cloud_admin_api dbt_cloud_discovery_api: dbt_cloud_discovery_api nr_insights_query: nr_insights_query nr_insights_insert: nr_insights_insert snowflake_api: SNOWFLAKE
Equipe de execução:
Os trabalhos do dbt podem pertencer a equipes diferentes, mas não há lugar para definir isso no dbt Cloud. Podemos usar o código Python para definir a equipe dinamicamente. Para escrever seu próprio código, modifique airflow/dags/nr_utils/nr_utils.py
e coloque qualquer lógica necessária em get_team_from_run()
. Os dados de execução passados para essa função têm acesso ao seguinte atributo.
- Nome do Projeto
- nome_do_ambiente
- Todos os campos listados na API dbt Cloud v2 para execuções. Todos os atributos são prefixados com "run_"
Aqui está um exemplo de função:
def get_team_from_run(run: dict) -> str: team = 'Data Engineering' if run['project_id'] == '11111' and run['environment_id'] in ['55555', '33333']: team = 'Platform' if re.match(r'Catch-all', run['job_name']): team = 'Project Catch All' return team
Configuração do projeto Dbt
Dentro do projeto Dbt, podemos usar a metaconfiguração para definir uma equipe adicional e configurações específicas de teste.
Team
: emborarun_team determines
seja o proprietário dos trabalhos, às vezes precisamos que equipes upstream ou downstream recebam notificações de alerta sobre recursos com falha, como testes e modelos. Definir a equipe nos ajuda a fazer isso.alert_failed_test_rows
: Definir comoTrue
ativará linhas de teste com falha onde executamos a consulta para testes com falha e enviamos até as 10 primeiras colunas para New Relicfailed_test_rows_limit
: número máximo de linhas de teste com falha para enviar ao New Relic. Temos um limite codificado de 100 linhas para evitar situações em que enviamos quantias excessivas para a New Relic.slack_mentions
: se você ativar alertas de folga, este campo permite definir quem deve ser mencionado na mensagem.
Definir isso em dbt_project.yml
definiria a equipe como 'Engenharia de Dados' e ativaria linhas de teste com falha.
models: dbt_fake_company: +meta: nr_config: team: 'Data Engineering' alert_failed_test_rows: False failed_test_rows_limit: 5 slack_mentions: '@channel, @business_users'
Podemos adicionar outro atributo chamado mensagem aos recursos. Na configuração a seguir, uma equipe de negócios parceira pode receber alertas sobre testes específicos que falharam. Além disso, podemos definir alertas nas próprias linhas de teste com falha.
models: - name: important_business_model tests: - some_custom_test: config: meta: nr_config: team: 'Upstream Business Team' alert_failed_test_rows: true failed_test_rows_limit: 10 slack_mentions: '@channel, @business_user1, @engineer1' message: 'Important business process produced invalid data. Please check X tool'
Resolução de problemas
Diferentes versões do Airflow combinadas com diferentes versões de provedores podem induzir alterações significativas. Em alguns casos, pode ser necessário modificar o código para corresponder às versões específicas do seu ambiente Airflow. Rastreamos problemas conhecidos em nosso repositório Github.