View a markdown version of this page

Pembersihan basis data Aurora PostgreSQL di lingkungan Amazon MWAA - Amazon Managed Workflows for Apache Airflow

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Pembersihan basis data Aurora PostgreSQL di lingkungan Amazon MWAA

Alur Kerja Terkelola Amazon untuk Apache Airflow menggunakan database Aurora PostgreSQL sebagai database metadata Apache Airflow, tempat DAG berjalan dan instance tugas disimpan. Kode contoh berikut secara berkala menghapus entri dari database Aurora PostgreSQL khusus untuk lingkungan Amazon MWAA Anda.

Versi

Contoh kode di halaman ini khusus untuk Apache Airflow v2 dan v3 yang didukung di Amazon MWAA. Lihat versi Apache Airflow yang didukung.

Prasyarat

Untuk menggunakan kode sampel di halaman ini, Anda memerlukan yang berikut:

Dependensi

Untuk menggunakan contoh kode ini dengan Apache Airflow v2, tidak diperlukan dependensi tambahan. Gunakan aws-mwaa-docker-images untuk menginstal Apache Airflow.

Contoh kode

DAG berikut membersihkan database metadata untuk tabel yang ditentukan dalam. TABLES_TO_CLEAN Contoh menghapus data dari tabel yang ditentukan yang lebih tua dari 30 hari. Untuk menyesuaikan seberapa jauh entri dihapus, atur MAX_AGE_IN_DAYS ke nilai yang berbeda.

Apache Airflow v3.0.6 to 3.2.1
from datetime import datetime from airflow import DAG from airflow.providers.standard.operators.bash import BashOperator # Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change # timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met. MAX_AGE_IN_DAYS = 30 # To clean specific tables, please provide a comma-separated list per # https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean # A value of None will clean all tables TABLES_TO_CLEAN = None with DAG( dag_id="clean_db_dag", schedule=None, catchup=False, start_date=datetime(2026, 1, 1), ) as dag: tables_flag = f"--tables '{TABLES_TO_CLEAN}' " if TABLES_TO_CLEAN else "" bash_command = ( f"TIMESTAMP=$(date -u -d '{MAX_AGE_IN_DAYS} days ago' '+%Y-%m-%d %H:%M:%S' 2>/dev/null " f"|| date -u -v-{MAX_AGE_IN_DAYS}d '+%Y-%m-%d %H:%M:%S') && " "echo \"Cleaning records before: $TIMESTAMP\" && " "airflow db clean " "--clean-before-timestamp \"$TIMESTAMP\" " f"{tables_flag}" "--skip-archive --yes" ) cli_command = BashOperator( task_id="bash_command", bash_command=bash_command, )
Apache Airflow v2.7.2 to 2.11.0
from airflow import DAG from airflow.models.param import Param from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago from datetime import datetime, timedelta # Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change # timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met. MAX_AGE_IN_DAYS = 30 # To clean specific tables, please provide a comma-separated list per # https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean # A value of None will clean all tables TABLES_TO_CLEAN = None with DAG( dag_id="clean_db_dag", schedule_interval=None, catchup=False, start_date=days_ago(1), params={ "timestamp": Param( default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"), type="string", minLength=1, maxLength=255, ), } ) as dag: if TABLES_TO_CLEAN: bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes" else: bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes" cli_command = BashOperator( task_id="bash_command", bash_command=bash_command )