Отправка логов Apache Airflow в Elasticsearch
Apr 9, 2020 07:49 · 344 words · 2 minute read
В одной из предыдущих статей мы рассматривали особенности развертывания Apache Airflow
в кластере Kubernetes
, а в данном материале поговорим об отправке логов из Airflow
в Elasticsearch
. Давайте разберемся!
Как мы уже знаем, сбор логов в Kubernetes
кластере и отправка их в Elasticsearch то еще удовольствие, поэтому при использовании Airflow
хотелось получить необходимый функционал “из коробки”. Но не тут то было:
Airflow can be configured to read task logs from Elasticsearch and optionally write logs to stdout in standard or json format. These logs can later be collected and forwarded to the Elasticsearch cluster using tools like fluentd, logstash or others.
То есть, отображать логи из эластика - запросто, а вот отправлять их в Elasticsearch
- без велосипеда никак!
В данном примере таким велосипедом будет связка Filebeat и знакомые нам elastic pipelines.
По сравнению с примерами из предыдущей статьи об Airflow
, нам потребуется изменить:
- Dockerfile (добавив в него установку
filebeat
); - entrypoint-скрипт (перед запуском самого
airflow
будем стартоватьfilebeat
- не лучшая практика работы с docker-образами, но что ж поделать); - конфигмап для
airflow
(появится новая секция в основном конфиге и дополнительный файл с настройкамиfilebeat
).
Считаем, что у вас также установлен и настроен Elasticsearch
и у вас есть все необходимое (адрес/логин/пароль/сертификат) для подключения к нему.
После внесенных изменения наш Dockerfile превратится в следующий:
Entrypoint-скрипт будет выглядеть так:
Настройки для filebeat
в нашем случае будут (специально опубликую их отдельно):
А манифест для Kubernetes
кластера, вместе со всеми необходимыми объектами (сервис/деплоймент/секрет/измененный конфигмап) станет таким:
Elastic-пайплайн выглядит так:
Данный пайплайн добавляется с помощью следующей команды:
curl -k -s -XPUT -u ${ELASTICSEARCH_USERNAME}:${ELASTICSEARCH_PASSWORD} -H "Content-Type: application/json" -d @airflow-pipeline.json "https://${ELASTICSEARCH_URLS}:9200/_ingest/pipeline/airflow-filebeat-pipeline"
После применения предложенной конфигурации логи запускаемых DAG’ов будут посредством filebeat
отправляться в Elasticsearch
, а благодаря новым секциям
[elasticsearch]
host = https://elasticUser:elasticPaasword@elasticHost:9200
log_id_template = {dag_id}|{task_id}|{execution_date}|{try_number}
end_of_log_mark = end_of_log
frontend =
write_stdout = True
json_format = True
json_fields = asctime, filename, lineno, levelname, message
[elasticsearch_configs]
use_ssl = True
verify_certs = True
ca_certs = /es/certs/ca.pem
указанным в конфигурационном файле airflow.cfg эти самые логи можно будет увидеть и в UI самого Airflow
.
На этом все, свои способы отправки логов из Airflow
в Elasticsearch
оставляйте в комментариях!