Отправка логов Apache Airflow в Elasticsearch

Apr 9, 2020 07:49 · 344 words · 2 minute read airflow docker kubernetes elasticsearch logs

В одной из предыдущих статей мы рассматривали особенности развертывания Apache Airflow в кластере Kubernetes, а в данном материале поговорим об отправке логов из Airflow в Elasticsearch. Давайте разберемся!

Как мы уже знаем, сбор логов в Kubernetes кластере и отправка их в Elasticsearch то еще удовольствие, поэтому при использовании Airflow хотелось получить необходимый функционал “из коробки”. Но не тут то было:

Airflow can be configured to read task logs from Elasticsearch and optionally write logs to stdout in standard or json format. These logs can later be collected and forwarded to the Elasticsearch cluster using tools like fluentd, logstash or others.

То есть, отображать логи из эластика - запросто, а вот отправлять их в Elasticsearch - без велосипеда никак!

В данном примере таким велосипедом будет связка Filebeat и знакомые нам elastic pipelines.

По сравнению с примерами из предыдущей статьи об Airflow, нам потребуется изменить:

  • Dockerfile (добавив в него установку filebeat);
  • entrypoint-скрипт (перед запуском самого airflow будем стартовать filebeat - не лучшая практика работы с docker-образами, но что ж поделать);
  • конфигмап для airflow (появится новая секция в основном конфиге и дополнительный файл с настройками filebeat).

Считаем, что у вас также установлен и настроен Elasticsearch и у вас есть все необходимое (адрес/логин/пароль/сертификат) для подключения к нему.

После внесенных изменения наш Dockerfile превратится в следующий:

Entrypoint-скрипт будет выглядеть так:

Настройки для filebeat в нашем случае будут (специально опубликую их отдельно):

А манифест для Kubernetes кластера, вместе со всеми необходимыми объектами (сервис/деплоймент/секрет/измененный конфигмап) станет таким:

Elastic-пайплайн выглядит так:

Данный пайплайн добавляется с помощью следующей команды:

curl -k -s -XPUT -u ${ELASTICSEARCH_USERNAME}:${ELASTICSEARCH_PASSWORD} -H "Content-Type: application/json" -d @airflow-pipeline.json "https://${ELASTICSEARCH_URLS}:9200/_ingest/pipeline/airflow-filebeat-pipeline"

После применения предложенной конфигурации логи запускаемых DAG’ов будут посредством filebeat отправляться в Elasticsearch, а благодаря новым секциям

    [elasticsearch]
    host = https://elasticUser:elasticPaasword@elasticHost:9200
    log_id_template = {dag_id}|{task_id}|{execution_date}|{try_number}
    end_of_log_mark = end_of_log
    frontend =
    write_stdout = True
    json_format = True
    json_fields = asctime, filename, lineno, levelname, message

    [elasticsearch_configs]
    use_ssl = True
    verify_certs = True
    ca_certs = /es/certs/ca.pem

указанным в конфигурационном файле airflow.cfg эти самые логи можно будет увидеть и в UI самого Airflow.

На этом все, свои способы отправки логов из Airflow в Elasticsearch оставляйте в комментариях!

tweet Share