本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将来自多个表的数据联接到一个文档中
RDS 源插件中的joins配置允许将标准化关系表自动非规范化为单个文档。 OpenSearch 配置后,管道会从多个相关表中读取变更数据捕获 (CDC) 事件,并使用基于 Painless 脚本的 upsert 将它们合并到父文档中。
本主题介绍如何在 Aurora 和 Amazon RDS 摄取管道中配置表联接,包括联接类型、先决条件、配置语法、版本跟踪和常见问题疑难解答。
先决条件
-
对关系数据库概念有基本的了解,包括主键、外键和表关系。
-
具有外键关系(父子关系)的表。
-
中的所有表都
relations必须列在中tables.include。 -
父表必须有主键。
-
子表必须有一列引用父表的主键。
-
子表必须有自己的主键 (
child_primary_key),用于识别 upserts 中的各个记录。
支持的联接模式
-
父母 → 孩子 (1:1)
-
父母 → 孩子 (1:N)
-
每位家长有多个孩子
示例配置
以下 YAML 配置显示了如何为 Aurora 源插件定义父子表关系。此示例使用不同的联接类型配置具有两个子表的父表:
version: "2" aurora-joins-pipeline: source: rds: db_identifier: "my-aurora-cluster" engine: "aurora-mysql" database: "my_database" tables: include: - "parent_table" - "child_table_1" - "child_table_2" s3_bucket: "my-pipeline-bucket" s3_region: "us-east-1" s3_prefix: "rds-export" export: kms_key_id: "my-kms-key-id" iam_role_arn: "arn:aws:iam::123456789012:role/my-export-role" stream: true aws: sts_role_arn: "arn:aws:iam::123456789012:role/my-pipeline-role" region: "us-east-1" authentication: username: ${{aws_secrets:secret:username}} password: ${{aws_secrets:secret:password}} joins: version_field: "__versions" relations: - parent: "parent_table" child: "child_table_1" parent_key: "id" child_key: "parent_id" child_primary_key: "child_id" join_type: "one_to_many" max_child_records: 100 - parent: "parent_table" child: "child_table_2" parent_key: "id" child_key: "parent_id" child_primary_key: "child_id" join_type: "one_to_one" sink: - opensearch: hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"] index: "my-joined-index" document_id: "${getMetadata(\"primary_key\")}" action: "${getMetadata(\"opensearch_action\")}" aws: sts_role_arn: "arn:aws:iam::123456789012:role/my-pipeline-role" region: "us-east-1" extension: aws: secrets: secret: secret_id: "arn:aws:secretsmanager:us-east-1:123456789012:secret:my-db-secret" region: "us-east-1" sts_role_arn: "arn:aws:iam::123456789012:role/my-pipeline-role" refresh_interval: PT1H
联接类型
- 一对一
-
子字段在父文档的根级别上展平。当每个家长都只有一个相关的子记录时使用。
Parent table: orders (order_id, customer_name, total) Child table: shipping (shipping_id, order_id, tracking_number, carrier) Result document: { "order_id": 1, "customer_name": "Alice", "total": 299.99, "shipping_id": "1", "tracking_number": "TRK-1", "carrier": "FedEx" } - 一对多
-
子记录以嵌套数组的形式存储在父文档中。当每个家长可以有多个相关的子记录时使用。
Parent table: orders (order_id, customer_name, total) Child table: order_items (item_id, order_id, product_name, quantity, price) Result document: { "order_id": 1, "customer_name": "Alice", "total": 299.99, "order_items": [ {"item_id": 10, "product_name": "Keyboard", "quantity": 1, "price": 149.99}, {"item_id": 11, "product_name": "Mouse", "quantity": 1, "price": 29.99} ] }
文件结构
- 文档 ID
-
OpenSearch 文档设置
_id为父表的主键值。同一个父项的所有子记录都合并到这个单一文档中。 - 版本跟踪
-
version_field(默认:__versions)存储每个表版本计数器的映射:"__versions": { "orders": 12345678901, "order_items": 12345678902, "shipping": 12345678903 }每个表的版本均源自导出时间戳或二进制日志时间戳。当导出或 CDC 事件到达时,Painless 脚本会检查传入的版本是否比该特定表的存储版本更新。如果时间较长,则会跳过该活动。这可确保:
-
等能处理 — 可以安全地忽略重播的事件。
-
独立版本控制-订单更新不会干扰商品更新。
-
并行安全-正确处理同一个父项的多个子插入。
-
- 文档 _version
-
OpenSearch
_version字段会随着每次成功的 upsert 而递增。对于包含 1 个父项 + N 个商品 + 1 个配送记录的文档,_version= N + 2。
限制
-
Single-level 仅加入 — 父级 → 子级。 Multi-level 不支持(父母 → 孩子 → 孙子)。
-
每个管道一个父表。所有子表都连接到同一个父表。
-
子表不能属于同一个管道中的多个父表。
-
max_child_records限制one_to_many联接的数组大小。超过此限制的记录将被删除。
字段名冲突
对于one_to_one联接,子字段在根级别进行扁平化。如果子表的列与父列同名,则子级值将覆盖父级值。在父表和one_to_one子表中使用不同的列名。
对于one_to_many联接,子字段嵌套在以子表命名的数组中,因此冲突不是问题。
问题排查
- 缺少儿童记录的文件
-
-
签
__versions入文档。如果缺少子表的版本,则子事件尚未处理。 -
验证管道是否处于活动状态且
changeEventsProcessed指标不为零。
-
- 文档未出现在 OpenSearch
-
-
检查源表中是否存在父记录。
-
验证
tables.include列表是否包含所有必需的表。 -
检查管道日志中是否存在摄取失败和配置问题:.
/aws/vendedlogs/pipeline-log-group
-