

# CDC 스트림 시작하기
<a name="cdc-setup"></a>

**중요**  
이 기능은 AWS 프리뷰로 제공되며 변경될 수 있습니다. 자세한 내용은 [AWS 서비스 약관](https://aws.amazon.com/service-terms/)의 섹션 2, 베타 및 프리뷰를 참조하세요. CDC 스트림 가격에 대한 자세한 내용은 [Aurora DSQL 가격](https://aws.amazon.com/rds/aurora/dsql/pricing/) 페이지를 참조하세요.  
정식 출시 전에 스트림 페이로드에 새 작업 유형(업데이트를 위한 `"op": "u"`)을 추가합니다. 애플리케이션이 이러한 변경 사항을 수정하지 않고 처리하도록 하려면 `after` 페이로드를 적용하여 인식할 수 없는 `op` 값을 업서트로 취급합니다. 세부 정보는 [CDC 레코드 이해](cdc-record-format.md) 섹션을 참조하세요.

이 가이드에서는 Aurora DSQL 클러스터에서 Amazon Kinesis 데이터 스트림으로 커밋된 행 수준 변경 사항 스트리밍을 시작하는 데 필요한 모든 단계를 안내합니다. 이 가이드가 끝날 때까지 유효한 CDC 파이프라인과 변경 레코드를 읽고 인쇄하는 Python 스크립트를 생성했습니다.

## 사전 조건
<a name="cdc-prerequisites"></a>

시작하기 전에 다음을 확인합니다.
+ Aurora DSQL 클러스터를 `ACTIVE` 상태로 생성했습니다. 클러스터가 유휴 상태인 경우 CDC 스트림을 생성하기 전에 PostgreSQL 호환 클라이언트와 연결하여 해제합니다. `CreateStream`은 클러스터가 `ACTIVE` 상태가 아닌 경우 검증 오류를 반환합니다.
+ Aurora DSQL을 사용하려면 클러스터, Amazon Kinesis 데이터 스트림, IAM 서비스 역할, 직접 호출 위탁자 등 모든 CDC 리소스가 동일한 AWS 계정에 있어야 합니다.
+ Amazon Kinesis 데이터 스트림은 Aurora DSQL 클러스터와 동일한 AWS 리전에 있습니다.
+ IAM 역할 및 Amazon Kinesis 데이터 스트림을 생성할 권한이 있는 자격 증명으로 AWS CLI를 설치하고 구성했습니다.

## 1단계: Amazon Kinesis 데이터 스트림 생성
<a name="cdc-step1-kinesis"></a>

Aurora DSQL 클러스터와 동일한 AWS 계정 및 리전에 Kinesis 데이터 스트림을 생성합니다. JSON 형식에는 열 이름, 메타데이터 및 인코딩 오버헤드가 포함되므로 CDC 레코드는 해당 Aurora DSQL 행보다 큽니다.

### Kinesis 데이터 스트림 크기 조정
<a name="cdc-sizing"></a>

Aurora DSQL CDC는 모든 변경 사항에 대해 전체 행을 제공합니다. 단일 열을 터치하는 업데이트는 행의 모든 열을 포함하는 레코드를 생성합니다. 레코드 삭제는 예외이며 프라이머리 키 열만 포함합니다.

**평균 레코드 크기 추정**  
평균 온디스크 행 크기를 측정하여 CDC가 생성할 볼륨을 이해하고 크기 초과 레코드를 예측합니다. 다음 쿼리는 테이블의 평균 튜플 크기를 바이트 단위로 반환합니다.

```
SELECT avg(pg_column_size(t.*)) FROM {{your_table}} t;
```

CDC 레코드 봉투는 행 크기 위에 열 이름, 메타데이터 및 인코딩 오버헤드를 추가합니다. 정확한 레코드 형식은 [레코드 페이로드](cdc-record-format.md#cdc-record-payload) 섹션을 참조하세요. Aurora DSQL이 Kinesis 레코드 크기 제한을 초과하는 레코드를 처리하는 방법은 [크기 초과 레코드 처리](cdc-record-format.md#cdc-oversized-records) 섹션을 참조하세요. Kinesis 서비스 제한의 전체 세트는 *Amazon Kinesis Data Streams 개발자 안내서*의 [Amazon Kinesis Data Streams 할당량 및 제한](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)을 참조하세요.

**중요**  
Kinesis 데이터 스트림을 생성할 때 다음을 설정합니다.  
`MaxRecordSizeInKiB`를 `10240`(10MiB)으로. 기본 Kinesis 최대 1MiB가 항상 Aurora DSQL CDC 레코드에 대해 충분히 큰 것은 아닙니다. 레코드가 구성된 Kinesis 레코드 크기를 초과할 경우 `KINESIS_OVERSIZE_RECORD`로 CDC 스트림이 손상됩니다. Aurora DSQL은 크기가 큰 레코드를 각각 10MiB에 근접할 수 있는 조각으로 분할하므로 Kinesis 데이터 스트림은 해당 크기의 레코드를 수락해야 합니다. 자세한 내용은 [크기 초과 레코드 처리](cdc-record-format.md#cdc-oversized-records)을 참조하세요.
`StreamMode` \~`ON_DEMAND` . 온디맨드 모드는 샤드 용량을 자동으로 확장하고 예기치 않은 스파이크 발생 시 과소 프로비저닝으로부터 사용자를 보호합니다. Kinesis는 용량이 스케일 업됨에 따라 급격한 초 단위 버스트 중에도 여전히 `WriteProvisionedThroughputExceeded`를 반환할 수 있습니다. 짧은 스로틀링 이벤트를 계획합니다.

`AWS/Kinesis` 네임스페이스에서 `IncomingBytes` 및 `WriteProvisionedThroughputExceeded`에 CloudWatch 경보를 생성합니다. Kinesis 스로틀링은 CDC 전송 속도를 늦추고 복제 지연을 증가시킵니다. Aurora DSQL 측 지표 및 경보 지침은 [모니터링 모범 사례](cdc-monitoring.md#cdc-monitoring-best-practices) 섹션을 참조하세요.

다음 예제에서는 AWS CLI를 사용합니다. AWS CLI 버전이 `--max-record-size-in-ki-b` 파라미터를 지원하지 않는 경우 AWS SDK를 사용하여 Kinesis [CreateStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html) 작업을 직접적으로 호출합니다.

```
aws kinesis create-stream \
  --stream-name {{my-cdc-stream}} \
  --stream-mode-details StreamMode=ON_DEMAND \
  --max-record-size-in-ki-b 10240 \
  --region {{region}}
```

스트림이 다시 활성 상태가 될 때까지 대기:

```
aws kinesis describe-stream-summary \
  --stream-name {{my-cdc-stream}} \
  --region {{region}} \
  --query 'StreamDescriptionSummary.StreamStatus'
```

명령은 스트림이 준비되면 `"ACTIVE"`를 반환합니다.

출력에서 스트림 ARN을 기록합니다. 다음 단계에서 필요합니다. ARN의 형식은 `arn:aws:kinesis:{{region}}:{{account-id}}:stream/{{my-cdc-stream}}`입니다.

## 2단계: Aurora DSQL에 대한 IAM 역할 생성
<a name="cdc-step2-iam"></a>

Aurora DSQL은 IAM 역할을 수임하여 Kinesis 데이터 스트림에 CDC 레코드를 작성합니다. 이 단계에서는 신뢰 정책을 사용하여 역할을 생성하고 권한 정책을 연결합니다. 각 정책 요소에 대한 전체 설명은 [IAM 구성](cdc-iam.md) 섹션을 참조하세요.

**신뢰 정책 파일 생성**  
다음 JSON을 `trust-policy.json`으로 저장합니다. {{your-account-id}}, {{region}} 및 {{cluster-id}}를 해당 값으로 바꿉니다.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "DSQLAccess",
            "Effect": "Allow",
            "Principal": {
                "Service": "dsql.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": "{{your-account-id}}"
                },
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:dsql:{{region}}:{{your-account-id}}:cluster/{{cluster-id}}/stream/*"
                }
            }
        }
    ]
}
```

**역할 만들기**  
다음 명령을 실행하여 IAM 역할을 만듭니다.

```
aws iam create-role \
  --role-name {{dsql-cdc-role}} \
  --assume-role-policy-document file://trust-policy.json
```

**권한 정책 파일 생성**  
다음 JSON을 `permissions-policy.json`으로 저장합니다. 자리 표시자 값을 Kinesis 데이터 스트림 ARN으로 바꿉니다. 이 `KMSAccess` 문은 Kinesis 데이터 스트림이 AWS KMS 고객 관리형 키를 사용하는 경우에만 필요하지만, 나중에 고객 관리형 키를 추가해도 CDC 스트림이 손상되지 않도록 선제적으로 포함할 수 있습니다. 각 조건에 관한 완전한 설명을 보려면 [서비스 역할 권한 정책](cdc-iam.md#cdc-iam-permissions-policy) 섹션을 참조하세요.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisAccess",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:{{region}}:{{your-account-id}}:stream/{{my-cdc-stream}}"
        },
        {
            "Sid": "KMSAccess",
            "Effect": "Allow",
            "Action": [
                "kms:GenerateDataKey"
            ],
            "Resource": "arn:aws:kms:*:*:key/*",
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "kinesis.{{region}}.amazonaws.com",
                    "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:{{region}}:{{your-account-id}}:stream/{{my-cdc-stream}}",
                    "aws:ResourceAccount": "${aws:PrincipalAccount}"
                }
            }
        }
    ]
}
```

**권한 정책 연결**  
다음 명령을 실행합니다.

```
aws iam put-role-policy \
  --role-name {{dsql-cdc-role}} \
  --policy-name dsql-cdc-kinesis-access \
  --policy-document file://permissions-policy.json
```

`create-role` 출력에서 역할 ARN을 기록합니다. ARN의 형식은 `arn:aws:iam::{{your-account-id}}:role/{{dsql-cdc-role}}`입니다.

## 3단계: CDC 스트림 생성
<a name="cdc-step3-create-stream"></a>

AWS CLI를 사용하여 Aurora DSQL 클러스터를 Kinesis 데이터 스트림에 연결하는 CDC 스트림을 생성합니다. 자리 표시자 값을 1단계의 Kinesis 스트림 ARN, 2단계의 IAM 역할 ARN 및 클러스터 식별자로 바꿉니다.

```
aws dsql create-stream \
  --cluster-identifier {{cluster-id}} \
  --target-definition '{"kinesis":{"streamArn":"{{kinesis-stream-arn}}","roleArn":"{{role-arn}}"}}' \
  --ordering UNORDERED \
  --format JSON \
  --tags '{"Name":"{{my-cdc-stream}}"}' \
  --region {{region}}
```

응답에는 스트림 식별자와 `CREATING` 상태가 포함됩니다. 스트림 생성에는 일반적으로 1\~3분이 소요됩니다.

**스트림이 다시 활성 상태가 될 때까지 대기**  
스트림 상태가 `ACTIVE`에 도달할 때까지 폴링합니다.

```
aws dsql get-stream \
  --cluster-identifier {{cluster-id}} \
  --stream-identifier {{stream-id}} \
  --region {{region}} \
  --query 'status'
```

AWS SDK의 `StreamActive` 웨이터를 사용하여 자동으로 폴링할 수도 있습니다.

스트림이 `ACTIVE`에 도달하면 Aurora DSQL은 커밋된 행 수준 변경 사항을 Kinesis 데이터 스트림에 전송하기 시작합니다.

**참고**  
각 Aurora DSQL 클러스터에는 최대 CDC 스트림 수가 있습니다. 이 한도에 도달하면 `CreateStream`이 `ServiceQuotaExceededException`을 반환합니다. 기본 한도는 [할당량 및 한도](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/CHAP_quotas.html)를 참조하세요.

## 4단계: 레코드가 흐르고 있는지 확인
<a name="cdc-step4-verify"></a>

Aurora DSQL 클러스터의 테이블에 행을 삽입합니다. 예제:

```
CREATE TABLE IF NOT EXISTS test_cdc (
    id INT PRIMARY KEY,
    message TEXT
);

INSERT INTO test_cdc VALUES (1, 'hello cdc');
```

Kinesis 데이터 스트림에서 읽어 CDC 레코드가 도착했는지 확인합니다.

```
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \
  --stream-name {{my-cdc-stream}} \
  --shard-id shardId-000000000000 \
  --shard-iterator-type TRIM_HORIZON \
  --region {{region}} \
  --query 'ShardIterator' --output text)

aws kinesis get-records \
  --shard-iterator "$SHARD_ITERATOR" \
  --region {{region}}
```

각 레코드의 `Data` 필드에는 JSON 페이로드가 포함됩니다. AWS CLI를 사용하면 응답에서 페이로드가 Base64로 인코딩됩니다. `boto3` SDK를 사용하면 SDK가 자동으로 디코딩합니다. 디코딩된 JSON은 다음과 같습니다.

```
{
    "type": "full",
    "op": "c",
    "before": null,
    "after": {"id": 1, "message": "hello cdc"},
    "source": {
        "version": "1.0",
        "ts_ms": 1705318200000,
        "ts_ns": 1705318200000000000,
        "txId": "ffthunp5stx6ffs2vyfqoatmfu",
        "schema": "public",
        "table": "test_cdc",
        "db": "postgres",
        "cluster": "{{cluster-id}}"
    },
    "ts_ms": 1705318200125,
    "ts_ns": 1705318200125483291
}
```

각 필드에 대한 완전한 설명은 [CDC 레코드 이해](cdc-record-format.md) 섹션을 참조하세요.

## 5단계: Python 스크립트로 레코드 사용
<a name="cdc-step5-consume"></a>

다음 Python 스크립트는 Kinesis 데이터 스트림에서 CDC 레코드를 읽고 각 변경 이벤트를 인쇄합니다. 스크립트는 `boto3` Amazon Kinesis 클라이언트를 사용하여 샤드를 반복하고 각 레코드를 디코딩합니다. Aurora DSQL CDC는 최소 1번 전송을 사용하므로 스크립트는 동일한 레코드를 두 번 이상 인쇄할 수 있습니다.

```
"""
Read CDC records from an Amazon Kinesis data stream.

Usage:
    pip install boto3
    python consume_cdc.py --stream-name my-cdc-stream --region us-east-1
"""
from __future__ import annotations

import argparse
import json

import boto3


def consume_cdc(stream_name: str, region: str) -> None:
    kinesis = boto3.client("kinesis", region_name=region)

    # List all shards (paginate if the stream has many shards)
    shard_ids: list[str] = []
    paginator = kinesis.get_paginator("list_shards")
    for page in paginator.paginate(StreamName=stream_name):
        shard_ids.extend(s["ShardId"] for s in page["Shards"])
    print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))")

    for shard_id in shard_ids:
        iterator_response = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON",
        )
        shard_iterator = iterator_response["ShardIterator"]

        while shard_iterator:
            records_response = kinesis.get_records(
                ShardIterator=shard_iterator, Limit=100
            )
            shard_iterator = records_response.get("NextShardIterator")

            for record in records_response["Records"]:
                # boto3 decodes Base64 automatically; record["Data"] is bytes.
                payload = json.loads(record["Data"])

                # A record's "type" field identifies its structure.
                # "full": inlined record with before/after values.
                # "chunked": main record that references fragments for a split image.
                # "fragment": one piece of a chunked image; reassemble in production code.
                # For details, see cdc-record-format.html#cdc-oversized-records.
                record_type = payload.get("type", "full")
                if record_type == "fragment":
                    print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}")
                    continue

                source = payload["source"]
                op = payload["op"]
                ts_ns = source["ts_ns"]
                tx_id = source["txId"]
                table = f"{source['schema']}.{source['table']}"

                # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent
                # release will emit "u" for updates, and "c" for inserts. Design your
                # consumer to handle all three values; this map stays correct across the
                # transition.
                op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"}
                print(
                    f"[{op_labels.get(op, op)}] {table} "
                    f"txId={tx_id} ts_ns={ts_ns} type={record_type}"
                )
                if payload.get("after"):
                    print(f"  after:  {json.dumps(payload['after'])}")
                if payload.get("before"):
                    print(f"  before: {json.dumps(payload['before'])}")
                if record_type == "chunked":
                    print(f"  chunked: {json.dumps(payload['chunked'])}")

            if not records_response["Records"]:
                break  # No more records in this shard


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Consume DSQL CDC records from Kinesis"
    )
    parser.add_argument("--stream-name", required=True, help="Kinesis stream name")
    parser.add_argument("--region", required=True, help="AWS Region")
    args = parser.parse_args()
    consume_cdc(args.stream_name, args.region)
```

스크립트를 실행합니다.

```
pip install boto3
python consume_cdc.py \
  --stream-name {{my-cdc-stream}} \
  --region {{region}}
```

스크립트는 각 변경 이벤트가 도착하면 인쇄합니다. 출력은 다음과 비슷합니다.

```
Reading from my-cdc-stream (4 shard(s))
[INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full
  after:  {"id": 1, "message": "hello cdc"}
```

**마지막 쓰기 우선 중복 제거 추가**  
Aurora DSQL CDC는 최소 1번 전송을 사용하므로 프로덕션 앱은 레코드를 중복 제거 및 순서 지정해야 합니다. 다음 코드 예제는 최고 수위선 접근 방식을 보여줍니다. 각 프라이머리 키에 대해 지금까지 관찰된 가장 높은 `source.ts_ns`를 추적하고 타임스탬프가 같거나 이전인 모든 레코드를 삭제합니다. `PK_COLUMNS`를 처리 중인 테이블의 프라이머리 키 열 이름으로 설정합니다. 여러 테이블 또는 삭제를 처리하는 전략은 [소비자 전략](cdc-streams.md#cdc-consumer-strategies) 섹션을 참조하세요.

```
# Set PK_COLUMNS to the primary key column(s) of your table.
PK_COLUMNS = ["id"]

# Maps each primary key value to the highest ts_ns seen for that key.
high_water: dict[tuple, int] = {}

def process_record(payload: dict) -> bool:
    """Return True if the record is new, False if it's a duplicate or stale.

    Skip fragment records; reassemble them into a full image before calling this.
    """
    if payload.get("type") == "fragment":
        return False  # Fragments are reassembled upstream, not deduplicated here.

    source = payload["source"]
    ts_ns = source["ts_ns"]
    op = payload["op"]

    # For inserts/updates the row is in "after"; for deletes it's in "before".
    row = payload.get("after") or payload.get("before") or {}
    pk = tuple(row.get(col) for col in PK_COLUMNS)

    prev_ts = high_water.get(pk, -1)
    if ts_ns <= prev_ts:
        return False  # Duplicate or out-of-order record

    high_water[pk] = ts_ns
    return True
```

## CDC 스트림 관리
<a name="cdc-manage-streams"></a>

**스트림 나열**  
클러스터의 모든 CDC 스트림을 나열하려면 `ListStreams` 작업을 사용합니다.

```
aws dsql list-streams \
  --cluster-identifier {{cluster-id}} \
  --region {{region}}
```

**스트림 삭제**  
CDC 스트림을 삭제하려면 다음 명령을 실행합니다.

```
aws dsql delete-stream \
  --cluster-identifier {{cluster-id}} \
  --stream-identifier {{stream-id}} \
  --region {{region}}
```

Aurora DSQL이 스트림을 완전히 삭제했음을 나타내는 `ResourceNotFoundException`가 반환될 때까지 `StreamNotExists` 웨이터를 사용하여 `GetStream`을 폴링할 수 있습니다.