Apache Airflow: запуск Kubernetes Pod Operator через API
May 22, 2021 06:33 · 541 words · 3 minute read
Ранее мы уже упоминали об Apache Airflow
- инструменте для разработки, планирования и мониторинга batch-процессов обработки данных. В данной статье рассмотрим запуск Airflow-задач (Directed Acyclic Graph, далее DAG) и передачу параметров с помощью Airflow REST API - давайте разберемся!
Среди множества всевозможных операторов в Apache Airflow
есть оператор под названием KubernetesPodOperator
, который используется для запуска задач в подах кластера Kubernetes
. Это позволяет нам запускать распределенные вычисления или длительные операции (терминология Google AIP - long-running operations) в кластере и практически не заботясь о их масштабировании и расположении - Kubernetes
сделает это за нас. Такой подход позволяет существенно сэкономить средства, ведь после выполнения задачи под может быть удален, а кластер, при правильно настроенном масштабировании, уменьшен в размерах.
Некоторые параметры при настройке KubernetesPodOperator
могут использоваться как шаблоны (полный список), другие же могут сравнительно легко быть “шаблонизированы” (пример для параметра ’namespace’ рассмотрим ниже) - именно эту возможность мы будем использовать для демонстрации запуска и передачи параметров в DAG с помощью HTTP-запроса.
Простейший пример, в котором будет использоваться в качестве шаблона параметр ‘image’, выглядит следующим образом:
Запустить DAG через HTTP и передать в качестве параметра тег образа можно с помощью следующего запроса (также приведен ответ от API):
curl -X POST \
http://localhost:8080/api/v1/dags/000_templated_task/dagRuns \
-H 'Content-Type: application/json' \
-H 'Cache-Control: no-cache' \
-d '{"conf":{"image_tag":"15"}}'
{
"conf": {
"image_tag": "15"
},
"dag_id": "000_templated_task",
"dag_run_id": "manual__2021-05-22T07:52:17.440555+00:00",
"end_date": null,
"execution_date": "2021-05-22T07:52:17.440555+00:00",
"external_trigger": true,
"start_date": "2021-05-22T07:52:17.443583+00:00",
"state": "running"
}
после выполнения задачи в логе можно будет увидеть:
openjdk 15.0.2 2021-01-19
OpenJDK Runtime Environment (build 15.0.2+7-27)
OpenJDK 64-Bit Server VM (build 15.0.2+7-27, mixed mode, sharing)
Запустим тот же DAG, но с другим значением параметра (ответ от API о запущенной задаче опущен):
curl -X POST \
http://localhost:8080/api/v1/dags/000_templated_task/dagRuns \
-H 'Content-Type: application/json' \
-H 'Cache-Control: no-cache' \
-d '{"conf":{"image_tag":"16"}}'
лог:
openjdk 16.0.1 2021-04-20
OpenJDK Runtime Environment (build 16.0.1+9-24)
OpenJDK 64-Bit Server VM (build 16.0.1+9-24, mixed mode, sharing)
Естесственно, Apache Airflow
поддерживает несколько вариантов аутентификации и даже позволяет внедрять свои варианты. В следующем примере рассмотрим самый простой из предложенных - ‘basic_auth’. Проверить какой именно вариант используется на вашем инстансе Airflow
можно с помощью команды:
$ airflow config get-value api auth_backend
airflow.api.auth.backend.basic_auth
С включенной ‘basic_auth’ запрос должен выглядеть так:
curl -X POST \
--user "admin:admin" \
http://localhost:8080/api/v1/dags/000_templated_task/dagRuns \
-H 'Content-Type: application/json' \
-H 'Cache-Control: no-cache' \
-d '{"conf":{"image_tag":"16"}}'
или так:
curl -X POST \
http://localhost:8080/api/v1/dags/000_templated_task/dagRuns \
-H "Authorization: Basic YWRtaW46YWRtaW4=" \
-H 'Content-Type: application/json' \
-H 'Cache-Control: no-cache' \
-d '{"conf":{"image_tag":"16"}}'
Более интересный пример из реальной жизни, в котором параметры используются для передачи переменых окружения внутрь запускаемого пода, выглядит так:
Запускаем его такой командой:
curl -X POST \
http://localhost:8080/api/v1/dags/manual_tracer/dagRuns \
-H "Authorization: Basic YWRtaW46YWRtaW4=" \
-H 'Content-Type: application/json' \
-H 'Cache-Control: no-cache' \
-d '{"conf":{"count":"10", "url":"http://ads-api-rtb.default.svc.cluster.local:8080/services/trace", "json":"{\"factor\":4,\"ssp\":\"inneractive\",\"duration\":10}"}}'
{
"conf": {
"count": "10",
"json": "{\"factor\":4,\"ssp\":\"inneractive\",\"duration\":10}",
"url": "http://ads-api-rtb.default.svc.cluster.local:8080/services/trace"
},
"dag_id": "manual_tracer",
"dag_run_id": "manual__2021-05-22T07:51:16.727115+00:00",
"end_date": null,
"execution_date": "2021-05-22T07:51:16.727115+00:00",
"external_trigger": true,
"start_date": "2021-05-22T07:51:16.730199+00:00",
"state": "running"
}
Обещанный пример с “шаблонизацией” параметра ’namespace’ может выглядеть примерно так:
...
# Create my own operator with the same behavior that adds namespace to template_fields
class MyKubernetesPodOperator(KubernetesPodOperator):
template_fields = KubernetesPodOperator.template_fields +('namespace',)
...
# Use code as
with DAG('test_ns', default_args=default_args, schedule_interval='@once') as dag:
ns = """ {{ dag_run.conf.ns }} """
example_task = MyKubernetesPodOperator(namespace=ns,...)
...
На этом все, в следующей статье рассмотрим настройку SLA (Service Level Agreement) для задач в Apache Airflow
и получение соответствующих уведомлений.