View a markdown version of this page

Säuberung der Aurora PostgreSQL-Datenbank in einer Amazon MWAA-Umgebung - Von Amazon verwaltete Workflows für Apache Airflow

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Säuberung der Aurora PostgreSQL-Datenbank in einer Amazon MWAA-Umgebung

Amazon Managed Workflows for Apache Airflow verwendet eine Aurora PostgreSQL-Datenbank als Apache Airflow-Metadatendatenbank, in der DAG-Ausführungen ausgeführt und Task-Instances gespeichert werden. Der folgende Beispielcode löscht regelmäßig Einträge aus der speziellen Aurora PostgreSQL-Datenbank für Ihre Amazon MWAA-Umgebung.

Version

Die Codebeispiele auf dieser Seite sind spezifisch für Apache Airflow v2 und v3, die auf Amazon MWAA unterstützt werden. Sehen Sie sich die unterstützten Apache Airflow Airflow-Versionen an.

Voraussetzungen

Um den Beispielcode auf dieser Seite verwenden zu können, benötigen Sie Folgendes:

Abhängigkeiten

Um dieses Codebeispiel mit Apache Airflow v2 zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Verwenden Sie aws-mwaa-docker-images, um Apache Airflow zu installieren.

Codebeispiel

Die folgende DAG bereinigt die Metadaten-Datenbank nach den in angegebenen Tabellen. TABLES_TO_CLEAN Das Beispiel löscht Daten aus den angegebenen Tabellen, die älter als 30 Tage sind. Um anzupassen, wie weit die Einträge gelöscht werden, legen Sie MAX_AGE_IN_DAYS einen anderen Wert fest.

Apache Airflow v3.0.6 to 3.2.1
from datetime import datetime from airflow import DAG from airflow.providers.standard.operators.bash import BashOperator # Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change # timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met. MAX_AGE_IN_DAYS = 30 # To clean specific tables, please provide a comma-separated list per # https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean # A value of None will clean all tables TABLES_TO_CLEAN = None with DAG( dag_id="clean_db_dag", schedule=None, catchup=False, start_date=datetime(2026, 1, 1), ) as dag: tables_flag = f"--tables '{TABLES_TO_CLEAN}' " if TABLES_TO_CLEAN else "" bash_command = ( f"TIMESTAMP=$(date -u -d '{MAX_AGE_IN_DAYS} days ago' '+%Y-%m-%d %H:%M:%S' 2>/dev/null " f"|| date -u -v-{MAX_AGE_IN_DAYS}d '+%Y-%m-%d %H:%M:%S') && " "echo \"Cleaning records before: $TIMESTAMP\" && " "airflow db clean " "--clean-before-timestamp \"$TIMESTAMP\" " f"{tables_flag}" "--skip-archive --yes" ) cli_command = BashOperator( task_id="bash_command", bash_command=bash_command, )
Apache Airflow v2.7.2 to 2.11.0
from airflow import DAG from airflow.models.param import Param from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago from datetime import datetime, timedelta # Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change # timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met. MAX_AGE_IN_DAYS = 30 # To clean specific tables, please provide a comma-separated list per # https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean # A value of None will clean all tables TABLES_TO_CLEAN = None with DAG( dag_id="clean_db_dag", schedule_interval=None, catchup=False, start_date=days_ago(1), params={ "timestamp": Param( default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"), type="string", minLength=1, maxLength=255, ), } ) as dag: if TABLES_TO_CLEAN: bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes" else: bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes" cli_command = BashOperator( task_id="bash_command", bash_command=bash_command )