View a markdown version of this page

Nettoyage de base de données Aurora PostgreSQL dans un environnement Amazon MWAA - Amazon Managed Workflows for Apache Airflow

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Nettoyage de base de données Aurora PostgreSQL dans un environnement Amazon MWAA

Amazon Managed Workflows for Apache Airflow utilise une base de données Aurora PostgreSQL comme base de données de métadonnées Apache Airflow, dans laquelle le DAG s'exécute et les instances de tâches sont stockées. L'exemple de code suivant efface régulièrement les entrées de la base de données Aurora PostgreSQL dédiée à votre environnement Amazon MWAA.

Version

Les exemples de code présentés sur cette page sont spécifiques à Apache Airflow v2 et v3 pris en charge sur Amazon MWAA. Reportez-vous aux versions d'Apache Airflow prises en charge.

Conditions préalables

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :

Dépendances

Pour utiliser cet exemple de code avec Apache Airflow v2, aucune dépendance supplémentaire n'est requise. Utilisez aws-mwaa-docker-images pour installer Apache Airflow.

Exemple de code

Le DAG suivant nettoie la base de données de métadonnées pour les tables spécifiées dansTABLES_TO_CLEAN. L'exemple supprime les données des tables spécifiées datant de plus de 30 jours. Pour ajuster la date à laquelle les entrées ont été supprimées, définissez MAX_AGE_IN_DAYS une valeur différente.

Apache Airflow v3.0.6 to 3.2.1
from datetime import datetime from airflow import DAG from airflow.providers.standard.operators.bash import BashOperator # Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change # timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met. MAX_AGE_IN_DAYS = 30 # To clean specific tables, please provide a comma-separated list per # https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean # A value of None will clean all tables TABLES_TO_CLEAN = None with DAG( dag_id="clean_db_dag", schedule=None, catchup=False, start_date=datetime(2026, 1, 1), ) as dag: tables_flag = f"--tables '{TABLES_TO_CLEAN}' " if TABLES_TO_CLEAN else "" bash_command = ( f"TIMESTAMP=$(date -u -d '{MAX_AGE_IN_DAYS} days ago' '+%Y-%m-%d %H:%M:%S' 2>/dev/null " f"|| date -u -v-{MAX_AGE_IN_DAYS}d '+%Y-%m-%d %H:%M:%S') && " "echo \"Cleaning records before: $TIMESTAMP\" && " "airflow db clean " "--clean-before-timestamp \"$TIMESTAMP\" " f"{tables_flag}" "--skip-archive --yes" ) cli_command = BashOperator( task_id="bash_command", bash_command=bash_command, )
Apache Airflow v2.7.2 to 2.11.0
from airflow import DAG from airflow.models.param import Param from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago from datetime import datetime, timedelta # Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change # timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met. MAX_AGE_IN_DAYS = 30 # To clean specific tables, please provide a comma-separated list per # https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean # A value of None will clean all tables TABLES_TO_CLEAN = None with DAG( dag_id="clean_db_dag", schedule_interval=None, catchup=False, start_date=days_ago(1), params={ "timestamp": Param( default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"), type="string", minLength=1, maxLength=255, ), } ) as dag: if TABLES_TO_CLEAN: bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes" else: bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes" cli_command = BashOperator( task_id="bash_command", bash_command=bash_command )