As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Limpeza do banco de dados do Aurora PostgreSQL em um ambiente do Amazon MWAA
A solução Workflows gerenciados pela Amazon para Apache Airflow usa um banco de dados Aurora PostgreSQL como o banco de dados de metadados do Apache Airflow, onde o DAG é executado e as instâncias das tarefas são armazenadas. O código de exemplo a seguir limpa periodicamente as entradas do banco de dados Aurora PostgreSQL dedicado para seu ambiente Amazon MWAA.
Versão
Os exemplos de código nesta página são específicos para o Apache Airflow v2 e v3 com suporte no Amazon MWAA. Consulte as versões compatíveis do Apache Airflow.
Pré-requisitos
Para usar o código de amostra nesta página, você precisará do seguinte:
Dependências
Para usar esse exemplo de código com o Apache Airflow v2, nenhuma dependência adicional é necessária. Use aws-mwaa-docker-images para instalar o Apache Airflow.
Exemplo de código
O DAG a seguir limpa o banco de dados de metadados das tabelas especificadas em TABLES_TO_CLEAN. O exemplo exclui dados das tabelas especificadas com mais de 30 dias. Para ajustar até que ponto as entradas são excluídas, defina MAX_AGE_IN_DAYS para um outro valor.
- Apache Airflow v3.0.6 to 3.2.1
-
from datetime import datetime
from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.
MAX_AGE_IN_DAYS = 30
# To clean specific tables, please provide a comma-separated list per
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables
TABLES_TO_CLEAN = None
with DAG(
dag_id="clean_db_dag",
schedule=None,
catchup=False,
start_date=datetime(2026, 1, 1),
) as dag:
tables_flag = f"--tables '{TABLES_TO_CLEAN}' " if TABLES_TO_CLEAN else ""
bash_command = (
f"TIMESTAMP=$(date -u -d '{MAX_AGE_IN_DAYS} days ago' '+%Y-%m-%d %H:%M:%S' 2>/dev/null "
f"|| date -u -v-{MAX_AGE_IN_DAYS}d '+%Y-%m-%d %H:%M:%S') && "
"echo \"Cleaning records before: $TIMESTAMP\" && "
"airflow db clean "
"--clean-before-timestamp \"$TIMESTAMP\" "
f"{tables_flag}"
"--skip-archive --yes"
)
cli_command = BashOperator(
task_id="bash_command",
bash_command=bash_command,
)
- Apache Airflow v2.7.2 to 2.11.0
-
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.
MAX_AGE_IN_DAYS = 30
# To clean specific tables, please provide a comma-separated list per
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables
TABLES_TO_CLEAN = None
with DAG(
dag_id="clean_db_dag",
schedule_interval=None,
catchup=False,
start_date=days_ago(1),
params={
"timestamp": Param(
default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
type="string",
minLength=1,
maxLength=255,
),
}
) as dag:
if TABLES_TO_CLEAN:
bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
else:
bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"
cli_command = BashOperator(
task_id="bash_command",
bash_command=bash_command
)