View a markdown version of this page

CDC ストリームの使用開始 - Amazon Aurora DSQL

CDC ストリームの使用開始

重要

この機能は AWS プレビューとして提供されており、変更される可能性があります。詳細については、「AWS のサービス条件」のセクション 2、「ベータ版とプレビュー」を参照してください。CDC ストリームの料金の詳細については、「Aurora DSQL の料金ページ」を参照してください。

一般提供する前に、ストリームペイロードに新しいオペレーションタイプ ("op": "u" 更新用) を追加します。アプリケーションがこれらの変更を修正せずに処理できるようにするには、after ペイロードを適用して、認識されない op 値をすべてアップサートとして扱います。詳細については、「CDC レコードについて」を参照してください。

このガイドでは、Aurora DSQL クラスターから Amazon Kinesis データストリームにコミットされた行レベルの変更をストリーミングするために必要なすべてのステップについて説明します。このガイドの最後には、動作する CDC パイプラインと、変更レコードの読み取りと出力を行う Python スクリプトを作成しているはずです。

前提条件

開始する前に、以下の点を確認してください。

  • ACTIVE ステータスの Aurora DSQL クラスターを作成しました。クラスターがアイドル状態の場合は、CDC ストリームを作成する前に、PostgreSQL 互換クライアントを使用してクラスターに接続し、起動します。CreateStream は、クラスターのステータスが ACTIVE でない場合は検証エラーを返します。

  • Aurora DSQL では、クラスター、Amazon Kinesis データストリーム、IAM サービスロール、呼び出し元プリンシパルなど、すべての CDC リソースが同じ AWS アカウントに存在する必要があります。

  • Amazon Kinesis データストリームは、Aurora DSQL クラスターと同じ AWS リージョンにあります。

  • IAM ロールと Amazon Kinesis Data Streams を作成するアクセス許可を持つ認証情報を使用して、AWS CLI をインストールして設定しました。

ステップ 1: Amazon Kinesis データストリームを作成する

Aurora DSQL クラスターと同じ AWS アカウントおよびリージョンに Kinesis データストリームを作成します。CDC レコードは、JSON 形式に列名、メタデータ、エンコーディングのオーバーヘッドが含まれるため、対応する Aurora DSQL 行よりもサイズが大きくなります。

Kinesis データストリームのサイズ設定

Aurora DSQL CDC は、すべての変更に対して完全な行を配信します。単一の列のみを変更する更新を行うと、その行のすべての列を含むレコードが生成されます。削除レコードは例外であり、プライマリキー列のみが含まれます。

平均レコードサイズの見積もり

CDC が生成するボリュームを把握し、サイズの大きいレコードを予測するために、ディスク上の平均行サイズを測定します。次のクエリは、テーブルのタプルの平均サイズ (バイト単位) を返します。

SELECT avg(pg_column_size(t.*)) FROM your_table t;

CDC レコードエンベロープは、行サイズに加えて、列名、メタデータ、エンコーディングのオーバーヘッドを追加します。正確なレコード形式については、「レコードのペイロード」を参照してください。Aurora DSQL が Kinesis のレコードサイズ制限を超えるレコードを処理する方法については、「サイズの大きいレコードの処理」を参照してください。Kinesis サービスの制限の詳細については、「Amazon Kinesis Data Streams デベロッパーガイド」の「Amazon Kinesis Data Streams のクォータと制限」を参照してください。

重要

Kinesis データストリームを作成するときは、以下を設定します。

  • MaxRecordSizeInKiB10240 (10 MiB) に変更します。Kinesis のデフォルトの最大 1 MiB は、Aurora DSQL CDC レコードに十分な大きさとは限りません。設定された Kinesis レコードサイズを超えるレコードがあると、CDC ストリームが KINESIS_OVERSIZE_RECORD で損なわれます。Aurora DSQL は、サイズの大きいレコードをそれぞれ 10 MiB 近くになるフラグメントに分割するため、Kinesis データストリームはそのサイズのレコードを受け入れる必要があります。詳細については、「サイズの大きいレコードの処理」を参照してください。

  • StreamMode を へON_DEMAND オンデマンドモードでは、シャード容量が自動的にスケーリングされ、予期しない急増時のプロビジョニング不足を防ぎます。容量がスケールアップしても、秒単位の急激なバーストが発生した場合、Kinesis が WriteProvisionedThroughputExceeded を返す可能性があります。短いスロットリングイベントを想定して計画します。

AWS/Kinesis 名前空間の IncomingBytesWriteProvisionedThroughputExceeded に対して CloudWatch アラームを作成します。Kinesis のスロットリングは CDC の配信を遅らせ、レプリケーションの遅延を増加させます。Aurora DSQL 側のメトリクスとアラームガイダンスについては、「モニタリングのベストプラクティス」を参照してください。

次の例では AWS CLI を使用しています。お使いの AWS CLI のバージョンで --max-record-size-in-ki-b パラメータがサポートされていない場合は、AWS SDK を使用して Kinesis CreateStream オペレーションを呼び出します。

aws kinesis create-stream \ --stream-name my-cdc-stream \ --stream-mode-details StreamMode=ON_DEMAND \ --max-record-size-in-ki-b 10240 \ --region region

ストリームがアクティブになるまで待機します。

aws kinesis describe-stream-summary \ --stream-name my-cdc-stream \ --region region \ --query 'StreamDescriptionSummary.StreamStatus'

ストリームの準備ができると、コマンドは "ACTIVE" を返します。

出力からストリーム ARN を記録します。以降のステップで必要になります。ARN の形式は arn:aws:kinesis:region:account-id:stream/my-cdc-stream です。

ステップ 2: Aurora DSQL 用の IAM ロールを作成する

Aurora DSQL は、Kinesis データストリームに CDC レコードを書き込む IAM ロールを引き受けます。このステップでは、信頼ポリシーを持つロールを作成し、アクセス許可ポリシーをアタッチします。各ポリシー要素の詳細については、「IAM の設定」を参照してください。

信頼ポリシーファイルを作成する

次の JSON を trust-policy.json として保存します。your-account-idregioncluster-id をお客様の値に置き換えます。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "DSQLAccess", "Effect": "Allow", "Principal": { "Service": "dsql.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "aws:SourceAccount": "your-account-id" }, "ArnLike": { "aws:SourceArn": "arn:aws:dsql:region:your-account-id:cluster/cluster-id/stream/*" } } } ] }
ロールを作成する

次の コマンドを実行して、IAM ロールを作成します。

aws iam create-role \ --role-name dsql-cdc-role \ --assume-role-policy-document file://trust-policy.json
アクセス許可ポリシーファイルを作成する

次の JSON を permissions-policy.json として保存します。プレースホルダー値を Kinesis データストリームの ARN に置き換えます。KMSAccess ステートメントは、Kinesis データストリームが AWS KMS カスタマーマネージドキーを使用する場合にのみ必要ですが、カスタマーマネージドキーを後で追加しても CDC ストリームが破損しないように、事前に含めることができます。各条件の詳細については、「サービスロールのアクセス許可ポリシー」を参照してください。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream" }, { "Sid": "KMSAccess", "Effect": "Allow", "Action": [ "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:*:*:key/*", "Condition": { "StringEquals": { "kms:ViaService": "kinesis.region.amazonaws.com", "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream", "aws:ResourceAccount": "${aws:PrincipalAccount}" } } } ] }
アクセス許可ポリシーをアタッチする

次のコマンドを実行します。

aws iam put-role-policy \ --role-name dsql-cdc-role \ --policy-name dsql-cdc-kinesis-access \ --policy-document file://permissions-policy.json

create-role 出力からロール ARN を記録します。ARN の形式は arn:aws:iam::your-account-id:role/dsql-cdc-role です。

ステップ 3: CDC ストリームを作成する

AWS CLI を使用して、Aurora DSQL クラスターを Kinesis データストリームに接続する CDC ストリームを作成します。プレースホルダー値を、ステップ 1 の Kinesis ストリーム ARN、ステップ 2 の IAM ロール ARN、およびクラスター識別子に置き換えます。

aws dsql create-stream \ --cluster-identifier cluster-id \ --target-definition '{"kinesis":{"streamArn":"kinesis-stream-arn","roleArn":"role-arn"}}' \ --ordering UNORDERED \ --format JSON \ --tags '{"Name":"my-cdc-stream"}' \ --region region

レスポンスには、ストリーム識別子とステータス CREATING が含まれます。ストリームの作成には通常 1~3 分かかります。

ストリームがアクティブになるまで待機する

ACTIVE になるまでストリームステータスをポーリングします。

aws dsql get-stream \ --cluster-identifier cluster-id \ --stream-identifier stream-id \ --region region \ --query 'status'

AWS SDK で StreamActive ウェーターを使用して自動的にポーリングすることもできます。

ストリームが ACTIVE になると、Aurora DSQL はコミットされた行レベルの変更を Kinesis データストリームに配信し始めます。

注記

各 Aurora DSQL クラスターには、CDC ストリームの最大数があります。この制限に達すると、CreateStreamServiceQuotaExceededException を返します。デフォルトの制限については、「クォータと制限」を参照してください。

ステップ 4: レコードが流れていることを確認する

Aurora DSQL クラスターのテーブルに行を挿入します。例えば、次のようになります。

CREATE TABLE IF NOT EXISTS test_cdc ( id INT PRIMARY KEY, message TEXT ); INSERT INTO test_cdc VALUES (1, 'hello cdc');

Kinesis データストリームから読み取り、CDC レコードが到着したことを確認します。

SHARD_ITERATOR=$(aws kinesis get-shard-iterator \ --stream-name my-cdc-stream \ --shard-id shardId-000000000000 \ --shard-iterator-type TRIM_HORIZON \ --region region \ --query 'ShardIterator' --output text) aws kinesis get-records \ --shard-iterator "$SHARD_ITERATOR" \ --region region

各レコードの Data フィールドには、JSON ペイロードが含まれています。AWS CLI を使用する場合、ペイロードはレスポンスで Base64 エンコードされます。boto3 SDK を使用すると、SDK が自動的にデコードします。デコードされた JSON は以下のようになります。

{ "type": "full", "op": "c", "before": null, "after": {"id": 1, "message": "hello cdc"}, "source": { "version": "1.0", "ts_ms": 1705318200000, "ts_ns": 1705318200000000000, "txId": "ffthunp5stx6ffs2vyfqoatmfu", "schema": "public", "table": "test_cdc", "db": "postgres", "cluster": "cluster-id" }, "ts_ms": 1705318200125, "ts_ns": 1705318200125483291 }

各フィールドの詳細な説明については、「CDC レコードについて」を参照してください。

ステップ 5: Python スクリプトを使用してレコードを消費する

次の Python スクリプトは、Kinesis データストリームから CDC レコードを読み取り、各変更イベントを出力します。このスクリプトは、boto3 Amazon Kinesis クライアントを使用してシャードを反復処理し、各レコードをデコードします。Aurora DSQL CDC は少なくとも 1 回の配信を使用するため、スクリプトは同じレコードを複数回出力することがあります。

""" Read CDC records from an Amazon Kinesis data stream. Usage: pip install boto3 python consume_cdc.py --stream-name my-cdc-stream --region us-east-1 """ from __future__ import annotations import argparse import json import boto3 def consume_cdc(stream_name: str, region: str) -> None: kinesis = boto3.client("kinesis", region_name=region) # List all shards (paginate if the stream has many shards) shard_ids: list[str] = [] paginator = kinesis.get_paginator("list_shards") for page in paginator.paginate(StreamName=stream_name): shard_ids.extend(s["ShardId"] for s in page["Shards"]) print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))") for shard_id in shard_ids: iterator_response = kinesis.get_shard_iterator( StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON", ) shard_iterator = iterator_response["ShardIterator"] while shard_iterator: records_response = kinesis.get_records( ShardIterator=shard_iterator, Limit=100 ) shard_iterator = records_response.get("NextShardIterator") for record in records_response["Records"]: # boto3 decodes Base64 automatically; record["Data"] is bytes. payload = json.loads(record["Data"]) # A record's "type" field identifies its structure. # "full": inlined record with before/after values. # "chunked": main record that references fragments for a split image. # "fragment": one piece of a chunked image; reassemble in production code. # For details, see cdc-record-format.html#cdc-oversized-records. record_type = payload.get("type", "full") if record_type == "fragment": print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}") continue source = payload["source"] op = payload["op"] ts_ns = source["ts_ns"] tx_id = source["txId"] table = f"{source['schema']}.{source['table']}" # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent # release will emit "u" for updates, and "c" for inserts. Design your # consumer to handle all three values; this map stays correct across the # transition. op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"} print( f"[{op_labels.get(op, op)}] {table} " f"txId={tx_id} ts_ns={ts_ns} type={record_type}" ) if payload.get("after"): print(f" after: {json.dumps(payload['after'])}") if payload.get("before"): print(f" before: {json.dumps(payload['before'])}") if record_type == "chunked": print(f" chunked: {json.dumps(payload['chunked'])}") if not records_response["Records"]: break # No more records in this shard if __name__ == "__main__": parser = argparse.ArgumentParser( description="Consume DSQL CDC records from Kinesis" ) parser.add_argument("--stream-name", required=True, help="Kinesis stream name") parser.add_argument("--region", required=True, help="AWS Region") args = parser.parse_args() consume_cdc(args.stream_name, args.region)

スクリプトを実行します。

pip install boto3 python consume_cdc.py \ --stream-name my-cdc-stream \ --region region

スクリプトは、各変更イベントが到着するたびにそれを出力します。以下のような出力結果が表示されます。

Reading from my-cdc-stream (4 shard(s)) [INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full after: {"id": 1, "message": "hello cdc"}
最後の書き込みを優先重複排除の追加

Aurora DSQL CDC は少なくとも 1 回の配信を使用するため、本番アプリケーションではレコードの重複排除と順序付けを行う必要があります。次のコード例はハイウォーターマークアプローチを示しています。プライマリキーごとに、これまでに検出された最大の source.ts_ns を追跡し、同じまたはそれ以前のタイムスタンプを持つレコードを破棄します。PK_COLUMNS には、処理するテーブルのプライマリキー列名を設定します。複数のテーブルや削除を処理する戦略については、「コンシューマー戦略」を参照してください。

# Set PK_COLUMNS to the primary key column(s) of your table. PK_COLUMNS = ["id"] # Maps each primary key value to the highest ts_ns seen for that key. high_water: dict[tuple, int] = {} def process_record(payload: dict) -> bool: """Return True if the record is new, False if it's a duplicate or stale. Skip fragment records; reassemble them into a full image before calling this. """ if payload.get("type") == "fragment": return False # Fragments are reassembled upstream, not deduplicated here. source = payload["source"] ts_ns = source["ts_ns"] op = payload["op"] # For inserts/updates the row is in "after"; for deletes it's in "before". row = payload.get("after") or payload.get("before") or {} pk = tuple(row.get(col) for col in PK_COLUMNS) prev_ts = high_water.get(pk, -1) if ts_ns <= prev_ts: return False # Duplicate or out-of-order record high_water[pk] = ts_ns return True

CDC ストリームの管理

ストリームを一覧表示する

クラスターのすべての CDC ストリームを一覧表示するには、ListStreams オペレーションを使用します。

aws dsql list-streams \ --cluster-identifier cluster-id \ --region region
ストリームを削除する

CDC ストリームを削除するには、次のコマンドを実行します。

aws dsql delete-stream \ --cluster-identifier cluster-id \ --stream-identifier stream-id \ --region region

StreamNotExists ウェーターを使用して GetStream をポーリングし、Aurora DSQL がストリームを完全に削除したことを示す ResourceNotFoundException が返されるまで続けることができます。