Интеграция Apache Airflow и Slack для отправки уведомлений
Jul 12, 2021 10:26 · 399 words · 2 minute read
Ранее мы уже рассматривали процесс развертывания Apache Airflow (далее Airflow) в кластере Kubernetes и запуск задач с помощью API. Как и обещал, в данной статье на конкретном примере я покажу как настроить отправку уведомлений о статусе задач Airflow в Slack. Давайте разберемся!
Отправка уведомлений в Slack, наряду с уведомлениями по электронной почте, является довольно распространенным вариантом оповещений о статусе задач Airflow. Существует несколько возможных вариантов настройки интеграции Apache Airflow и Slack, в данной статье мы будем рассматривать SlackWebhookOperator - рекомендованный самим Slack‘ом вариант для отправки сообщений из приложений.
Считаем, что необходимые подготовительные шаги уже выполнены, а именно:
- в рабочем окружении (workspace)
Slackсоздано приложение и вебхук (подробнее в официальной документации); - в вашем экземпляре
Airflow-сервера установлен пакет slack:
...
pip3 install 'apache-airflow[slack]'
...
- создано подключение (connection) к
Slackв настройкахAirflow- это может быть сделано как руками в web-интерфейсе, так и с помощью скрипта при стартеAirflow:
...
airflow connections \
--add \
--conn_id slack \
--conn_type http \
--conn_host https://hooks.slack.com/services \
--conn_password ${AIRFLOW_SLACK_WEBHOOK_URL}
...
Как правило, в контексте интергации Apache Airflow и Slack, рассматривают два типа оповещений через Python функции - on_failure_callback и on_success_callback. Эти функции могут быть определены на уровне DAG’а или задачи и вызываться в случае неуспешного / успешного завершения задачи соответственно. Примеров таких функций предостаточно на просторах интернета, поэтому хотелось бы больше сконцентрироваться на SLA (Service Level Agreement) типе оповещения - такое уведомление может быть получено если ваша задача выполняется дольше ожидаемого времени.
Для начала создадим отдельный файл notifications.py, в котором определим необходимые нам функции для последующего использования:
Здесь:
SLACK_CONN_IDсоответствует ранее созданому подключению;- функция
get_usersвернет список получателей по умолчанию, если в самом DAG’е не будет указан получатель (как правило, владелец данного DAG’а); - функции
task_fail_slack_alert,task_success_slack_alertиtask_sla_missed_slack_alertформируют и форматируют соответствующее сообщение, после чего передают его в качестве параметра функцииsend_slack_alert; - функция
send_slack_alertнепосредственно отправляет уведомление в указанныйSlack-канал.
Пример использования всего вышеперечисленного в описании DAG будет выглядеть примерно так:
В данном примере:
- определяем SLA для нашей задачи в 30 секунд:
'sla': timedelta(seconds=30),
- параметрах DAG указываем что делать при превышении указанного ранее SLA:
sla_miss_callback=partial(task_sla_missed_slack_alert, usr="ealebed"),
- сама задача в данном примере будет выполняться не менее двух минут, о чем говорит следующая строка:
cmds=["sleep", "120"]
Примечание. Также может оказаться довольно полезным закоментированный в данном примере параметр dagrun_timeout - по истечении указанного в данном параметре времени задача будет принудительно завершена с кодом завершения отличным от 0.
На этом все, и побольше вам on_success_callback в Airflow задачах!