View a markdown version of this page

Introdução aos fluxos de CDC - Amazon Aurora DSQL

Introdução aos fluxos de CDC

Importante

Esse recurso é fornecido como visualização prévia da AWS e está sujeito a alterações. Para obter mais informações, consulte a seção 2, Versões beta e visualizações prévias, nos Termos de serviço da AWS. Para saber mais sobre precificação para fluxos de CDC, consulte a página de precificação do Aurora DSQL.

Antes da disponibilidade geral, adicionaremos novos tipos de operação ("op": "u" para atualizações) à carga útil do fluxo. Para garantir que seu aplicativo processe essas alterações sem modificações, trate qualquer valor op não reconhecido como um acréscimo aplicando a carga útil after. Para mais detalhes, consulte Noções básicas sobre os registros de CDC.

Este guia mostra todas as etapas necessárias para começar a transmitir alterações confirmadas no nível da linha de um cluster do Aurora DSQL para um fluxo de dados do Amazon Kinesis. Ao final deste guia, você terá criado um pipeline de CDC funcional e um script Python que lê e imprime registros de alterações.

Pré-requisitos

Antes de começar, confirme o seguinte:

  • Você criou um cluster do Aurora DSQL no status ACTIVE. Se o seu cluster estiver ocioso, conecte-se a ele com qualquer cliente compatível com PostgreSQL para ativá-lo antes de criar um fluxo de CDC. CreateStream retornará um erro de validação se o cluster não estiver no status ACTIVE.

  • O Aurora DSQL exige que todos os recursos de CDC (o cluster, o fluxo de dados do Amazon Kinesis, o perfil de serviço do IAM e a entidade principal responsável pela chamada) estejam na mesma conta da AWS.

  • O fluxo de dados do Amazon Kinesis está na mesma região da AWS do cluster do Aurora DSQL.

  • Você instalou e configurou a AWS CLI com credenciais que têm permissão para criar perfis do IAM e fluxos de dados do Amazon Kinesis.

Etapa 1: criar um fluxo de dados do Amazon Kinesis

Crie um fluxo de dados do Kinesis na mesma conta e região da AWS do cluster do Aurora DSQL. Os registros de CDC são maiores do que a linha correspondente do Aurora DSQL porque o formato JSON inclui nomes de colunas, metadados e sobrecarga de codificação.

Dimensionamento do fluxo de dados do Kinesis

A CDC do Aurora DSQL fornece a linha completa de cada alteração. Uma atualização referente a uma única coluna produz um registro que contém todas as colunas da linha. Excluir registros é a exceção. Eles incluem somente as colunas da chave primária.

Estimar o tamanho médio do registro

Meça o tamanho médio da linha em disco para entender o volume que a CDC produzirá e para prever registros grandes. A consulta a seguir retorna o tamanho médio da tupla em bytes para uma tabela:

SELECT avg(pg_column_size(t.*)) FROM your_table t;

O envelope do registro de CDC adiciona nomes de colunas, metadados e sobrecarga de codificação sobre o tamanho da linha. Para obter o formato exato do registro, consulte Carga útil de registros. Para saber como o Aurora DSQL lida com registros que excedem o limite de tamanho de registro do Kinesis, consulte Processamento de registros de tamanho grande. Para ver o conjunto completo dos limites do serviço do Kinesis, consulte Cotas e limites do Amazon Kinesis Data Streams no Guia do desenvolvedor do Amazon Kinesis Data Streams.

Importante

Ao criar o fluxo de dados do Kinesis, defina o seguinte:

  • MaxRecordSizeInKiB como 10240 (10 MiB). O máximo padrão do Kinesis de 1 MiB nem sempre é grande o suficiente para registros de CDC do Aurora DSQL. Qualquer registro que exceda o tamanho configurado do Kinesis faz com que o fluxo de CDC fique comprometido com KINESIS_OVERSIZE_RECORD. O Aurora DSQL divide registros grandes em fragmentos que podem se aproximar de 10 MiB cada, portanto, o fluxo de dados do Kinesis precisa aceitar registros desse tamanho. Para obter detalhes, consulte Processamento de registros de tamanho grande.

  • StreamMode para ON_DEMAND. O modo sob demanda dimensiona automaticamente a capacidade do fragmento e protege você contra o subprovisionamento durante picos inesperados. O Kinesis ainda pode retornar WriteProvisionedThroughputExceeded durante rajadas nítidas na escala de segundos à medida que a capacidade aumenta. Planeje eventos breves de controle de utilização.

Crie alarmes do CloudWatch em IncomingBytes e WriteProvisionedThroughputExceeded no namespace AWS/Kinesis. O controle de utilização do Kinesis retarda a entrega de CDC e aumenta o atraso na replicação. Para obter métricas do lado do Aurora DSQL e orientações sobre alarmes, consulte Práticas recomendadas de monitoramento.

O exemplo a seguir usa a AWS CLI. Se a sua versão da AWS CLI não aceitar o parâmetro --max-record-size-in-ki-b, use um SDK da AWS para chamar a operação CreateStream do Kinesis.

aws kinesis create-stream \ --stream-name my-cdc-stream \ --stream-mode-details StreamMode=ON_DEMAND \ --max-record-size-in-ki-b 10240 \ --region region

Aguarde o fluxo ficar ativo:

aws kinesis describe-stream-summary \ --stream-name my-cdc-stream \ --region region \ --query 'StreamDescriptionSummary.StreamStatus'

O comando retorna "ACTIVE" quando o fluxo está pronto.

Grave o ARN do fluxo na saída. Você precisará dele nas etapas a seguir. O ARN tem o formato arn:aws:kinesis:region:account-id:stream/my-cdc-stream.

Etapa 2: criar um perfil do IAM para o Aurora DSQL

O Aurora DSQL assume o perfil do IAM para gravar registros de CDC em seu fluxo de dados do Kinesis. Nesta etapa, você criará o perfil com uma política de confiança e anexará uma política de permissões. Para obter uma explicação completa sobre cada elemento da política, consulte Configuração do IAM.

Criar o arquivo de política de confiança

Salve o JSON a seguir como trust-policy.json. Substitua your-account-id, region e cluster-id por seus valores.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "DSQLAccess", "Effect": "Allow", "Principal": { "Service": "dsql.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "aws:SourceAccount": "your-account-id" }, "ArnLike": { "aws:SourceArn": "arn:aws:dsql:region:your-account-id:cluster/cluster-id/stream/*" } } } ] }
Criar o perfil

Execute o seguinte comando da para criar o perfil do IAM:

aws iam create-role \ --role-name dsql-cdc-role \ --assume-role-policy-document file://trust-policy.json
Criar o arquivo de política de permissões

Salve o JSON a seguir como permissions-policy.json. Substitua os valores de espaço reservado por seu ARN de fluxo de dados do Kinesis. A instrução KMSAccess só será necessária se seu fluxo de dados do Kinesis usar uma chave gerenciada pelo cliente do AWS KMS, mas você pode incluí-la preventivamente para que a adição de uma chave gerenciada pelo cliente não interrompa seu fluxo de CDC posteriormente. Para obter uma explicação completa sobre cada condição, consulte Política de permissões do perfil de serviço.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream" }, { "Sid": "KMSAccess", "Effect": "Allow", "Action": [ "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:*:*:key/*", "Condition": { "StringEquals": { "kms:ViaService": "kinesis.region.amazonaws.com", "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream", "aws:ResourceAccount": "${aws:PrincipalAccount}" } } } ] }
Anexar a política de permissões

Execute o seguinte comando:

aws iam put-role-policy \ --role-name dsql-cdc-role \ --policy-name dsql-cdc-kinesis-access \ --policy-document file://permissions-policy.json

Grave o ARN do perfil na saída create-role. O ARN tem o formato arn:aws:iam::your-account-id:role/dsql-cdc-role.

Etapa 3: criar o fluxo de CDC

Use a AWS CLI para criar um fluxo de CDC que conecte seu cluster do Aurora DSQL ao fluxo de dados do Kinesis. Substitua os valores do espaço reservado pelo ARN do fluxo do Kinesis da Etapa 1, pelo ARN do perfil do IAM da Etapa 2 e por seu identificador de cluster.

aws dsql create-stream \ --cluster-identifier cluster-id \ --target-definition '{"kinesis":{"streamArn":"kinesis-stream-arn","roleArn":"role-arn"}}' \ --ordering UNORDERED \ --format JSON \ --tags '{"Name":"my-cdc-stream"}' \ --region region

A resposta inclui um identificador de fluxo e um status CREATING. A criação do fluxo normalmente leva de um a três minutos.

Aguardar o fluxo ficar ativo

Pesquise o status do fluxo até que ele fique ACTIVE:

aws dsql get-stream \ --cluster-identifier cluster-id \ --stream-identifier stream-id \ --region region \ --query 'status'

Você também pode usar o waiter StreamActive nos SDKs da AWS para fazer pesquisas automaticamente.

Depois que o fluxo fica ACTIVE, o Aurora DSQL começa a entregar alterações confirmadas no nível da linha em seu fluxo de dados do Kinesis.

nota

Cada cluster do Aurora DSQL tem um número máximo de fluxos de CDC. Se você atingir esse limite, CreateStream retornará ServiceQuotaExceededException. Para o limite padrão, consulte Cotas e limites.

Etapa 4: verificar se os registros estão fluindo

Insira uma linha em uma tabela no seu cluster do Aurora DSQL. Por exemplo:

CREATE TABLE IF NOT EXISTS test_cdc ( id INT PRIMARY KEY, message TEXT ); INSERT INTO test_cdc VALUES (1, 'hello cdc');

Leia o fluxo de dados do Kinesis para verificar se o registro de CDC chegou:

SHARD_ITERATOR=$(aws kinesis get-shard-iterator \ --stream-name my-cdc-stream \ --shard-id shardId-000000000000 \ --shard-iterator-type TRIM_HORIZON \ --region region \ --query 'ShardIterator' --output text) aws kinesis get-records \ --shard-iterator "$SHARD_ITERATOR" \ --region region

O campo Data de cada registro contém uma carga útil JSON. Quando você usa a AWS CLI, a carga útil é codificada em base64 na resposta. Quando você usa o SDK boto3, o SDK o decodifica automaticamente. O JSON decodificado é semelhante ao seguinte:

{ "type": "full", "op": "c", "before": null, "after": {"id": 1, "message": "hello cdc"}, "source": { "version": "1.0", "ts_ms": 1705318200000, "ts_ns": 1705318200000000000, "txId": "ffthunp5stx6ffs2vyfqoatmfu", "schema": "public", "table": "test_cdc", "db": "postgres", "cluster": "cluster-id" }, "ts_ms": 1705318200125, "ts_ns": 1705318200125483291 }

Para ver uma descrição completa de cada campo, consulte Noções básicas sobre os registros de CDC.

Etapa 5: consumir registros com um script Python

O script Python a seguir lê registros de CDC de um fluxo de dados do Kinesis e imprime cada evento de alteração. O script usa o cliente boto3 do Amazon Kinesis para iterar sobre fragmentos e decodificar cada registro. Como a CDC do Aurora DSQL usa entrega pelo menos uma vez, o script pode imprimir o mesmo registro mais de uma vez.

""" Read CDC records from an Amazon Kinesis data stream. Usage: pip install boto3 python consume_cdc.py --stream-name my-cdc-stream --region us-east-1 """ from __future__ import annotations import argparse import json import boto3 def consume_cdc(stream_name: str, region: str) -> None: kinesis = boto3.client("kinesis", region_name=region) # List all shards (paginate if the stream has many shards) shard_ids: list[str] = [] paginator = kinesis.get_paginator("list_shards") for page in paginator.paginate(StreamName=stream_name): shard_ids.extend(s["ShardId"] for s in page["Shards"]) print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))") for shard_id in shard_ids: iterator_response = kinesis.get_shard_iterator( StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON", ) shard_iterator = iterator_response["ShardIterator"] while shard_iterator: records_response = kinesis.get_records( ShardIterator=shard_iterator, Limit=100 ) shard_iterator = records_response.get("NextShardIterator") for record in records_response["Records"]: # boto3 decodes Base64 automatically; record["Data"] is bytes. payload = json.loads(record["Data"]) # A record's "type" field identifies its structure. # "full": inlined record with before/after values. # "chunked": main record that references fragments for a split image. # "fragment": one piece of a chunked image; reassemble in production code. # For details, see cdc-record-format.html#cdc-oversized-records. record_type = payload.get("type", "full") if record_type == "fragment": print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}") continue source = payload["source"] op = payload["op"] ts_ns = source["ts_ns"] tx_id = source["txId"] table = f"{source['schema']}.{source['table']}" # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent # release will emit "u" for updates, and "c" for inserts. Design your # consumer to handle all three values; this map stays correct across the # transition. op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"} print( f"[{op_labels.get(op, op)}] {table} " f"txId={tx_id} ts_ns={ts_ns} type={record_type}" ) if payload.get("after"): print(f" after: {json.dumps(payload['after'])}") if payload.get("before"): print(f" before: {json.dumps(payload['before'])}") if record_type == "chunked": print(f" chunked: {json.dumps(payload['chunked'])}") if not records_response["Records"]: break # No more records in this shard if __name__ == "__main__": parser = argparse.ArgumentParser( description="Consume DSQL CDC records from Kinesis" ) parser.add_argument("--stream-name", required=True, help="Kinesis stream name") parser.add_argument("--region", required=True, help="AWS Region") args = parser.parse_args() consume_cdc(args.stream_name, args.region)

Execute o script :

pip install boto3 python consume_cdc.py \ --stream-name my-cdc-stream \ --region region

O script imprime cada evento de alteração à medida que ele chega. Você verá uma saída semelhante à seguinte:

Reading from my-cdc-stream (4 shard(s)) [INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full after: {"id": 1, "message": "hello cdc"}
Adição da desduplicação de last-writer-wins

Como a CDC do Aurora DSQL usa entrega pelo menos uma vez, os aplicativos de produção devem desduplicar e classificar registros. O exemplo de código a seguir mostra uma abordagem de marca d'água alta: para cada chave primária, ela rastreia o source.ts_ns mais alto visto até agora e descarta qualquer registro com um carimbo de data e hora igual ou anterior. Defina PK_COLUMNS como os nomes das colunas de chave primária da tabela que você está processando. Para estratégias que lidam com várias tabelas ou exclusões, consulte Estratégias do consumidor.

# Set PK_COLUMNS to the primary key column(s) of your table. PK_COLUMNS = ["id"] # Maps each primary key value to the highest ts_ns seen for that key. high_water: dict[tuple, int] = {} def process_record(payload: dict) -> bool: """Return True if the record is new, False if it's a duplicate or stale. Skip fragment records; reassemble them into a full image before calling this. """ if payload.get("type") == "fragment": return False # Fragments are reassembled upstream, not deduplicated here. source = payload["source"] ts_ns = source["ts_ns"] op = payload["op"] # For inserts/updates the row is in "after"; for deletes it's in "before". row = payload.get("after") or payload.get("before") or {} pk = tuple(row.get(col) for col in PK_COLUMNS) prev_ts = high_water.get(pk, -1) if ts_ns <= prev_ts: return False # Duplicate or out-of-order record high_water[pk] = ts_ns return True

Gerenciamento de fluxos de CDC

Listagem de fluxos

Para listar todos os fluxos de CDC de um cluster, use a operação ListStreams:

aws dsql list-streams \ --cluster-identifier cluster-id \ --region region
Exclusão de um fluxo

Para excluir um fluxo deCDC, execute o seguinte comando:

aws dsql delete-stream \ --cluster-identifier cluster-id \ --stream-identifier stream-id \ --region region

Você pode usar o waiter StreamNotExists para pesquisar GetStream até que uma ResourceNotFoundException seja retornada, indicando que o Aurora DSQL excluiu totalmente o fluxo.