Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Pembersihan basis data Aurora PostgreSQL di lingkungan Amazon MWAA
Alur Kerja Terkelola Amazon untuk Apache Airflow menggunakan database Aurora PostgreSQL sebagai database metadata Apache Airflow, tempat DAG berjalan dan instance tugas disimpan. Kode contoh berikut secara berkala menghapus entri dari database Aurora PostgreSQL khusus untuk lingkungan Amazon MWAA Anda.
Versi
Contoh kode di halaman ini khusus untuk Apache Airflow v2 dan v3 yang didukung di Amazon MWAA. Lihat versi Apache Airflow yang didukung.
Prasyarat
Untuk menggunakan kode sampel di halaman ini, Anda memerlukan yang berikut:
Dependensi
Untuk menggunakan contoh kode ini dengan Apache Airflow v2, tidak diperlukan dependensi tambahan. Gunakan aws-mwaa-docker-images untuk menginstal Apache Airflow.
Contoh kode
DAG berikut membersihkan database metadata untuk tabel yang ditentukan dalam. TABLES_TO_CLEAN Contoh menghapus data dari tabel yang ditentukan yang lebih tua dari 30 hari. Untuk menyesuaikan seberapa jauh entri dihapus, atur MAX_AGE_IN_DAYS ke nilai yang berbeda.
- Apache Airflow v3.0.6 to 3.2.1
-
from datetime import datetime
from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.
MAX_AGE_IN_DAYS = 30
# To clean specific tables, please provide a comma-separated list per
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables
TABLES_TO_CLEAN = None
with DAG(
dag_id="clean_db_dag",
schedule=None,
catchup=False,
start_date=datetime(2026, 1, 1),
) as dag:
tables_flag = f"--tables '{TABLES_TO_CLEAN}' " if TABLES_TO_CLEAN else ""
bash_command = (
f"TIMESTAMP=$(date -u -d '{MAX_AGE_IN_DAYS} days ago' '+%Y-%m-%d %H:%M:%S' 2>/dev/null "
f"|| date -u -v-{MAX_AGE_IN_DAYS}d '+%Y-%m-%d %H:%M:%S') && "
"echo \"Cleaning records before: $TIMESTAMP\" && "
"airflow db clean "
"--clean-before-timestamp \"$TIMESTAMP\" "
f"{tables_flag}"
"--skip-archive --yes"
)
cli_command = BashOperator(
task_id="bash_command",
bash_command=bash_command,
)
- Apache Airflow v2.7.2 to 2.11.0
-
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.
MAX_AGE_IN_DAYS = 30
# To clean specific tables, please provide a comma-separated list per
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables
TABLES_TO_CLEAN = None
with DAG(
dag_id="clean_db_dag",
schedule_interval=None,
catchup=False,
start_date=days_ago(1),
params={
"timestamp": Param(
default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
type="string",
minLength=1,
maxLength=255,
),
}
) as dag:
if TABLES_TO_CLEAN:
bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
else:
bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"
cli_command = BashOperator(
task_id="bash_command",
bash_command=bash_command
)