

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Pulizia del database Aurora PostgreSQL in un ambiente Amazon MWAA
<a name="samples-database-cleanup"></a>

Amazon Managed Workflows for Apache Airflow utilizza un database Aurora PostgreSQL come database di metadati Apache Airflow, in cui vengono eseguite le operazioni DAG e vengono archiviate le istanze delle attività. Il seguente codice di esempio cancella periodicamente le voci dal database Aurora PostgreSQL dedicato per il tuo ambiente Amazon MWAA.

**Topics**
+ [Versione](#samples-database-cleanup-version)
+ [Prerequisiti](#samples-database-cleanup-prereqs)
+ [Dipendenze](#samples-sql-server-dependencies)
+ [Esempio di codice](#samples-database-cleanup-code)

## Versione
<a name="samples-database-cleanup-version"></a>

Gli esempi di codice in questa pagina sono specifici di Apache Airflow v2 e v3 supportato su Amazon MWAA. Fai riferimento alle versioni di [Apache Airflow supportate](airflow-versions.md).

## Prerequisiti
<a name="samples-database-cleanup-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).

## Dipendenze
<a name="samples-sql-server-dependencies"></a>

Per utilizzare questo esempio di codice con Apache Airflow v2, non sono necessarie dipendenze aggiuntive. Usa [aws-mwaa-docker-images per installare Apache Airflow](https://github.com/aws/amazon-mwaa-docker-images).

## Esempio di codice
<a name="samples-database-cleanup-code"></a>

Il seguente DAG pulisce il database dei metadati per le tabelle specificate in. `TABLES_TO_CLEAN` L'esempio elimina i dati dalle tabelle specificate che risalgono a più di 30 giorni. Per modificare la data di eliminazione delle voci, `MAX_AGE_IN_DAYS` impostate un valore diverso.

------
#### [ Apache Airflow v3.0.6 to 3.2.1 ]

```
from datetime import datetime
from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean

# A value of None will clean all tables
TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag",
    schedule=None,
    catchup=False,
    start_date=datetime(2026, 1, 1),
) as dag:
    tables_flag = f"--tables '{TABLES_TO_CLEAN}' " if TABLES_TO_CLEAN else ""

    bash_command = (
        f"TIMESTAMP=$(date -u -d '{MAX_AGE_IN_DAYS} days ago' '+%Y-%m-%d %H:%M:%S' 2>/dev/null "
        f"|| date -u -v-{MAX_AGE_IN_DAYS}d '+%Y-%m-%d %H:%M:%S') && "
        "echo \"Cleaning records before: $TIMESTAMP\" && "
        "airflow db clean "
        "--clean-before-timestamp \"$TIMESTAMP\" "
        f"{tables_flag}"
        "--skip-archive --yes"
    )

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command,
    )
```

------
#### [ Apache Airflow v2.7.2 to 2.11.0 ]

```
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per 
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables

TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag", 
    schedule_interval=None, 
    catchup=False, 
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),     
    }   
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
```

------