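Aurora PostgreSQL database cleanup on an Amazon MWAA environment
Amazon Managed Workflows for Apache Airflow uses an Aurora PostgreSQL database as the Apache Airflow metadata database, which stores DAG runs and task instances. The following sample code periodically clears entries from the dedicated Aurora PostgreSQL database of your Amazon MWAA environment.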
Version
The code examples on this page apply to the Apache Airflow v2 and v3 versions supported on Amazon MWAA. See Supported Apache Airflow versions.
Prerequisites
To use the sample code on this page, you need the following:
- An Amazon MWAA environment.
Dependencies
This code example requires no additional dependencies on Apache Airflow v2. Use aws-mwaa-docker-images to install Apache Airflow.
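Sample code
The following DAG cleans the metadata database for the tables specified in TABLES_TO_CLEAN. The example deletes data older than 30 days from the specified tables. To change how old an entry must be before it is deleted, set MAX_AGE_IN_DAYS to a different value.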
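To make the relationship between MAX_AGE_IN_DAYS, TABLES_TO_CLEAN, and the resulting command concrete, here is a minimal sketch that is not part of the DAGs on this page. The helper name build_clean_command and its now and dry_run parameters are hypothetical and exist only for illustration. The --dry-run flag is an existing airflow db clean option that reports what would be deleted without deleting anything, and the table names dag_run and task_instance are used only as examples.

```python
import shlex
from datetime import datetime, timedelta, timezone
from typing import Optional


def build_clean_command(
    max_age_in_days: int,
    tables: Optional[str] = None,
    now: Optional[datetime] = None,
    dry_run: bool = False,
) -> str:
    """Build an `airflow db clean` command for a retention window (illustrative helper)."""
    if max_age_in_days < 0:
        raise ValueError("max_age_in_days must not be negative")
    # Assumption: the cutoff is computed in UTC, as the v3 DAG does with `date -u`.
    now = now or datetime.now(timezone.utc)
    cutoff = (now - timedelta(days=max_age_in_days)).strftime("%Y-%m-%d %H:%M:%S")
    parts = ["airflow", "db", "clean", "--clean-before-timestamp", cutoff]
    if tables:
        # Comma-separated table list; None or "" means all tables are cleaned.
        parts += ["--tables", tables]
    if dry_run:
        parts.append("--dry-run")
    parts += ["--skip-archive", "--yes"]
    # shlex.join quotes any argument containing spaces, such as the timestamp.
    return shlex.join(parts)


if __name__ == "__main__":
    fixed_now = datetime(2026, 1, 31, 12, 0, 0, tzinfo=timezone.utc)
    print(build_clean_command(30, tables="dag_run,task_instance", now=fixed_now, dry_run=True))
```

Running the command with --dry-run first is one way to check how many rows a given MAX_AGE_IN_DAYS would remove before deleting anything.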
- Apache Airflow v3.0.6 to 3.2.1
from datetime import datetime
from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
# Note: Database commands might time out if they run longer than 5 minutes. If this occurs, increase MAX_AGE_IN_DAYS
# for the initial runs, then reduce it on subsequent runs until the desired retention is met.
MAX_AGE_IN_DAYS = 30
# To clean specific tables, please provide a comma-separated list per
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables
TABLES_TO_CLEAN = None
with DAG(
    dag_id="clean_db_dag",
    schedule=None,
    catchup=False,
    start_date=datetime(2026, 1, 1),
) as dag:
    tables_flag = f"--tables '{TABLES_TO_CLEAN}' " if TABLES_TO_CLEAN else ""
    bash_command = (
        f"TIMESTAMP=$(date -u -d '{MAX_AGE_IN_DAYS} days ago' '+%Y-%m-%d %H:%M:%S' 2>/dev/null "
        f"|| date -u -v-{MAX_AGE_IN_DAYS}d '+%Y-%m-%d %H:%M:%S') && "
        "echo \"Cleaning records before: $TIMESTAMP\" && "
        "airflow db clean "
        "--clean-before-timestamp \"$TIMESTAMP\" "
        f"{tables_flag}"
        "--skip-archive --yes"
    )
    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command,
    )
- Apache Airflow v2.7.2 to 2.11.0
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.
MAX_AGE_IN_DAYS = 30
# To clean specific tables, please provide a comma-separated list per
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables
TABLES_TO_CLEAN = None
with DAG(
    dag_id="clean_db_dag",
    schedule=None,
    catchup=False,
    start_date=days_ago(1),
    params={
        # Cutoff timestamp; override it when triggering the DAG to clean a different range.
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),
    }
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"
    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
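The note in the code comments suggests starting with a larger MAX_AGE_IN_DAYS when the command times out, then lowering it over successive runs. One way to plan those runs might look like the following sketch. The function name retention_steps and the 180-day starting point and 60-day step are hypothetical values chosen for illustration, not recommendations from this page.

```python
from typing import List


def retention_steps(start_days: int, target_days: int, step_days: int) -> List[int]:
    """Return decreasing MAX_AGE_IN_DAYS values, one per cleanup run (illustrative helper)."""
    if step_days <= 0:
        raise ValueError("step_days must be positive")
    if start_days < target_days:
        raise ValueError("start_days must be at least target_days")
    # Walk down from the starting retention in fixed steps, stopping before the target.
    steps = list(range(start_days, target_days, -step_days))
    # Always finish on the desired retention, even if the last step is shorter.
    steps.append(target_days)
    return steps


if __name__ == "__main__":
    # Each value would be used as MAX_AGE_IN_DAYS for one run of the cleanup DAG.
    for days in retention_steps(start_days=180, target_days=30, step_days=60):
        print(f"Run cleanup with MAX_AGE_IN_DAYS = {days}")
```

Each run then deletes only the slice of rows between the previous cutoff and the new one, which keeps individual runs shorter. A suitable step size depends on how much data the environment has accumulated.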