Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Pulizia del database Aurora PostgreSQL in un ambiente Amazon MWAA
Amazon Managed Workflows for Apache Airflow utilizza un database Aurora PostgreSQL come database di metadati Apache Airflow, in cui vengono eseguite le operazioni DAG e vengono archiviate le istanze delle attività. Il seguente codice di esempio cancella periodicamente le voci dal database Aurora PostgreSQL dedicato per il tuo ambiente Amazon MWAA.
Versione
Gli esempi di codice in questa pagina sono specifici di Apache Airflow v2 e v3 supportato su Amazon MWAA. Fai riferimento alle versioni di Apache Airflow supportate.
Prerequisiti
Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
Dipendenze
Per utilizzare questo esempio di codice con Apache Airflow v2, non sono necessarie dipendenze aggiuntive. Usa aws-mwaa-docker-images per installare Apache Airflow.
Esempio di codice
Il seguente DAG pulisce il database dei metadati per le tabelle specificate in. TABLES_TO_CLEAN L'esempio elimina i dati dalle tabelle specificate che risalgono a più di 30 giorni. Per modificare la data di eliminazione delle voci, MAX_AGE_IN_DAYS impostate un valore diverso.
- Apache Airflow v3.0.6 to 3.2.1
-
from datetime import datetime
from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.
MAX_AGE_IN_DAYS = 30
# To clean specific tables, please provide a comma-separated list per
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables
TABLES_TO_CLEAN = None
with DAG(
dag_id="clean_db_dag",
schedule=None,
catchup=False,
start_date=datetime(2026, 1, 1),
) as dag:
tables_flag = f"--tables '{TABLES_TO_CLEAN}' " if TABLES_TO_CLEAN else ""
bash_command = (
f"TIMESTAMP=$(date -u -d '{MAX_AGE_IN_DAYS} days ago' '+%Y-%m-%d %H:%M:%S' 2>/dev/null "
f"|| date -u -v-{MAX_AGE_IN_DAYS}d '+%Y-%m-%d %H:%M:%S') && "
"echo \"Cleaning records before: $TIMESTAMP\" && "
"airflow db clean "
"--clean-before-timestamp \"$TIMESTAMP\" "
f"{tables_flag}"
"--skip-archive --yes"
)
cli_command = BashOperator(
task_id="bash_command",
bash_command=bash_command,
)
- Apache Airflow v2.7.2 to 2.11.0
-
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.
MAX_AGE_IN_DAYS = 30
# To clean specific tables, please provide a comma-separated list per
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables
TABLES_TO_CLEAN = None
with DAG(
dag_id="clean_db_dag",
schedule_interval=None,
catchup=False,
start_date=days_ago(1),
params={
"timestamp": Param(
default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
type="string",
minLength=1,
maxLength=255,
),
}
) as dag:
if TABLES_TO_CLEAN:
bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
else:
bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"
cli_command = BashOperator(
task_id="bash_command",
bash_command=bash_command
)