Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Memulai dengan aliran CDC
penting
Fitur ini disediakan sebagai AWS Pratinjau dan dapat berubah sewaktu-waktu. Untuk informasi selengkapnya, lihat bagian 2, Beta dan Pratinjau, di Ketentuan AWS Layanan
Sebelum ketersediaan umum, kami akan menambahkan jenis operasi baru ("op": "u"untuk pembaruan) ke payload streaming Anda. Untuk memastikan aplikasi Anda menangani perubahan ini tanpa modifikasi, perlakukan op nilai yang tidak dikenal sebagai peningkatan dengan menerapkan payload. after Lihat Memahami catatan CDC untuk detail.
Panduan ini memandu Anda melalui setiap langkah yang diperlukan untuk memulai streaming perubahan tingkat baris yang berkomitmen dari cluster Aurora DSQL ke aliran data Amazon Kinesis. Pada akhir panduan ini, Anda telah membuat pipeline CDC yang berfungsi dan skrip Python yang membaca dan mencetak catatan perubahan.
Prasyarat
Sebelum Anda mulai, konfirmasikan hal berikut:
-
Anda telah membuat klaster Aurora DSQL dalam status.
ACTIVEJika klaster Anda menganggur, sambungkan dengan PostgreSQL-compatible klien mana pun untuk membangunkannya sebelum Anda membuat aliran CDC.CreateStreammengembalikan kesalahan validasi jika cluster tidak dalamACTIVEstatus. -
Aurora DSQL mengharuskan semua sumber daya CDC — cluster, aliran data Amazon Kinesis, peran layanan IAM, dan prinsip pemanggilan—berada di akun yang sama. AWS
-
Aliran data Amazon Kinesis Anda berada di AWS Wilayah yang sama dengan cluster Aurora DSQL Anda.
-
Anda telah menginstal dan mengonfigurasi kredensi AWS CLI dengan yang memiliki izin untuk membuat peran IAM dan aliran data Amazon Kinesis.
Langkah 1: Buat aliran data Amazon Kinesis
Buat aliran data Kinesis di AWS akun dan Wilayah yang sama dengan cluster Aurora DSQL Anda. Catatan CDC lebih besar dari baris Aurora DSQL yang sesuai karena format JSON mencakup nama kolom, metadata, dan overhead encoding.
Mengukur aliran data Kinesis
Aurora DSQL CDC memberikan baris penuh pada setiap perubahan. Pembaruan yang menyentuh satu kolom menghasilkan catatan yang berisi setiap kolom di baris. Hapus catatan adalah pengecualian — mereka hanya menyertakan kolom kunci utama.
Perkirakan ukuran rekor rata-rata
Ukur ukuran baris rata-rata pada disk untuk memahami volume yang akan dihasilkan CDC dan untuk mengantisipasi catatan yang terlalu besar. Query berikut mengembalikan ukuran tuple rata-rata dalam byte untuk tabel:
SELECT avg(pg_column_size(t.*)) FROMyour_tablet;
Amplop catatan CDC menambahkan nama kolom, metadata, dan overhead encoding di atas ukuran baris. Untuk format rekaman yang tepat, lihatRekam muatan. Untuk bagaimana Aurora DSQL menangani catatan yang melebihi batas ukuran rekaman Kinesis, lihat. Menangani catatan besar Untuk set lengkap batas layanan Kinesis, lihat Kuota dan batas Amazon Kinesis Data Streams di Panduan Pengembang Amazon Kinesis Data Streams.
penting
Saat Anda membuat aliran data Kinesis, atur yang berikut ini:
-
MaxRecordSizeInKiBke10240(10 MiB). Kinesis default maksimum 1 MiB tidak selalu cukup besar untuk catatan Aurora DSQL CDC. Setiap catatan yang melebihi ukuran catatan Kinesis yang dikonfigurasi menyebabkan aliran CDC menjadi terganggu.KINESIS_OVERSIZE_RECORDAurora DSQL membagi catatan besar menjadi fragmen yang masing-masing dapat mendekati 10 MiB, sehingga aliran data Kinesis perlu menerima catatan sebesar itu. Lihat perinciannya di Menangani catatan besar. -
StreamModekeON_DEMAND. On-demand mode menskalakan kapasitas pecahan secara otomatis dan melindungi Anda dari kekurangan penyediaan selama lonjakan yang tidak terduga. Kinesis masih dapat kembaliWriteProvisionedThroughputExceededselama ledakan skala detik yang tajam saat kapasitas meningkat. Rencanakan acara pelambatan singkat.
Buat CloudWatch alarm di IncomingBytes dan WriteProvisionedThroughputExceeded di AWS/Kinesis namespace. Kinesis throttling memperlambat pengiriman CDC dan meningkatkan kelambatan replikasi. Untuk DSQL-side metrik Aurora dan panduan alarm, lihat. Memantau praktik terbaik
Contoh berikut menggunakan AWS CLI. Jika AWS CLI versi Anda tidak mendukung --max-record-size-in-ki-b parameter, gunakan AWS SDK untuk memanggil operasi CreateStreamKinesis.
aws kinesis create-stream \ --stream-namemy-cdc-stream\ --stream-mode-details StreamMode=ON_DEMAND \ --max-record-size-in-ki-b 10240 \ --regionregion
Tunggu hingga streaming menjadi aktif:
aws kinesis describe-stream-summary \ --stream-namemy-cdc-stream\ --regionregion\ --query 'StreamDescriptionSummary.StreamStatus'
Perintah kembali "ACTIVE" ketika aliran sudah siap.
Rekam aliran ARN dari output. Anda membutuhkannya dalam langkah-langkah berikut. ARN memiliki formatnya. arn:aws:kinesis:region:account-id:stream/my-cdc-stream
Langkah 2: Buat peran IAM untuk Aurora DSQL
Aurora DSQL mengasumsikan peran IAM untuk menulis catatan CDC ke aliran data Kinesis Anda. Pada langkah ini, Anda membuat peran dengan kebijakan kepercayaan dan melampirkan kebijakan izin. Untuk penjelasan lengkap tentang setiap elemen kebijakan, lihatMengkonfigurasi IAM.
Buat file kebijakan kepercayaan
Simpan JSON berikut sebagaitrust-policy.json. Ganti your-account-idregion,, dan cluster-id dengan nilai-nilai Anda.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DSQLAccess", "Effect": "Allow", "Principal": { "Service": "dsql.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "aws:SourceAccount": "your-account-id" }, "ArnLike": { "aws:SourceArn": "arn:aws:dsql:region:your-account-id:cluster/cluster-id/stream/*" } } } ] }
Buat peran
Jalankan perintah berikut untuk membuat peran IAM:
aws iam create-role \ --role-namedsql-cdc-role\ --assume-role-policy-document file://trust-policy.json
Buat file kebijakan izin
Simpan JSON berikut sebagaipermissions-policy.json. Ganti nilai placeholder dengan ARN aliran data Kinesis Anda. KMSAccessPernyataan ini hanya diperlukan jika aliran data Kinesis Anda menggunakan kunci yang dikelola AWS KMS pelanggan, tetapi Anda dapat memasukkannya terlebih dahulu sehingga menambahkan kunci yang dikelola pelanggan nanti tidak merusak aliran CDC Anda. Untuk penjelasan lengkap tentang setiap kondisi, lihatKebijakan izin peran layanan.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream" }, { "Sid": "KMSAccess", "Effect": "Allow", "Action": [ "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:*:*:key/*", "Condition": { "StringEquals": { "kms:ViaService": "kinesis.region.amazonaws.com", "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream", "aws:ResourceAccount": "${aws:PrincipalAccount}" } } } ] }
Lampirkan kebijakan izin
Jalankan perintah berikut:
aws iam put-role-policy \ --role-namedsql-cdc-role\ --policy-name dsql-cdc-kinesis-access \ --policy-document file://permissions-policy.json
Rekam peran ARN dari output. create-role ARN memiliki formatnya. arn:aws:iam::your-account-id:role/dsql-cdc-role
Langkah 3: Buat aliran CDC
Gunakan AWS CLI untuk membuat aliran CDC yang menghubungkan cluster Aurora DSQL Anda ke aliran data Kinesis. Ganti nilai placeholder dengan Kinesis stream ARN dari Langkah 1, ARN peran IAM dari Langkah 2, dan pengidentifikasi cluster Anda.
aws dsql create-stream \ --cluster-identifiercluster-id\ --target-definition '{"kinesis":{"streamArn":"kinesis-stream-arn","roleArn":"role-arn"}}' \ --ordering UNORDERED \ --format JSON \ --tags '{"Name":"my-cdc-stream"}' \ --regionregion
Respons termasuk pengenal aliran dan status. CREATING Pembuatan streaming biasanya memakan waktu satu hingga tiga menit.
Tunggu hingga streaming menjadi aktif
Polling status streaming hingga mencapaiACTIVE:
aws dsql get-stream \ --cluster-identifiercluster-id\ --stream-identifierstream-id\ --regionregion\ --query 'status'
Anda juga dapat menggunakan StreamActive pelayan di AWS SDK untuk melakukan polling secara otomatis.
Setelah streaming mencapaiACTIVE, Aurora DSQL mulai mengirimkan perubahan tingkat baris yang berkomitmen ke aliran data Kinesis Anda.
catatan
Setiap cluster Aurora DSQL memiliki jumlah maksimum aliran CDC. Jika Anda mencapai batas ini, CreateStream kembalikan aServiceQuotaExceededException. Untuk batas default, lihat Kuota dan batas.
Langkah 4: Verifikasi bahwa catatan mengalir
Masukkan baris ke dalam tabel di cluster Aurora DSQL Anda. Contoh:
CREATE TABLE IF NOT EXISTS test_cdc ( id INT PRIMARY KEY, message TEXT ); INSERT INTO test_cdc VALUES (1, 'hello cdc');
Baca dari aliran data Kinesis untuk memverifikasi bahwa catatan CDC tiba:
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \ --stream-namemy-cdc-stream\ --shard-id shardId-000000000000 \ --shard-iterator-type TRIM_HORIZON \ --regionregion\ --query 'ShardIterator' --output text) aws kinesis get-records \ --shard-iterator "$SHARD_ITERATOR" \ --regionregion
Setiap Data bidang catatan berisi muatan JSON. Saat Anda menggunakan AWS CLI, muatannya ada Base64-encoded dalam respons. Saat Anda menggunakan boto3 SDK, SDK menerjemahkannya secara otomatis. JSON yang diterjemahkan terlihat seperti berikut:
{ "type": "full", "op": "c", "before": null, "after": {"id": 1, "message": "hello cdc"}, "source": { "version": "1.0", "ts_ms": 1705318200000, "ts_ns": 1705318200000000000, "txId": "ffthunp5stx6ffs2vyfqoatmfu", "schema": "public", "table": "test_cdc", "db": "postgres", "cluster": "cluster-id" }, "ts_ms": 1705318200125, "ts_ns": 1705318200125483291 }
Untuk deskripsi lengkap dari setiap bidang, lihatMemahami catatan CDC.
Langkah 5: Konsumsi catatan dengan skrip Python
Skrip Python berikut membaca catatan CDC dari aliran data Kinesis dan mencetak setiap peristiwa perubahan. Skrip menggunakan klien boto3 Amazon Kinesis untuk mengulangi pecahan dan memecahkan kode setiap catatan. Karena Aurora DSQL CDC menggunakan pengiriman paling tidak sekali, skrip mungkin mencetak catatan yang sama lebih dari satu kali.
""" Read CDC records from an Amazon Kinesis data stream. Usage: pip install boto3 python consume_cdc.py --stream-name my-cdc-stream --region us-east-1 """ from __future__ import annotations import argparse import json import boto3 def consume_cdc(stream_name: str, region: str) -> None: kinesis = boto3.client("kinesis", region_name=region) # List all shards (paginate if the stream has many shards) shard_ids: list[str] = [] paginator = kinesis.get_paginator("list_shards") for page in paginator.paginate(StreamName=stream_name): shard_ids.extend(s["ShardId"] for s in page["Shards"]) print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))") for shard_id in shard_ids: iterator_response = kinesis.get_shard_iterator( StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON", ) shard_iterator = iterator_response["ShardIterator"] while shard_iterator: records_response = kinesis.get_records( ShardIterator=shard_iterator, Limit=100 ) shard_iterator = records_response.get("NextShardIterator") for record in records_response["Records"]: # boto3 decodes Base64 automatically; record["Data"] is bytes. payload = json.loads(record["Data"]) # A record's "type" field identifies its structure. # "full": inlined record with before/after values. # "chunked": main record that references fragments for a split image. # "fragment": one piece of a chunked image; reassemble in production code. # For details, see cdc-record-format.html#cdc-oversized-records. record_type = payload.get("type", "full") if record_type == "fragment": print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}") continue source = payload["source"] op = payload["op"] ts_ns = source["ts_ns"] tx_id = source["txId"] table = f"{source['schema']}.{source['table']}" # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent # release will emit "u" for updates, and "c" for inserts. Design your # consumer to handle all three values; this map stays correct across the # transition. op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"} print( f"[{op_labels.get(op, op)}] {table} " f"txId={tx_id} ts_ns={ts_ns} type={record_type}" ) if payload.get("after"): print(f" after: {json.dumps(payload['after'])}") if payload.get("before"): print(f" before: {json.dumps(payload['before'])}") if record_type == "chunked": print(f" chunked: {json.dumps(payload['chunked'])}") if not records_response["Records"]: break # No more records in this shard if __name__ == "__main__": parser = argparse.ArgumentParser( description="Consume DSQL CDC records from Kinesis" ) parser.add_argument("--stream-name", required=True, help="Kinesis stream name") parser.add_argument("--region", required=True, help="AWS Region") args = parser.parse_args() consume_cdc(args.stream_name, args.region)
Jalankan skrip .
pip install boto3 python consume_cdc.py \ --stream-namemy-cdc-stream\ --regionregion
Skrip mencetak setiap peristiwa perubahan saat tiba. Anda melihat output yang mirip dengan berikut ini:
Reading from my-cdc-stream (4 shard(s)) [INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full after: {"id": 1, "message": "hello cdc"}
Menambahkan deduplikasi last-writer-win
Karena Aurora DSQL CDC menggunakan pengiriman paling sedikit sekali, aplikasi produksi harus menghapus duplikasi dan memesan catatan. Contoh kode berikut menunjukkan pendekatan tanda air tinggi: untuk setiap kunci utama, ia melacak yang tertinggi yang source.ts_ns terlihat sejauh ini dan membuang catatan apa pun dengan stempel waktu yang sama atau sebelumnya. Setel PK_COLUMNS ke nama kolom kunci utama dari tabel yang sedang Anda proses. Untuk strategi yang menangani beberapa tabel atau penghapusan, lihatStrategi konsumen.
# Set PK_COLUMNS to the primary key column(s) of your table. PK_COLUMNS = ["id"] # Maps each primary key value to the highest ts_ns seen for that key. high_water: dict[tuple, int] = {} def process_record(payload: dict) -> bool: """Return True if the record is new, False if it's a duplicate or stale. Skip fragment records; reassemble them into a full image before calling this. """ if payload.get("type") == "fragment": return False # Fragments are reassembled upstream, not deduplicated here. source = payload["source"] ts_ns = source["ts_ns"] op = payload["op"] # For inserts/updates the row is in "after"; for deletes it's in "before". row = payload.get("after") or payload.get("before") or {} pk = tuple(row.get(col) for col in PK_COLUMNS) prev_ts = high_water.get(pk, -1) if ts_ns <= prev_ts: return False # Duplicate or out-of-order record high_water[pk] = ts_ns return True
Mengelola aliran CDC
Daftar aliran
Untuk mencantumkan semua aliran CDC untuk klaster, gunakan operasi: ListStreams
aws dsql list-streams \ --cluster-identifiercluster-id\ --regionregion
Menghapus aliran
Untuk menghapus aliran CDC, jalankan perintah berikut:
aws dsql delete-stream \ --cluster-identifiercluster-id\ --stream-identifierstream-id\ --regionregion
Anda dapat menggunakan StreamNotExists pelayan untuk melakukan polling GetStream hingga a ResourceNotFoundException dikembalikan, menunjukkan bahwa Aurora DSQL telah sepenuhnya menghapus aliran.