开始使用 CDC 流
重要
此功能以 AWS 预览版形式提供,可能会发生变化。有关更多信息,请参阅 AWS Service Terms
在正式发布之前,我们将向流有效载荷添加新的操作类型("op": "u" 表示更新)。要确保您的应用程序无需修改即可处理这些更改,请通过应用 after 有效载荷将任何无法识别的 op 值视为更新插入。有关详细信息,请参阅了解 CDC 记录。
本指南将引导您完成开始将已提交的行级更改从 Aurora DSQL 集群流式传输到 Amazon Kinesis 数据流所需的每个步骤。按照本指南完成所有步骤后,您便创建了一个可正常运行的 CDC 管道和一个用于读取和输出更改记录的 Python 脚本。
先决条件
开始之前,请确认以下几点:
-
您已创建一个具有
ACTIVE状态的 Aurora DSQL 集群。如果您的集群处于空闲状态,请先使用任何兼容 PostgreSQL 的客户端连接到该集群以将其唤醒,然后再创建 CDC 流。如果集群未处于ACTIVE状态,CreateStream将返回验证错误。 -
Aurora DSQL 要求所有 CDC 资源(集群、Amazon Kinesis 数据流、IAM 服务角色和调用主体)都位于同一 AWS 账户中。
-
您的 Amazon Kinesis 数据流与您的 Aurora DSQL 集群位于同一 AWS 区域。
-
您已安装并配置 AWS CLI,且所使用的凭证拥有创建 IAM 角色和 Amazon Kinesis 数据流的权限。
步骤 1:创建 Amazon Kinesis 数据流
在与您的 Aurora DSQL 集群相同的 AWS 账户和区域中创建 Kinesis 数据流。CDC 记录比对应的 Aurora DSQL 行大,因为 JSON 格式包括列名、元数据和编码开销。
调整 Kinesis 数据流的大小
Aurora DSQL CDC 会在每次更改时传输整行数据。即使更新只涉及单个列,生成的记录也包含该行中的所有列。删除记录属于例外情况,它们只包含主键列。
估算平均记录大小
测量平均磁盘行大小以了解 CDC 将产生的数据量,并预测超大记录。以下查询将返回表的平均元组大小(以字节为单位):
SELECT avg(pg_column_size(t.*)) FROMyour_tablet;
CDC 记录信封在行大小的基础上,额外增加了列名、元数据和编码开销。有关确切的记录格式,请参阅记录有效载荷。有关 Aurora DSQL 如何处理超过 Kinesis 记录大小限制的记录,请参阅处理超大记录。有关完整的 Kinesis 服务限制集,请参阅《Amazon Kinesis Data Streams 开发人员指南》中的 Amazon Kinesis Data Streams 配额和限制。
重要
在创建 Kinesis 数据流时,请进行以下设置:
-
MaxRecordSizeInKiB设置为10240(10 MiB)。Kinesis 默认的 1 MiB 最大记录大小对于 Aurora DSQL CDC 记录来说并不总是足够大。任何超过配置的 Kinesis 记录大小的记录都会导致 CDC 流受损,并返回KINESIS_OVERSIZE_RECORD。Aurora DSQL 会将超大记录拆分为多个片段,每个片段可接近 10 MiB,因此 Kinesis 数据流需要接受该大小的记录。有关更多信息,请参阅处理超大记录。 -
StreamMode设置为ON_DEMAND。按需模式会自动扩展分片容量,避免您在意外流量高峰期间遭遇容量预调配不足的情况。在容量纵向扩展期间,如果遇到秒级突发流量峰值,Kinesis 仍可能返回WriteProvisionedThroughputExceeded。制定计划,应对短暂的节流事件。
在 AWS/Kinesis 命名空间中针对 IncomingBytes 和 WriteProvisionedThroughputExceeded 创建 CloudWatch 警报。Kinesis 节流会减慢 CDC 传输速度并增加复制延迟。有关 Aurora DSQL 端的指标和警报指南,请参阅监控最佳实践。
下面的示例使用了 AWS CLI。如果您的 AWS CLI 版本不支持 --max-record-size-in-ki-b 参数,请使用 AWS SDK 调用 Kinesis CreateStream 操作。
aws kinesis create-stream \ --stream-namemy-cdc-stream\ --stream-mode-details StreamMode=ON_DEMAND \ --max-record-size-in-ki-b 10240 \ --regionregion
等待流变为活动状态:
aws kinesis describe-stream-summary \ --stream-namemy-cdc-stream\ --regionregion\ --query 'StreamDescriptionSummary.StreamStatus'
该命令将在流就绪后返回 "ACTIVE"。
记录输出中的流 ARN。您在后续步骤中需要使用它。此 ARN 的格式为:arn:aws:kinesis:。region:account-id:stream/my-cdc-stream
步骤 2:为 Aurora DSQL 创建 IAM 角色
Aurora DSQL 会代入 IAM 角色,以将 CDC 记录写入您的 Kinesis 数据流。在此步骤中,您将创建具有信任策略的角色并附加权限策略。有关每个策略元素的详细说明,请参阅配置 IAM。
创建信任策略文件
将以下 JSON 保存为 trust-policy.json。将 your-account-id、region 和 cluster-id 替换为您的值。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DSQLAccess", "Effect": "Allow", "Principal": { "Service": "dsql.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "aws:SourceAccount": "your-account-id" }, "ArnLike": { "aws:SourceArn": "arn:aws:dsql:region:your-account-id:cluster/cluster-id/stream/*" } } } ] }
创建角色
运行以下命令来创建 IAM 角色:
aws iam create-role \ --role-namedsql-cdc-role\ --assume-role-policy-document file://trust-policy.json
创建权限策略文件
将以下 JSON 保存为 permissions-policy.json。将占位符值替换为您的 Kinesis 数据流 ARN。仅在您的 Kinesis 数据流使用 AWS KMS 客户自主管理型密钥时,才需要 KMSAccess 语句,但您可以提前包含该语句,这样后续添加客户自主管理型密钥时就不会导致 CDC 流中断。有关每个条件的完整说明,请参阅服务角色权限策略。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream" }, { "Sid": "KMSAccess", "Effect": "Allow", "Action": [ "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:*:*:key/*", "Condition": { "StringEquals": { "kms:ViaService": "kinesis.region.amazonaws.com", "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream", "aws:ResourceAccount": "${aws:PrincipalAccount}" } } } ] }
附加权限策略
运行以下命令:
aws iam put-role-policy \ --role-namedsql-cdc-role\ --policy-name dsql-cdc-kinesis-access \ --policy-document file://permissions-policy.json
记录 create-role 输出中的角色 ARN。此 ARN 的格式为:arn:aws:iam::。your-account-id:role/dsql-cdc-role
步骤 3:创建 CDC 流
使用 AWS CLI 创建 CDC 流,该流可将您的 Aurora DSQL 集群连接到 Kinesis 数据流。将占位符值替换为步骤 1 中的 Kinesis 流 ARN、步骤 2 中的 IAM 角色 ARN 以及您的集群标识符。
aws dsql create-stream \ --cluster-identifiercluster-id\ --target-definition '{"kinesis":{"streamArn":"kinesis-stream-arn","roleArn":"role-arn"}}' \ --ordering UNORDERED \ --format JSON \ --tags '{"Name":"my-cdc-stream"}' \ --regionregion
响应包含流标识符和 CREATING 状态。流创建过程通常需要一到三分钟。
等待流变为活动状态
轮询流状态,直至它达到 ACTIVE:
aws dsql get-stream \ --cluster-identifiercluster-id\ --stream-identifierstream-id\ --regionregion\ --query 'status'
您也可以使用 AWS SDK 中的 StreamActive 等待器进行自动轮询。
在流达到 ACTIVE 状态后,Aurora DSQL 会开始向 Kinesis 数据流传输已提交的行级更改。
注意
每个 Aurora DSQL 集群都有 CDC 流上限。如果您达到此限制,CreateStream 将返回 ServiceQuotaExceededException。有关默认限制,请参阅配额和限制。
步骤 4:验证记录是否正常传输
在 Aurora DSQL 集群的表中插入一行。例如:
CREATE TABLE IF NOT EXISTS test_cdc ( id INT PRIMARY KEY, message TEXT ); INSERT INTO test_cdc VALUES (1, 'hello cdc');
从 Kinesis 数据流中读取,以验证 CDC 记录是否已到达:
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \ --stream-namemy-cdc-stream\ --shard-id shardId-000000000000 \ --shard-iterator-type TRIM_HORIZON \ --regionregion\ --query 'ShardIterator' --output text) aws kinesis get-records \ --shard-iterator "$SHARD_ITERATOR" \ --regionregion
每条记录的 Data 字段均包含一个 JSON 有效载荷。在使用 AWS CLI 时,响应中的有效载荷采用 Base64 编码。在使用 boto3 SDK 时,SDK 会自动对其进行解码。解码后的 JSON 类似以下内容:
{ "type": "full", "op": "c", "before": null, "after": {"id": 1, "message": "hello cdc"}, "source": { "version": "1.0", "ts_ms": 1705318200000, "ts_ns": 1705318200000000000, "txId": "ffthunp5stx6ffs2vyfqoatmfu", "schema": "public", "table": "test_cdc", "db": "postgres", "cluster": "cluster-id" }, "ts_ms": 1705318200125, "ts_ns": 1705318200125483291 }
有关每个字段的完整描述,请参阅了解 CDC 记录。
步骤 5:通过 Python 脚本使用记录
以下 Python 脚本从 Kinesis 数据流读取 CDC 记录,并输出每个更改事件。该脚本使用 boto3 Amazon Kinesis 客户端来遍历分片并解码每条记录。由于 Aurora DSQL CDC 使用至少一次传输机制,该脚本可能会多次输出同一条记录。
""" Read CDC records from an Amazon Kinesis data stream. Usage: pip install boto3 python consume_cdc.py --stream-name my-cdc-stream --region us-east-1 """ from __future__ import annotations import argparse import json import boto3 def consume_cdc(stream_name: str, region: str) -> None: kinesis = boto3.client("kinesis", region_name=region) # List all shards (paginate if the stream has many shards) shard_ids: list[str] = [] paginator = kinesis.get_paginator("list_shards") for page in paginator.paginate(StreamName=stream_name): shard_ids.extend(s["ShardId"] for s in page["Shards"]) print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))") for shard_id in shard_ids: iterator_response = kinesis.get_shard_iterator( StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON", ) shard_iterator = iterator_response["ShardIterator"] while shard_iterator: records_response = kinesis.get_records( ShardIterator=shard_iterator, Limit=100 ) shard_iterator = records_response.get("NextShardIterator") for record in records_response["Records"]: # boto3 decodes Base64 automatically; record["Data"] is bytes. payload = json.loads(record["Data"]) # A record's "type" field identifies its structure. # "full": inlined record with before/after values. # "chunked": main record that references fragments for a split image. # "fragment": one piece of a chunked image; reassemble in production code. # For details, see cdc-record-format.html#cdc-oversized-records. record_type = payload.get("type", "full") if record_type == "fragment": print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}") continue source = payload["source"] op = payload["op"] ts_ns = source["ts_ns"] tx_id = source["txId"] table = f"{source['schema']}.{source['table']}" # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent # release will emit "u" for updates, and "c" for inserts. Design your # consumer to handle all three values; this map stays correct across the # transition. op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"} print( f"[{op_labels.get(op, op)}] {table} " f"txId={tx_id} ts_ns={ts_ns} type={record_type}" ) if payload.get("after"): print(f" after: {json.dumps(payload['after'])}") if payload.get("before"): print(f" before: {json.dumps(payload['before'])}") if record_type == "chunked": print(f" chunked: {json.dumps(payload['chunked'])}") if not records_response["Records"]: break # No more records in this shard if __name__ == "__main__": parser = argparse.ArgumentParser( description="Consume DSQL CDC records from Kinesis" ) parser.add_argument("--stream-name", required=True, help="Kinesis stream name") parser.add_argument("--region", required=True, help="AWS Region") args = parser.parse_args() consume_cdc(args.stream_name, args.region)
运行脚本:
pip install boto3 python consume_cdc.py \ --stream-namemy-cdc-stream\ --regionregion
该脚本会在每个更改事件到达时输出更改事件。您将看到类似以下内容的输出:
Reading from my-cdc-stream (4 shard(s)) [INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full after: {"id": 1, "message": "hello cdc"}
添加末次写入优先重复数据删除机制
由于 Aurora DSQL CDC 使用至少一次传输机制,因此生产应用程序将删除重复数据并对记录排序。以下代码示例显示了一种高水位线方法:对于每个主键,它会跟踪迄今为止看到的最大 source.ts_ns,并丢弃所有时间戳相等或更早的记录。将 PK_COLUMNS 设置为您正在处理的表的主键列名。有关处理多个表或删除操作的策略,请参阅使用者策略。
# Set PK_COLUMNS to the primary key column(s) of your table. PK_COLUMNS = ["id"] # Maps each primary key value to the highest ts_ns seen for that key. high_water: dict[tuple, int] = {} def process_record(payload: dict) -> bool: """Return True if the record is new, False if it's a duplicate or stale. Skip fragment records; reassemble them into a full image before calling this. """ if payload.get("type") == "fragment": return False # Fragments are reassembled upstream, not deduplicated here. source = payload["source"] ts_ns = source["ts_ns"] op = payload["op"] # For inserts/updates the row is in "after"; for deletes it's in "before". row = payload.get("after") or payload.get("before") or {} pk = tuple(row.get(col) for col in PK_COLUMNS) prev_ts = high_water.get(pk, -1) if ts_ns <= prev_ts: return False # Duplicate or out-of-order record high_water[pk] = ts_ns return True
管理 CDC 流
列出流
要列出集群的所有 CDC 流,请使用 ListStreams 操作:
aws dsql list-streams \ --cluster-identifiercluster-id\ --regionregion
删除流
要删除 CDC 流,请运行以下命令:
aws dsql delete-stream \ --cluster-identifiercluster-id\ --stream-identifierstream-id\ --regionregion
您可以使用 StreamNotExists 等待器来轮询 GetStream,直到返回 ResourceNotFoundException,这表示 Aurora DSQL 已完全删除该流。