Apache Airflow: запуск Kubernetes Pod Operator через API

May 22, 2021 06:33 · 541 words · 3 minute read airflow docker kubernetes

Ранее мы уже упоминали об Apache Airflow - инструменте для разработки, планирования и мониторинга batch-процессов обработки данных. В данной статье рассмотрим запуск Airflow-задач (Directed Acyclic Graph, далее DAG) и передачу параметров с помощью Airflow REST API - давайте разберемся!

Среди множества всевозможных операторов в Apache Airflow есть оператор под названием KubernetesPodOperator, который используется для запуска задач в подах кластера Kubernetes. Это позволяет нам запускать распределенные вычисления или длительные операции (терминология Google AIP - long-running operations) в кластере и практически не заботясь о их масштабировании и расположении - Kubernetes сделает это за нас. Такой подход позволяет существенно сэкономить средства, ведь после выполнения задачи под может быть удален, а кластер, при правильно настроенном масштабировании, уменьшен в размерах.

Некоторые параметры при настройке KubernetesPodOperator могут использоваться как шаблоны (полный список), другие же могут сравнительно легко быть “шаблонизированы” (пример для параметра ’namespace’ рассмотрим ниже) - именно эту возможность мы будем использовать для демонстрации запуска и передачи параметров в DAG с помощью HTTP-запроса.

Простейший пример, в котором будет использоваться в качестве шаблона параметр ‘image’, выглядит следующим образом:

Запустить DAG через HTTP и передать в качестве параметра тег образа можно с помощью следующего запроса (также приведен ответ от API):

curl -X POST \
	http://localhost:8080/api/v1/dags/000_templated_task/dagRuns \
	-H 'Content-Type: application/json' \
	-H 'Cache-Control: no-cache' \
	-d '{"conf":{"image_tag":"15"}}'
{
  "conf": {
    "image_tag": "15"
  },
  "dag_id": "000_templated_task",
  "dag_run_id": "manual__2021-05-22T07:52:17.440555+00:00",
  "end_date": null,
  "execution_date": "2021-05-22T07:52:17.440555+00:00",
  "external_trigger": true,
  "start_date": "2021-05-22T07:52:17.443583+00:00",
  "state": "running"
}

после выполнения задачи в логе можно будет увидеть:

openjdk 15.0.2 2021-01-19
OpenJDK Runtime Environment (build 15.0.2+7-27)
OpenJDK 64-Bit Server VM (build 15.0.2+7-27, mixed mode, sharing)

Запустим тот же DAG, но с другим значением параметра (ответ от API о запущенной задаче опущен):

curl -X POST \
	http://localhost:8080/api/v1/dags/000_templated_task/dagRuns \
	-H 'Content-Type: application/json' \
	-H 'Cache-Control: no-cache' \
	-d '{"conf":{"image_tag":"16"}}'

лог:

openjdk 16.0.1 2021-04-20
OpenJDK Runtime Environment (build 16.0.1+9-24)
OpenJDK 64-Bit Server VM (build 16.0.1+9-24, mixed mode, sharing)

Естесственно, Apache Airflow поддерживает несколько вариантов аутентификации и даже позволяет внедрять свои варианты. В следующем примере рассмотрим самый простой из предложенных - ‘basic_auth’. Проверить какой именно вариант используется на вашем инстансе Airflow можно с помощью команды:

$ airflow config get-value api auth_backend
airflow.api.auth.backend.basic_auth

С включенной ‘basic_auth’ запрос должен выглядеть так:

curl -X POST \
	--user "admin:admin" \
	http://localhost:8080/api/v1/dags/000_templated_task/dagRuns \
	-H 'Content-Type: application/json' \
	-H 'Cache-Control: no-cache' \
	-d '{"conf":{"image_tag":"16"}}'

или так:

curl -X POST \
	http://localhost:8080/api/v1/dags/000_templated_task/dagRuns \
	-H "Authorization: Basic YWRtaW46YWRtaW4=" \
	-H 'Content-Type: application/json' \
	-H 'Cache-Control: no-cache' \
	-d '{"conf":{"image_tag":"16"}}'

Более интересный пример из реальной жизни, в котором параметры используются для передачи переменых окружения внутрь запускаемого пода, выглядит так:

Запускаем его такой командой:

curl -X POST \
	http://localhost:8080/api/v1/dags/manual_tracer/dagRuns \
	-H "Authorization: Basic YWRtaW46YWRtaW4=" \
	-H 'Content-Type: application/json' \
	-H 'Cache-Control: no-cache' \
	-d '{"conf":{"count":"10", "url":"http://ads-api-rtb.default.svc.cluster.local:8080/services/trace", "json":"{\"factor\":4,\"ssp\":\"inneractive\",\"duration\":10}"}}'
{
  "conf": {
    "count": "10",
    "json": "{\"factor\":4,\"ssp\":\"inneractive\",\"duration\":10}",
    "url": "http://ads-api-rtb.default.svc.cluster.local:8080/services/trace"
  },
  "dag_id": "manual_tracer",
  "dag_run_id": "manual__2021-05-22T07:51:16.727115+00:00",
  "end_date": null,
  "execution_date": "2021-05-22T07:51:16.727115+00:00",
  "external_trigger": true,
  "start_date": "2021-05-22T07:51:16.730199+00:00",
  "state": "running"
}

Обещанный пример с “шаблонизацией” параметра ’namespace’ может выглядеть примерно так:

...
# Create my own operator with the same behavior that adds namespace to template_fields
class MyKubernetesPodOperator(KubernetesPodOperator):
         template_fields = KubernetesPodOperator.template_fields +('namespace',)
...
# Use code as
with DAG('test_ns', default_args=default_args, schedule_interval='@once') as dag:
    ns = """ {{ dag_run.conf.ns }} """
    example_task = MyKubernetesPodOperator(namespace=ns,...)
...

На этом все, в следующей статье рассмотрим настройку SLA (Service Level Agreement) для задач в Apache Airflow и получение соответствующих уведомлений.

tweet Share