View a markdown version of this page

Apache Airflow CLI 命令参考 - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Apache Airflow CLI 命令参考

本主题介绍了 Amazon MWAA 上支持和不支持的 Apache Airflow CLI 命令。

提示

REST API 比 CLI 更现代,专为与外部系统的编程集成而设计。REST 是与 Apache Airflow 交互的首选方式。

先决条件

下一节介绍了使用本页上的命令和脚本所需的初步步骤。

访问

AWS CLI

AWS Command Line Interface (AWS CLI) 是一个开源工具,您可以使用命令行 shell 中的命令与 AWS 服务进行交互。要完成本节中的步骤,您需要以下满足以下条件:

更改了哪些内容?

  • v3:Airflow 架构。Apache Airflow v3 引入了突破性的架构更改,以提高安全性和可扩展性,并简化维护。要了解更多信息,请参阅升级到 Airflow 3

  • v2:Airflow CLI 命令结构。Apache Airflow v2 CLI 的组织方式是将相关命令分组为子命令,这意味着如果您想升级到 Apache Airflow v2,则需要更新 Apache Airflow v1 脚本。例如,Apache Airflow v1 中的 unpause 已更新为 Apache Airflow v2 中的 dags unpause。要了解更多信息,请参阅 2.0 中的 Airflow CLI 更改

支持的 CLI 命令

下一节列出了 Amazon MWAA 上可用的 Apache Airflow CLI 命令。

支持的 命令

Apache Airflow v3
次要版本 命令

v3.0.6、v3.2.1

资产详情

v3.0.6、v3.2.1

资产列表

v3.0.6、v3.2.1

资产实体化

v3.0.6、v3.2.1

回填创建

v3.0.6、v3.2.1

备忘单

v3.0.6、v3.2.1

添加连接

v3.0.6、v3.2.1

删除连接

v3.0.6、v3.2.1

删除 dags

v3.0.6、v3.2.1

dags 列表

v3.0.6、v3.2.1

dags 列表-任务

v3.0.6、v3.2.1

dags 列表导入错误

v3.0.6、v3.2.1

dags 列表-运行次数

v3.0.6、v3.2.1

dags 下次-执行

v3.0.6、v3.2.1

暂停 dag

v3.0.6、v3.2.1

报告 dags

v3.0.6、v3.2.1

重新序列化 dags

v3.0.6、v3.2.1

显示 dags

v3.0.6、v3.2.1

陈述 dags

v3.0.6、v3.2.1

测试 dags

v3.0.6、v3.2.1

触发 dags

v3.0.6、v3.2.1

取消暂停 dags

v3.0.6、v3.2.1

清理 db

v3.0.6、v3.2.1

提供商的行为

v3.0.6、v3.2.1

获得提供商

v3.0.6、v3.2.1

提供商挂钩

v3.0.6、v3.2.1

提供商链接

v3.0.6、v3.2.1

提供商列表

v3.0.6、v3.2.1

提供程序通知

v3.0.6、v3.2.1

提供商密钥

v3.0.6、v3.2.1

提供商触发器

v3.0.6、v3.2.1

提供商小部件

v3.0.6、v3.2.1

角色添加权限

v3.0.6、v3.2.1

角色删除权限

v3.0.6、v3.2.1

角色创建

v3.0.6、v3.2.1

列出角色

v3.0.6、v3.2.1

清除任务

v3.0.6、v3.2.1

任务失败-部署

v3.0.6、v3.2.1

列出任务

v3.0.6、v3.2.1

渲染任务

v3.0.6、v3.2.1

陈述任务

v3.0.6、v3.2.1

dag 运行的任务状态

v3.0.6、v3.2.1

测试任务

v3.0.6、v3.2.1

删除变量

v3.0.6、v3.2.1

获取变量

v3.0.6、v3.2.1

设置变量

v3.0.6、v3.2.1

列出变量

v3.0.6、v3.2.1

版本

Apache Airflow v2

使用解析 DAG 的命令

如果环境运行的是 Apache Airflow v2.0.2,则如果 DAG 使用的插件依赖于通过 requirements.txt 安装的程序包,则解析 DAG 的 CLI 命令将会失败:

Apache Airflow v2.0.2
  • dags backfill

  • dags list

  • dags list-runs

  • dags next-execution

如果 DAG 不使用依赖于通过 requirements.txt 安装的程序包的插件,则可以使用这些 CLI 命令。

代码示例

下一节包含使用 Apache Airflow CLI 的不同方法的示例。

设置、获取或删除 Apache Airflow v2 变量

您可以使用以下示例代码设置、获取或删除 <script> <mwaa env name> get | set | delete <variable> <variable value> </variable> </variable> 格式的变量。

[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit if [[ $2 == "" ]]; then dag="variables list" elif [ $2 == "get" ] || [ $2 == "delete" ] || [ $2 == "set" ]; then dag="variables $2 $3 $4 $5" else echo "Not a valid command" exit 1 fi CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \ && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \ && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \ && CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \ --header "Authorization: Bearer $CLI_TOKEN" \ --header "Content-Type: text/plain" \ --data-raw "$dag" ) \ && echo "Output:" \ && echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \ && echo "Errors:" \ && echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode

触发 DAG 时添加配置

您可以在 Apache Airflow v2 中使用以下示例代码在触发 DAG 时添加配置,例如 airflow trigger_dag 'dag_name' —conf '{"key":"value"}'

import boto3 import json import requests import base64 mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' key = "YOUR_KEY" value = "YOUR_VALUE" conf = "{\"" + key + "\":\"" + value + "\"}" client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "trigger_dag {0} -c '{1}'".format(dag_name, conf) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message)

在通往堡垒主机的 SSH 隧道上运行 CLI 命令。

根据以下示例使用连接到 Linux 堡垒主机的 SSH 隧道代理运行 Airflow CLI 命令。

使用 curl
  1. ssh -D 8080 -f -C -q -N YOUR_USER@YOUR_BASTION_HOST
  2. curl -x socks5h://0:8080 --request POST https://YOUR_HOST_NAME/aws_mwaa/cli --header YOUR_HEADERS --data-raw YOUR_CLI_COMMAND