Интеграция Apache Airflow и Slack для отправки уведомлений

Jul 12, 2021 10:26 · 399 words · 2 minute read airflow slack python

Ранее мы уже рассматривали процесс развертывания Apache Airflow (далее Airflow) в кластере Kubernetes и запуск задач с помощью API. Как и обещал, в данной статье на конкретном примере я покажу как настроить отправку уведомлений о статусе задач Airflow в Slack. Давайте разберемся!

Отправка уведомлений в Slack, наряду с уведомлениями по электронной почте, является довольно распространенным вариантом оповещений о статусе задач Airflow. Существует несколько возможных вариантов настройки интеграции Apache Airflow и Slack, в данной статье мы будем рассматривать SlackWebhookOperator - рекомендованный самим Slack‘ом вариант для отправки сообщений из приложений.

Считаем, что необходимые подготовительные шаги уже выполнены, а именно:

  • в рабочем окружении (workspace) Slack создано приложение и вебхук (подробнее в официальной документации);
  • в вашем экземпляре Airflow-сервера установлен пакет slack:
...
pip3 install 'apache-airflow[slack]'
...
  • создано подключение (connection) к Slack в настройках Airflow - это может быть сделано как руками в web-интерфейсе, так и с помощью скрипта при старте Airflow:
...
    airflow connections \
      --add \
      --conn_id slack \
      --conn_type http \
      --conn_host https://hooks.slack.com/services \
      --conn_password ${AIRFLOW_SLACK_WEBHOOK_URL}
...

Как правило, в контексте интергации Apache Airflow и Slack, рассматривают два типа оповещений через Python функции - on_failure_callback и on_success_callback. Эти функции могут быть определены на уровне DAG’а или задачи и вызываться в случае неуспешного / успешного завершения задачи соответственно. Примеров таких функций предостаточно на просторах интернета, поэтому хотелось бы больше сконцентрироваться на SLA (Service Level Agreement) типе оповещения - такое уведомление может быть получено если ваша задача выполняется дольше ожидаемого времени.

Для начала создадим отдельный файл notifications.py, в котором определим необходимые нам функции для последующего использования:

Здесь:

  • SLACK_CONN_ID соответствует ранее созданому подключению;
  • функция get_users вернет список получателей по умолчанию, если в самом DAG’е не будет указан получатель (как правило, владелец данного DAG’а);
  • функции task_fail_slack_alert, task_success_slack_alert и task_sla_missed_slack_alert формируют и форматируют соответствующее сообщение, после чего передают его в качестве параметра функции send_slack_alert;
  • функция send_slack_alert непосредственно отправляет уведомление в указанный Slack-канал.

Пример использования всего вышеперечисленного в описании DAG будет выглядеть примерно так:

В данном примере:

  • определяем SLA для нашей задачи в 30 секунд:
'sla': timedelta(seconds=30),
  • параметрах DAG указываем что делать при превышении указанного ранее SLA:
sla_miss_callback=partial(task_sla_missed_slack_alert, usr="ealebed"),
  • сама задача в данном примере будет выполняться не менее двух минут, о чем говорит следующая строка:
cmds=["sleep", "120"]

Примечание. Также может оказаться довольно полезным закоментированный в данном примере параметр dagrun_timeout - по истечении указанного в данном параметре времени задача будет принудительно завершена с кодом завершения отличным от 0.

На этом все, и побольше вам on_success_callback в Airflow задачах!

tweet Share