Интеграция Apache Airflow и Slack для отправки уведомлений
Jul 12, 2021 10:26 · 399 words · 2 minute read
Ранее мы уже рассматривали процесс развертывания Apache Airflow
(далее Airflow
) в кластере Kubernetes
и запуск задач с помощью API. Как и обещал, в данной статье на конкретном примере я покажу как настроить отправку уведомлений о статусе задач Airflow
в Slack
. Давайте разберемся!
Отправка уведомлений в Slack
, наряду с уведомлениями по электронной почте, является довольно распространенным вариантом оповещений о статусе задач Airflow
. Существует несколько возможных вариантов настройки интеграции Apache Airflow
и Slack
, в данной статье мы будем рассматривать SlackWebhookOperator - рекомендованный самим Slack
‘ом вариант для отправки сообщений из приложений.
Считаем, что необходимые подготовительные шаги уже выполнены, а именно:
- в рабочем окружении (workspace)
Slack
создано приложение и вебхук (подробнее в официальной документации); - в вашем экземпляре
Airflow
-сервера установлен пакет slack:
...
pip3 install 'apache-airflow[slack]'
...
- создано подключение (connection) к
Slack
в настройкахAirflow
- это может быть сделано как руками в web-интерфейсе, так и с помощью скрипта при стартеAirflow
:
...
airflow connections \
--add \
--conn_id slack \
--conn_type http \
--conn_host https://hooks.slack.com/services \
--conn_password ${AIRFLOW_SLACK_WEBHOOK_URL}
...
Как правило, в контексте интергации Apache Airflow
и Slack
, рассматривают два типа оповещений через Python функции - on_failure_callback
и on_success_callback
. Эти функции могут быть определены на уровне DAG’а или задачи и вызываться в случае неуспешного / успешного завершения задачи соответственно. Примеров таких функций предостаточно на просторах интернета, поэтому хотелось бы больше сконцентрироваться на SLA (Service Level Agreement) типе оповещения - такое уведомление может быть получено если ваша задача выполняется дольше ожидаемого времени.
Для начала создадим отдельный файл notifications.py
, в котором определим необходимые нам функции для последующего использования:
Здесь:
SLACK_CONN_ID
соответствует ранее созданому подключению;- функция
get_users
вернет список получателей по умолчанию, если в самом DAG’е не будет указан получатель (как правило, владелец данного DAG’а); - функции
task_fail_slack_alert
,task_success_slack_alert
иtask_sla_missed_slack_alert
формируют и форматируют соответствующее сообщение, после чего передают его в качестве параметра функцииsend_slack_alert
; - функция
send_slack_alert
непосредственно отправляет уведомление в указанныйSlack
-канал.
Пример использования всего вышеперечисленного в описании DAG будет выглядеть примерно так:
В данном примере:
- определяем SLA для нашей задачи в 30 секунд:
'sla': timedelta(seconds=30),
- параметрах DAG указываем что делать при превышении указанного ранее SLA:
sla_miss_callback=partial(task_sla_missed_slack_alert, usr="ealebed"),
- сама задача в данном примере будет выполняться не менее двух минут, о чем говорит следующая строка:
cmds=["sleep", "120"]
Примечание. Также может оказаться довольно полезным закоментированный в данном примере параметр dagrun_timeout
- по истечении указанного в данном параметре времени задача будет принудительно завершена с кодом завершения отличным от 0.
На этом все, и побольше вам on_success_callback
в Airflow
задачах!