Debezium系列之:使用Debezium接入SQL Server数据库数据到Kafka集群的详细技术文档

  • Post author:
  • Post category:其他


Debezium SQL Server 连接器第一次连接到 SQL Server 数据库或集群时,它会获取数据库中模式的一致快照。初始快照完成后,连接器会持续捕获提交到启用了 CDC 的 SQL Server 数据库的 INSERT、UPDATE 或 DELETE 操作的行级更改。连接器为每个数据更改操作生成事件,并将它们流式传输到 Kafka 主题。连接器将表的所有事件流式传输到专用的 Kafka 主题。然后应用程序和服务可以使用来自该主题的数据更改事件记录。



一、Debezium概述

Debezium SQL Server 连接器基于 SQL Server 2016 Service Pack 1 (SP1) 和更高版本的标准版或企业版中提供的更改数据捕获功能。 SQL Server 捕获进程监视指定的数据库和表,并将更改存储到专门创建的具有存储过程外观的更改表中。

要启用 Debezium SQL Server 连接器以捕获数据库操作的更改事件记录,您必须首先在 SQL Server 数据库上启用更改数据捕获。必须在数据库和要捕获的每个表上都启用 CDC。在源数据库上设置 CDC 后,连接器可以捕获数据库中发生的行级 INSERT、UPDATE 和 DELETE 操作。连接器将每个源表的事件记录写入专门用于该表的 Kafka 主题。每个捕获的表都存在一个主题。客户端应用程序读取它们遵循的数据库表的 Kafka 主题,并且可以响应它们从这些主题中使用的行级事件。

连接器第一次连接到 SQL Server 数据库或集群时,它会为所有配置为捕获更改的表拍摄模式的一致快照,并将此状态流式传输到 Kafka。快照完成后,连接器会持续捕获随后发生的行级更改。通过首先建立所有数据的一致视图,连接器可以继续读取,而不会丢失在快照发生时所做的任何更改。

Debezium SQL Server 连接器可以容忍故障。当连接器读取更改并产生事件时,它会定期记录事件在数据库日志中的位置(LSN/日志序列号)。如果连接器因任何原因(包括通信故障、网络问题或崩溃)停止,则在重新启动后,连接器将从它读取的最后一点继续读取 SQL Server CDC 表。

注意:偏移量会定期提交。它们不会在更改事件发生时提交。因此,在中断之后,可能会生成重复事件。

容错也适用于快照。也就是说,如果连接器在快照期间停止,则连接器在重新启动时会开始新的快照。



二、SQL Server 连接器的工作原理

为了优化配置和运行 Debezium SQL Server 连接器,了解连接器如何执行快照、流式传输更改事件、确定 Kafka 主题名称以及使用元数据会很有帮助。



1.Snapshots

SQL Server CDC 并非旨在存储数据库更改的完整历史记录。为了让 Debezium SQL Server 连接器为数据库的当前状态建立基线,它使用了一个称为快照的过程。

您可以配置连接器创建快照的方式。默认情况下,连接器的快照模式设置为初始。基于此初始快照模式,连接器第一次启动时,会执行数据库的初始一致快照。此初始快照捕获与为连接器配置的包含和排除属性定义的条件匹配的任何表的结构和数据(例如,table.include.list、column.include.list、table.exclude.list、等等)。

当连接器创建快照时,它会完成以下任务:

  • 1.确定要捕获的表。
  • 2.获得对启用了 CDC 的 SQL Server 表的锁定,以防止在创建快照期间发生结构更改。锁的级别由 snapshot.isolation.mode 配置选项决定。
  • 3.读取服务器事务日志中的最大日志序列号 (LSN) 位置。
  • 4.捕获所有相关表的结构。
  • 5.如有必要,释放在步骤 2 中获得的锁。在大多数情况下,锁只保留很短的时间。
  • 6.根据步骤 3 中读取的 LSN 位置扫描要捕获的 SQL Server 源表和模式,为表中的每一行生成 READ 事件,并将事件写入表的 Kafka 主题。
  • 7.在连接器偏移中记录快照的成功完成。

生成的初始快照捕获为 CDC 启用的表中每一行的当前状态。从这个基线状态,连接器会在后续更改发生时捕获它们。


Ad hoc snapshots

默认情况下,连接器仅在首次启动后才运行初始快照操作。在这个初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来更改事件数据仅通过流式处理进入。

但是,在某些情况下,连接器在初始快照期间获得的数据可能会变得陈旧、丢失或不完整。为了提供一种重新捕获表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能会导致执行临时快照:

  • 修改连接器配置以捕获一组不同的表。
  • Kafka 主题被删除,必须重建。
  • 由于配置错误或其他问题而发生数据损坏。

您可以通过启动所谓的临时快照来为之前捕获快照的表重新运行快照。即席快照需要使用信令表。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。

当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了以前存在的主题,如果启用了自动主题创建,Debezium 可以自动创建主题。

即席快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或仅捕获数据库中表的子集。

您可以通过向信令表发送执行快照消息来指定要捕获的表。将执行快照信号的类型设置为增量,并提供要包含在快照中的表的名称,如下表所述:

表 1. 即席执行快照信号记录示例

Field Default Value
type incremental 指定要运行的快照类型。设置类型是可选的。目前,您只能请求增量快照。
data-collections N/A 一个数组,其中包含要生成快照的表的完全限定名称。名称的格式与 signal.data.collection 配置选项的格式相同。


Triggering an ad hoc snapshot

您可以通过将具有执行快照信号类型的条目添加到信令表来启动临时快照。连接器处理完消息后,将开始快照操作。快照进程读取第一个和最后一个主键值,并将这些值用作每个表的起点和终点。根据表中的条目数和配置的块大小,Debezium 将表划分为块,并继续对每个块进行快照,一次一个。

目前,执行快照操作类型仅触发增量快照。有关详细信息,请参阅增量快照。


Incremental snapshots

为了提供管理快照的灵活性,Debezium 包含一个补充快照机制,称为增量快照。增量快照依赖于 Debezium 机制将信号发送到 Debezium 连接器。

在增量快照中,Debezium 不是像在初始快照中那样一次捕获数据库的完整状态,而是在一系列可配置的块中分阶段捕获每个表。您可以指定您希望快照捕获的表以及每个块的大小。块大小决定了快照在数据库上的每次提取操作期间收集的行数。增量快照的默认块大小为 1 KB。

随着增量快照的进行,Debezium 使用水印来跟踪其进度,维护它捕获的每个表行的记录。与标准初始快照过程相比,这种分阶段捕获数据的方法具有以下优势:

  • 您可以在流式数据捕获的同时运行增量快照,而不是将流式传输推迟到快照完成。连接器在整个快照过程中继续从更改日志中捕获近乎实时的事件,并且两个操作都不会阻塞另一个操作。
  • 您可以在流式数据捕获的同时运行增量快照,而不是将流式传输推迟到快照完成。连接器在整个快照过程中继续从更改日志中捕获近乎实时的事件,并且两个操作都不会阻塞另一个操作。
  • 您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置以将表添加到其 table.include.list 属性后重新运行快照。


Incremental snapshot process


当您运行增量快照时,Debezium 按主键对每个表进行排序,然后根据配置的块大小将表拆分为块。逐块工作,然后捕获块中的每个表行。对于它捕获的每一行,快照都会发出一个 READ 事件。该事件表示块的快照开始时行的值。

随着快照的进行,其他进程可能会继续访问数据库,可能会修改表记录。为了反映此类更改,INSERT、UPDATE 或 DELETE 操作将照常提交到事务日志。同样,正在进行的 Debezium 流式处理继续检测这些更改事件并将相应的更改事件记录发送到 Kafka。


Debezium 如何解决具有相同主键的记录之间的冲突


在某些情况下,流式处理发出的 UPDATE 或 DELETE 事件是乱序接收的。也就是说,流式处理可能会在快照捕获包含该行的 READ 事件的块之前发出一个修改表行的事件。当快照最终为该行发出相应的 READ 事件时,它的值已经被取代。为了确保以正确的逻辑顺序处理乱序到达的增量快照事件,Debezium 采用了一种缓冲方案来解决冲突。只有在解决了快照事件和流事件之间的冲突后,Debezium 才会向 Kafka 发出事件记录。


Snapshot window

为了帮助解决延迟到达的 READ 事件和修改同一表行的流事件之间的冲突,Debezium 采用了所谓的快照窗口。快照窗口划分了增量快照捕获指定表块数据的时间间隔。在一个块的快照窗口打开之前,Debezium 遵循其通常的行为并从事务日志直接向下游发送事件到目标 Kafka 主题。但是从特定块的快照打开的那一刻起,直到它关闭,Debezium 执行重复数据删除步骤以解决具有相同主键的事件之间的冲突。

对于每个数据集合,Debezium 发出两种类型的事件,并将它们的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时,随着用户不断更新数据集合中的记录,事务日志也会更新以反映每次提交,Debezium 会针对每次更改发出 UPDATE 或 DELETE 操作。

当快照窗口打开时,Debezium 开始处理快照块,它将快照记录传递到内存缓冲区。在快照窗口期间,缓冲区中 READ 事件的主键与传入流事件的主键进行比较。如果未找到匹配项,则将流式事件记录直接发送到 Kafka。如果 Debezium 检测到匹配,它会丢弃缓冲的 READ 事件,并将流式记录写入目标主题,因为流式事件在逻辑上取代了静态快照事件。块的快照窗口关闭后,缓冲区仅包含不存在相关事务日志事件的 READ 事件。 Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。

连接器对每个快照块重复该过程。


Triggering an incremental snapshot

目前,启动增量快照的唯一方法是将临时快照信号发送到源数据库上的信令表。您将信号作为 SQL INSERT 查询提交到表。 Debezium 检测到信号表中的变化后,它会读取信号,并运行请求的快照操作。

您提交的查询指定要包含在快照中的表,并且可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值增量。

要指定要包含在快照中的表,请提供列出表的数据集合数组,例如,

{“数据集合”:[“public.MyFirstTable”,“public.MySecondTable”]}

增量快照信号的数据集合数组没有默认值。如果 data-collections 数组为空,Debezium 会检测到不需要任何操作并且不执行快照。

注意:

如果要包含在快照中的表的名称在数据库、模式或表的名称中包含点 (.),则要将表添加到 data-collections 数组中,则必须转义双引号中的名称。

例如,要包含一个存在于公共架构中且名称为 My.Table 的表,请使用以下格式:“public”.“My.Table”。

先决条件

  • 信令已启用。


    • 源数据库上存在信令数据集合,连接器配置为捕获它。

    • 信令数据集合在 signal.data.collection 属性中指定。

程序

发送 SQL 查询以将临时增量快照请求添加到信令表:

INSERT INTO _<signalTable>_ (id, type, data) VALUES (_'<id>'_, _'<snapshotType>'_, '{"data-collections": ["_<tableName>_","_<tableName>_"],"type":"_<snapshotType>_"}');

例如:

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.table1", "schema2.table2"],"type":"incremental"}');



2.读取变更数据表

当连接器第一次启动时,它会对捕获的表的结构进行结构快照,并将此信息保存到其内部数据库历史主题中。然后连接器为每个源表标识一个更改表,并完成以下步骤。

  • 1.对于每个更改表,连接器读取在上次存储的最大 LSN 和当前最大 LSN 之间创建的所有更改。
  • 2.连接器根据其提交 LSN 和更改 LSN 的值按升序对读取的更改进行排序。这种排序顺序可确保 Debezium 以与它们在数据库中发生的顺序相同的顺序重放更改。
  • 3.连接器将提交和更改 LSN 作为偏移量传递给 Kafka Connect。
  • 4.连接器存储最大 LSN 并从步骤 1 重新启动该过程。

重新启动后,连接器从它读取的最后一个偏移量(提交和更改 LSN)恢复处理。

连接器能够检测是否为包含的源表启用或禁用了 CDC,并调整其行为。



3.限制

SQL Server 特别要求基础对象是一个表,以便创建变更捕获实例。因此,SQL Server 不支持从索引视图(又名物化视图)捕获更改,因此 Debezium SQL Server 连接器也不支持。



4.topic名称

默认情况下,SQL Server 连接器将表中发生的所有 INSERT、UPDATE 和 DELETE 操作的事件写入特定于该表的单个 Apache Kafka 主题。连接器使用以下约定来命名更改事件主题:<serverName>.<schemaName>.<tableName>

以下列表提供了默认名称组件的定义:

服务器名称

  • 服务器的逻辑名称,由 database.server.name 配置属性指定。

模式名称

  • 发生更改事件的数据库模式的名称。

表名

  • 发生更改事件的数据库表的名称。

例如,如果fulfillment 是服务器名称,dbo 是模式名称,并且数据库包含名称为 products、products_on_hand、customers 和 orders 的表,则连接器会将更改事件记录流式传输到以下 Kafka 主题:

  • fulfillment.dbo.products
  • fulfillment.dbo.products_on_hand
  • fulfillment.dbo.customers
  • fulfillment.dbo.orders

连接器应用类似的命名约定来标记其内部数据库历史主题、模式更改主题和事务元数据主题。

如果默认主题名称不符合您的要求,您可以配置自定义主题名称。要配置自定义主题名称,请在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅主题路由。



5.架构更改主题

对于启用了 CDC 的每个表,Debezium SQL Server 连接器存储应用于数据库中捕获的表的架构更改事件的历史记录。连接器将架构更改事件写入名为 <serverName> 的 Kafka 主题,其中 serverName 是在 database.server.name 配置属性中指定的逻辑服务器名称。

连接器发送到架构更改主题的消息包含有效负载,并且(可选)还包含更改事件消息的架构。架构更改事件消息的有效负载包括以下元素:

数据库名称:应用语句的数据库的名称。 databaseName 的值用作消息键。

表更改:架构更改后整个表架构的结构化表示。 tableChanges 字段包含一个数组,其中包含表中每一列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此消费者可以轻松读取消息,而无需先通过 DDL 解析器对其进行处理。

重要:

当连接器被配置为捕获表时,它不仅将表模式更改的历史存储在模式更改主题中,还存储在内部数据库历史主题中。内部数据库历史主题仅供连接器使用,不适合消费应用程序直接使用。确保需要有关架构更改通知的应用程序仅使用来自架构更改主题的信息。

连接器向其模式更改主题发出的消息格式处于孵化状态,可以在不通知的情况下更改。

当以下事件发生时,Debezium 会向模式更改主题发出消息:

  • 您为表启用 CDC。
  • 您为表禁用 CDC。
  • 您可以通过遵循模式演变过程来更改启用了 CDC 的表的结构。

示例:发送到 SQL Server 连接器架构更改主题的消息

以下示例显示了架构更改主题中的消息。该消息包含表模式的逻辑表示。

{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "1.9.6.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1588252618953,
      "snapshot": "true",
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": null,
      "commit_lsn": "00000025:00000d98:00a2",
      "event_serial_no": null
    },
    "databaseName": "testDB", 
    "schemaName": "dbo",
    "ddl": null, 
    "tableChanges": [ 
      {
        "type": "CREATE", 
        "id": "\"testDB\".\"dbo\".\"customers\"", 
        "table": { 
          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ 
            "id"
          ],
          "columns": [ 
            {
              "name": "id",
              "jdbcType": 4,
              "nativeType": null,
              "typeName": "int identity",
              "typeExpression": "int identity",
              "charsetName": null,
              "length": 10,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "first_name",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "last_name",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "email",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "varchar",
              "typeExpression": "varchar",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ]
        }
      }
    ]
  }
}

表 3. 发送到模式更改主题的消息中的字段描述

Item Field name Description
1 databaseName schemaName 标识包含更改的数据库和架构。
2 ddl SQL Server 连接器始终为空。对于其他连接器,此字段包含负责架构更改的 DDL。此 DDL 对 SQL Server 连接器不可用。
3 tableChanges 包含由 DDL 命令生成的架构更改的一个或多个项目的数组。
4 type 描述变化的种类。该值为以下之一:CREATE – 创建的表 ALTER – 表已修改 DROP – 表已删除
5 id 创建、更改或删除的表的完整标识符。
6 table 表示应用更改后的表元数据。
7 primaryKeyColumnNames 组成表主键的列列表。
8 columns 已更改表中每一列的元数据。

在连接器发送到架构更改主题的消息中,键是包含架构更改的数据库的名称。在以下示例中,有效负载字段包含密钥:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.sqlserver.SchemaChangeKey"
  },
  "payload": {
    "databaseName": "testDB"
  }
}



三.数据更改事件

Debezium SQL Server 连接器为每个行级 INSERT、UPDATE 和 DELETE 操作生成数据更改事件。每个事件都包含一个键和一个值。键和值的结构取决于已更改的表。

Debezium 和 Kafka Connect 是围绕连续的事件消息流设计的。但是,这些事件的结构可能会随着时间的推移而发生变化,这对于消费者来说可能难以处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,消费者可以使用它从注册表中获取模式的模式 ID。这使得每个事件都是独立的。

以下骨架 JSON 显示了更改事件的基本四个部分。但是,如何配置您选择在应用程序中使用的 Kafka Connect 转换器决定了这四个部分在更改事件中的表示。仅当您将转换器配置为生成它时,模式字段才处于更改事件中。同样,仅当您将转换器配置为生成它时,事件键和事件有效负载才在更改事件中。如果您使用 JSON 转换器并将其配置为生成所有四个基本更改事件部分,则更改事件具有以下结构:

{
 "schema": {    1
   ...
  },
 "payload": {   2
   ...
 },
 "schema": {    3
   ...
 },
 "payload": {   4
   ...
 },
}

表 4. 变更事件基本内容概览

Item Field name Description
1 schema 第一个模式字段是事件键的一部分。它指定了一个 Kafka Connect 模式,该模式描述了事件键的有效负载部分中的内容。换句话说,第一个模式字段描述了主键的结构,如果表没有主键,则描述唯一键的结构,用于已更改的表。可以通过设置 message.key.columns 连接器配置属性来覆盖表的主键。在这种情况下,第一个模式字段描述了由该属性标识的键的结构。
2 payload 第一个有效负载字段是事件键的一部分。它具有前面架构字段所描述的结构,并且包含已更改行的键。
3 schema 第二个模式字段是事件值的一部分。它指定了描述事件值有效负载部分内容的 Kafka Connect 模式。换句话说,第二个模式描述了被改变的行的结构。通常,此模式包含嵌套模式。
4 payload 第二个有效负载字段是事件值的一部分。它具有前面架构字段所描述的结构,并且包含已更改行的实际数据。



1.更改事件键

更改事件的键包含更改表键的模式和更改行的实际键。在连接器创建事件时,模式及其相应的有效负载都包含已更改表的主键(或唯一键约束)中每一列的字段。

请考虑以下客户表,该表后面是该表的更改事件键的示例。

CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);

示例更改事件键

捕获对客户表的更改的每个更改事件都具有相同的事件键模式。只要客户表具有前面的定义,每个捕获客户表更改的更改事件都具有以下关键结构,在 JSON 中,如下所示:

{
    "schema": { 
        "type": "struct",
        "fields": [ 
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            }
        ],
        "optional": false, 
        "name": "server1.dbo.customers.Key" 
    },
    "payload": { 
        "id": 1004
    }
}

表 5. 更改事件键的说明

Item Field name Description
1 schema 密钥的模式部分指定了一个 Kafka Connect 模式,该模式描述了密钥的有效负载部分中的内容。
2 fields 指定负载中预期的每个字段,包括每个字段的名称、类型以及是否需要。在此示例中,有一个名为 id 的 int32 类型的必填字段。
3 optional 指示事件键是否必须在其有效负载字段中包含值。在此示例中,密钥的有效负载中的值是必需的。当表没有主键时,键的有效负载字段中的值是可选的。
4 server1.dbo.customers.Key 定义密钥有效负载结构的模式名称。此架构描述了已更改表的主键结构。键架构名称的格式为 connector-name.database-schema-name.table-name.Key。在这个例子中:server1 是生成此事件的连接器的名称。dbo 是已更改表的数据库架构。customers是已更新的表。
5 payload 包含为其生成此更改事件的行的键。在此示例中,键包含一个值为 1004 的 id 字段。

注意:

尽管 column.exclude.list 和 column.include.list 连接器配置属性允许您仅捕获表列的子集,但主键或唯一键中的所有列始终包含在事件键中。

警告:

如果表没有主键或唯一键,则更改事件的键为空。这是有道理的,因为无法唯一标识没有主键或唯一键约束的表中的行。



2.更改事件值

更改事件中的值比键复杂一些。与键一样,值也有模式部分和有效负载部分。架构部分包含描述有效负载部分的信封结构的架构,包括其嵌套字段。创建、更新或删除数据的操作的更改事件都有一个带有信封结构的值负载。

考虑用于显示更改事件键示例的相同示例表:

CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);

针对每个事件类型描述了更改此表的更改事件的值部分。



3.创建事件

以下示例显示了连接器为在客户表中创建数据的操作生成的更改事件的值部分:

{
  "schema": {    (1)
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.customers.Value",  (2)
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "server1.dbo.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "change_lsn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "commit_lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "event_serial_no"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.sqlserver.Source",   (3)
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "server1.dbo.customers.Envelope"     (4)
  },
  "payload": { (5)
    "before": null, (6)
    "after": { (7)
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "john.doe@example.org"
    },
    "source": {   (8)
      "version": "1.9.6.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559729468470,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000758:0003",
      "commit_lsn": "00000027:00000758:0005",
      "event_serial_no": "1"
    },
    "op": "c",     (9)
    "ts_ms": 1559729471739                  (10)
  }
}

表 6. 创建事件值字段的描述

Item Field name Description
1 schema 值的架构,它描述了值的有效负载的结构。在连接器为特定表生成的每个更改事件中,更改事件的值架构都是相同的。
2 name 在模式部分中,每个名称字段都指定值有效负载中字段的模式。server1.dbo.customers.Value 是有效负载前后字段的架构。此模式特定于客户表。before 和 after 字段的 schema 名称的格式为logicalName.database-schemaName.tableName.Value,这样可以确保 schema 名称在数据库中是唯一的。这意味着在使用 Avro 转换器时,每个逻辑源中每个表的生成 Avro 模式都有自己的演变和历史。
3 name io.debezium.connector.sqlserver.Source 是有效负载源字段的架构。此架构特定于 SQL Server 连接器。连接器将它用于它生成的所有事件。
4 name server1.dbo.customers.Envelope 是有效负载整体结构的架构,其中 server1 是连接器名称,dbo 是数据库架构名称,customers 是表。
5 payload 该值的实际数据。这是更改事件提供的信息。看起来事件的 JSON 表示比它们描述的行大得多。这是因为 JSON 表示必须包括消息的模式和有效负载部分。但是,通过使用 Avro 转换器,您可以显着减小连接器流式传输到 Kafka 主题的消息的大小。
6 before 一个可选字段,指定事件发生之前行的状态。当 op 字段为 c 表示创建时,如本例所示,before 字段为空,因为此更改事件是针对新内容的。
7 after 一个可选字段,指定事件发生后行的状态。在此示例中,after 字段包含新行的 id、first_name、last_name 和 email 列的值。
8 source 描述事件源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的来源、事件发生的顺序以及事件是否属于同一事务的一部分。源元数据包括:Debezium 版本、连接器类型和名称、数据库和模式名称、在数据库中进行更改时的时间戳、如果事件是快照的一部分、包含新行的表的名称、服务器日志偏移量
9 op 强制字符串,描述导致连接器生成事件的操作类型。在此示例中,c 表示该操作创建了一行。有效值为:c = 创建、u = 更新、d = 删除、r = 读取(仅适用于快照)
10 ts_ms 显示连接器处理事件的时间的可选字段。在事件消息信封中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中,ts_ms 表示在数据库中提交更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的延迟。



4.更新事件

示例客户表中更新的更改事件的值与该表的创建事件具有相同的模式。同样,事件值的有效负载具有相同的结构。但是,事件值有效负载在更新事件中包含不同的值。以下是连接器为客户表中的更新生成的事件中的更改事件值示例:

{
  "schema": { ... },
  "payload": {
    "before": { (1)
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "john.doe@example.org"
    },
    "after": { (2)
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "noreply@example.org"
    },
    "source": { (3)
      "version": "1.9.6.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559729995937,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000ac0:0002",
      "commit_lsn": "00000027:00000ac0:0007",
      "event_serial_no": "2"
    },
    "op": "u", (4)
    "ts_ms": 1559729998706  (5)
  }
}

表 7. 更新事件值字段的描述

Item Field name Description
1 before 一个可选字段,指定事件发生之前行的状态。在更新事件值中,before 字段包含每个表列的字段以及数据库提交之前该列中的值。在此示例中,电子邮件值为 john.doe@example.org。
2 after 一个可选字段,指定事件发生后行的状态。您可以比较之前和之后的结构,以确定该行的更新内容。在示例中,电子邮件值现在是 noreply@example.org。
3 source 描述事件源元数据的必填字段。源字段结构与创建事件中的字段相同,但有些值不同,例如示例更新事件具有不同的偏移量。源元数据包括:Debezium 版本、连接器类型和名称、数据库和模式名称、在数据库中进行更改时的时间戳、如果事件是快照的一部分、包含新行的表的名称、服务器日志偏移量 event_serial_no 字段区分具有相同提交和更改 LSN 的事件。此字段的值不是 1 时的典型情况:update events 的值设置为 2,因为更新会在 SQL Server 的 CDC 更改表中生成两个事件。第一个事件包含旧值,第二个事件包含新值。连接器使用第一个事件中的值来创建第二个事件。连接器丢弃第一个事件。当主键更新时,SQL Server 会发出两个事件。删除具有旧主键值的记录的删除事件和添加具有新主键值的记录的创建事件。两个操作共享相同的提交和更改 LSN,它们的事件编号分别为 1 和 2。
4 op 描述操作类型的强制字符串。在更新事件值中,op 字段值为 u,表示该行因更新而改变。
5 ts_ms 显示连接器处理事件的时间的可选字段。在事件消息信封中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中,ts_ms 表示将更改提交到数据库的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的延迟。

注意:


更新行的主键/唯一键的列会更改行键的值。当一个键改变时,Debezium 会输出三个事件:一个删除事件和一个带有该行旧键的墓碑事件,然后是一个带有该行新键的创建事件。



5.删除事件

删除更改事件中的值与同一表的创建和更新事件具有相同的架构部分。示例客户表的删除事件中的有效负载部分如下所示:

{
  "schema": { ... },
  },
  "payload": {
    "before": { <>
      "id": 1005,
      "first_name": "john",
      "last_name": "doe",
      "email": "noreply@example.org"
    },
    "after": null,      (2)
    "source": {          (3)
      "version": "1.9.6.Final",
      "connector": "sqlserver",
      "name": "server1",
      "ts_ms": 1559730445243,
      "snapshot": false,
      "db": "testDB",
      "schema": "dbo",
      "table": "customers",
      "change_lsn": "00000027:00000db0:0005",
      "commit_lsn": "00000027:00000db0:0007",
      "event_serial_no": "1"
    },
    "op": "d",         (4)
    "ts_ms": 1559730450205     (5)
  }
}

表 8. 删除事件值字段的描述

Item Field name Description
1 before 可选字段,指定事件发生前行的状态。在删除事件值中,before 字段包含在使用数据库提交删除之前该行中的值。
2 after 可选字段,指定事件发生后行的状态。在删除事件值中,after 字段为空,表示该行不再存在。
3 source 描述事件源元数据的必填字段。在删除事件值中,源字段结构与同一表的创建和更新事件相同。许多源字段值也是相同的。在删除事件值中,ts_ms 和 pos 字段值以及其他值可能已更改。但是删除事件值中的源字段提供了相同的元数据:Debezium 版本、连接器类型和名称、数据库和模式名称、在数据库中进行更改时的时间戳、如果事件是快照的一部分、包含新行的表的名称、服务器日志偏移量
4 op 描述操作类型的强制字符串。 op 字段值为 d,表示该行已被删除。
5 ts_ms 显示连接器处理事件的时间的可选字段。在事件消息信封中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中,ts_ms 指示在数据库中进行更改的时间。通过将 payload.source.ts_ms 的值与 payload.ts_ms 的值进行比较,您可以确定源数据库更新和 Debezium 之间的延迟。

SQL Server 连接器事件旨在与 Kafka 日志压缩一起使用。只要至少保留每个键的最新消息,日志压缩就可以删除一些较旧的消息。这让 Kafka 可以回收存储空间,同时确保主题包含完整的数据集并可用于重新加载基于键的状态。



6.墓碑事件

当删除一行时,删除事件值仍然适用于日志压缩,因为 Kafka 可以删除所有具有相同键的早期消息。但是,要让 Kafka 删除具有相同键的所有消息,消息值必须为空。为了实现这一点,在 Debezium 的 SQL Server 连接器发出删除事件后,连接器会发出一个特殊的墓碑事件,该事件具有相同的键但为空值。



四、事物元数据

Debezium 可以生成表示事务边界和丰富数据更改事件消息的事件。

Debezium 接收事务元数据的时间限制,Debezium 仅注册和接收部署连接器后发生的事务的元数据。部署连接器之前发生的事务的元数据不可用。

数据库事务由包含在 BEGIN 和 END 关键字之间的语句块表示。 Debezium 为每个事务中的 BEGIN 和 END 分隔符生成事务边界事件。事务边界事件包含以下字段:

status

  • BEGIN or END

id:

  • 唯一交易标识符的字符串表示。

event_count(用于 END 事件)

  • 事务发出的事件总数。

data_collections(用于 END 事件)

  • data_collection 和 event_count 对的数组,提供源自给定数据集合的更改发出的事件数。

警告:

Debezium 无法可靠地识别交易何时结束。因此,仅在另一个事务的第一个事件到达后才会发出事务结束标记。在低流量系统的情况下,这可能会导致延迟交付 END 标记。

以下示例显示了典型的事务边界消息:

{
  "status": "BEGIN",
  "id": "00000025:00000d08:0025",
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "00000025:00000d08:0025",
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "testDB.dbo.tablea",
      "event_count": 1
    },
    {
      "data_collection": "testDB.dbo.tableb",
      "event_count": 1
    }
  ]
}

除非通过 transaction.topic 选项覆盖,否则事务事件将写入名为 database.server.name.transaction 的主题。

Change data event enrichment

启用事务元数据后,数据消息 Envelope 会增加一个新事务字段。此字段以字段组合的形式提供有关每个事件的信息:

ID:

  • 唯一交易标识符的字符串表示

total_order:

  • 该事件在交易产生的所有事件中的绝对位置

data_collection_order

  • 该事件在事务发出的所有事件中的每个数据收集位置

以下示例显示了典型消息的外观:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "00000025:00000d08:0025",
    "total_order": "1",
    "data_collection_order": "1"
  }
}



五、数据类型映射

Debezium SQL Server 连接器通过生成与行所在的表类似的事件来表示对表行数据的更改。每个事件都包含表示行的列值的字段。事件表示操作的列值的方式取决于列的 SQL 数据类型。在这种情况下,连接器将每个 SQL Server 数据类型的字段映射到文字类型和语义类型。

连接器可以将 SQL Server 数据类型映射到文字和语义类型。

文字类型:

  • 描述如何使用 Kafka Connect 模式类型(即 INT8、INT16、INT32、INT64、FLOAT32、FLOAT64、BOOLEAN、STRING、BYTES、ARRAY、MAP 和 STRUCT)字面表示值。

语义类型

  • 描述 Kafka Connect 架构如何使用字段的 Kafka Connect 架构名称来捕获字段的含义。

如果默认数据类型转换不能满足您的需求,您可以为连接器创建自定义转换器。



1.基本类型

下表显示了连接器如何映射基本 SQL Server 数据类型。

表 9. SQL Server 连接器使用的数据类型映射

SQL Server 数据类型 文字类型(模式类型) 语义类型(模式名称)和注释
BIT BOOLEAN n/a
TINYINT INT16 n/a
SMALLINT INT16 n/a
INT INT32 n/a
BIGINT INT64 n/a
REAL FLOAT32 n/a
FLOAT[(N)] FLOAT64 n/a
CHAR[(N)] STRING n/a
VARCHAR[(N)] STRING n/a
TEXT STRING n/a
NCHAR[(N)] STRING n/a
NVARCHAR[(N)] STRING n/a
NTEXT STRING n/a
XML STRING io.debezium.data.Xml 包含 XML 文档的字符串表示
DATETIMEOFFSET[§] STRING io.debezium.time.ZonedTimestamp带有时区信息的时间戳的字符串表示形式,其中时区为 GMT

以下部分描述了其他数据类型映射。

如果存在,则将列的默认值传播到相应字段的 Kafka Connect 架构。更改消息将包含该字段的默认值(除非已给出明确的列值),因此很少需要从模式中获取默认值。尽管将 Avro 作为序列化格式与 Confluent 模式注册表一起使用时,传递默认值有助于满足兼容性规则。



2.时间值

除了 SQL Server 的 DATETIMEOFFSET 数据类型(包含时区信息)之外,其他时间类型取决于 time.precision.mode 配置属性的值。当 time.precision.mode 配置属性设置为adaptive(默认)时,连接器将根据列的数据类型定义确定时间类型的文字类型和语义类型,以便事件准确地表示数据库中的值:

SQL Server 数据类型 文字类型(模式类型) 语义类型(模式名称)和注释
DATE INT32 io.debezium.time.Date表示自纪元以来的天数。
TIME(0), TIME(1), TIME(2), TIME(3) INT32 io.debezium.time.Time表示午夜过后的毫秒数,不包括时区信息。
TIME(4), TIME(5), TIME(6) INT64 io.debezium.time.MicroTime表示午夜过后的微秒数,不包括时区信息。
TIME(7) INT64 io.debezium.time.NanoTime表示午夜过后的纳秒数,不包括时区信息。
DATETIME INT64 io.debezium.time.Timestamp表示经过纪元的毫秒数,不包括时区信息。
SMALLDATETIME INT64 io.debezium.time.Timestamp表示经过纪元的毫秒数,不包括时区信息。
DATETIME2(0), DATETIME2(1), DATETIME2(2), DATETIME2(3) INT64 io.debezium.time.Timestamp表示经过纪元的毫秒数,不包括时区信息。
DATETIME2(4), DATETIME2(5), DATETIME2(6) INT64 io.debezium.time.MicroTimestamp表示经过纪元的微秒数,不包括时区信息。
DATETIME2(7) INT64 io.debezium.time.NanoTimestamp表示历元过去的纳秒数,不包括时区信息。

当 time.precision.mode 配置属性设置为 connect 时,连接器将使用预定义的 Kafka Connect 逻辑类型。当消费者只知道内置的 Kafka Connect 逻辑类型并且无法处理可变精度的时间值时,这可能很有用。另一方面,由于 SQL Server 支持十分之一微秒精度,因此当数据库列的小数秒精度值大于 3 时,具有连接时间精度模式的连接器生成的事件将导致精度损失:

SQL Server 数据类型 文字类型(模式类型) 语义类型(模式名称)和注释
DATE INT32 org.apache.kafka.connect.data.Date表示自纪元以来的天数。
TIME([P]) INT64 org.apache.kafka.connect.data.Time表示自午夜以来的毫秒数,不包括时区信息。 SQL Server 允许 P 在 0-7 范围内以存储高达十分之一微秒的精度,尽管当 P > 3 时这种模式会导致精度损失。
DATETIME INT64 org.apache.kafka.connect.data.Timestamp表示自纪元以来的毫秒数,不包括时区信息。
SMALLDATETIME INT64 org.apache.kafka.connect.data.Timestamp表示经过纪元的毫秒数,不包括时区信息。
DATETIME2 INT64 org.apache.kafka.connect.data.Timestamp表示自纪元以来的毫秒数,不包括时区信息。 SQL Server 允许 P 在 0-7 范围内以存储高达十分之一微秒的精度,尽管当 P > 3 时这种模式会导致精度损失。



3.时间戳值

DATETIME、SMALLDATETIME 和 DATETIME2 类型表示没有时区信息的时间戳。此类列将转换为基于 UTC 的等效 Kafka Connect 值。因此,例如 DATETIME2 值“2018-06-20 15:13:16.945104”由值为“1529507596945104”的 io.debezium.time.MicroTimestamp 表示。

请注意,运行 Kafka Connect 和 Debezium 的 JVM 的时区不会影响此转换。



4.十进制值

Debezium 连接器根据 decimal.handling.mode 连接器配置属性的设置处理小数。

decimal.handling.mode=precise

表 10. 当 decimal.handling.mode=precise 时的映射

SQL Server 数据类型 文字类型(模式类型) 语义类型(模式名称)和注释
NUMERIC[(P[,S])] BYTES org.apache.kafka.connect.data.Decimalscale 模式参数包含一个整数,表示小数点移动了多少位。
DECIMAL[(P[,S])] BYTES org.apache.kafka.connect.data.Decimalscale 模式参数包含一个整数,表示小数点移动了多少位。
SMALLMONEY BYTES org.apache.kafka.connect.data.Decimal scale 模式参数包含一个整数,表示小数点移动了多少位。
MONEY BYTES org.apache.kafka.connect.data.Decimalscale 模式参数包含一个整数,表示小数点移动了多少位。

decimal.handling.mode=double

表 11. 当 decimal.handling.mode=double 时的映射

SQL Server 数据类型 文字类型(模式类型) 语义类型(模式名称)和注释
NUMERIC[(M[,D])] FLOAT64 n/a
DECIMAL[(M[,D])] FLOAT64 n/a
SMALLMONEY[(M[,D])] FLOAT64 n/a
MONEY[(M[,D])] FLOAT64 n/a

decimal.handling.mode=string

表 12. 当 decimal.handling.mode=string 时的映射

SQL Server 数据类型 文字类型(模式类型) 语义类型(模式名称)和注释
NUMERIC[(M[,D])] STRING n/a
DECIMAL[(M[,D])] STRING n/a
SMALLMONEY[(M[,D])] STRING n/a
MONEY[(M[,D])] STRING n/a



六、设置SQL Server

为了让 Debezium 从 SQL Server 表中捕获更改事件,具有必要权限的 SQL Server 管理员必须首先运行查询以在数据库上启用 CDC。然后管理员必须为您希望 Debezium 捕获的每个表启用 CDC。

应用 CDC 后,它会捕获提交到启用了 CDD 的表的所有 INSERT、UPDATE 和 DELETE 操作。然后 Debezium 连接器可以捕获这些事件并将它们发送到 Kafka 主题。



1.在 SQL Server 数据库上启用 CDC

在为表启用 CDC 之前,您必须为 SQL Server 数据库启用它。 SQL Server 管理员通过运行系统存储过程来启用 CDC。系统存储过程可以使用 SQL Server Management Studio 或 Transact-SQL 运行。

先决条件

  • 您是 SQL Server 的 sysadmin 固定服务器角色的成员。
  • 您是数据库的 db_owner。
  • SQL Server 代理正在运行。

注意:

  • SQL Server CDC 功能仅处理用户创建的表中发生的更改。您不能在 SQL Server 主数据库上启用 CDC。

程序

  • 1.从 SQL Server Management Studio 的“查看”菜单中,单击“模板资源管理器”。
  • 2.在模板浏览器中,展开 SQL Server 模板。
  • 3.展开更改数据捕获 > 配置,然后单击为 CDC 启用数据库。
  • 4.在模板中,将 USE 语句中的数据库名称替换为要为 CDC 启用的数据库名称。
  • 5.运行存储过程 sys.sp_cdc_enable_db 为 CDC 启用数据库。

为 CDC 启用数据库后,将创建名为 cdc 的模式,以及 CDC 用户、元数据表和其他系统对象。

以下示例显示如何为数据库 MyDB 启用 CDC:

示例:为 CDC 模板启用 SQL Server 数据库

USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO



2.在 SQL Server 表上启用 CDC

SQL Server 管理员必须在您希望 Debezium 捕获的源表上启用更改数据捕获。数据库必须已为 CDC 启用。要在表上启用 CDC,SQL Server 管理员为该表运行存储过程 sys.sp_cdc_enable_table。存储过程可以使用 SQL Server Management Studio 或 Transact-SQL 运行。必须为要捕获的每个表启用 SQL Server CDC。

先决条件:

  • CDC 在 SQL Server 数据库上启用。
  • SQL Server 代理正在运行。
  • 您是数据库的 db_owner 固定数据库角色的成员。

程序

  • 从 SQL Server Management Studio 的“查看”菜单中,单击“模板资源管理器”。
  • 在模板浏览器中,展开 SQL Server 模板。
  • 展开更改数据捕获 > 配置,然后单击启用表指定文件组选项。
  • 在模板中,将 USE 语句中的表名替换为您要捕获的表名。
  • 运行存储过程 sys.sp_cdc_enable_table。

以下示例显示如何为表 MyTable 启用 CDC:

示例:为 SQL Server 表启用 CDC

USE MyDB
GO

EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name   = N'MyTable', 
@role_name     = N'MyRole',  
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 0
GO
  • 指定要捕获的表的名称。
  • 指定角色 MyRole,您可以向该角色添加要授予对源表的捕获列的 SELECT 权限的用户。具有 sysadmin 或 db_owner 角色的用户还可以访问指定的更改表。将 @role_name 的值设置为 NULL,以仅允许 sysadmin 或 db_owner 中的成员对捕获的信息具有完全访问权限。
  • 指定 SQL Server 为捕获的表放置更改表的文件组。命名的文件组必须已经存在。最好不要将更改表放在用于源表的同一文件组中。



3.验证用户是否有权访问 CDC 表

SQL Server 管理员可以运行系统存储过程来查询数据库或表以检索其 CDC 配置信息。存储过程可以使用 SQL Server Management Studio 或 Transact-SQL 运行。

先决条件

  • 您对捕获实例的所有捕获列具有 SELECT 权限。 db_owner 数据库角色的成员可以查看所有已定义捕获实例的信息。
  • 您拥有为查询包括的表信息定义的任何门控角色的成员资格。

程序

  • 从 SQL Server Management Studio 的“查看”菜单中,单击“对象资源管理器”。
  • 在对象资源管理器中,展开数据库,然后展开您的数据库对象,例如 MyDB。
  • 展开可编程性 > 存储过程 > 系统存储过程。
  • 运行 sys.sp_cdc_help_change_data_capture 存储过程来查询表。

查询不应返回空结果。

以下示例在数据库 MyDB 上运行存储过程 sys.sp_cdc_help_change_data_capture:

示例:查询表以获取 CDC 配置信息

USE MyDB;
GO
EXEC sys.sp_cdc_help_change_data_capture
GO

该查询返回数据库中每个表的配置信息,这些表为 CDC 启用并且包含调用者有权访问的更改数据。如果结果为空,请验证用户是否具有访问捕获实例和 CDC 表的权限。



七、SQL Server 始终开启

SQL Server 连接器可以从 Always On 只读副本捕获更改。

先决条件

  • 在主节点上配置并启用变更数据捕获。 SQL Server 不直接在副本上支持 CDC。

  • 配置选项 database.applicationIntent 设置为 ReadOnly。这是 SQL Server 所要求的。当 Debezium 检测到此配置选项时,它会通过以下操作进行响应:


  • 将snapshot.isolation.mode 设置为snapshot,这是只读副本唯一支持的一种事务隔离模式。


  • 在每次执行流式查询循环时提交(只读)事务,这是获取 CDC 数据最新视图所必需的。



八、SQL Server 捕获作业代理配置对服务器负载和延迟的影响

当数据库管理员为源表启用变更数据捕获时,捕获作业代理开始运行。代理从事务日志中读取新的更改事件记录,并将事件记录复制到更改数据表中。在源表中提交更改与更改出现在相应更改表中的时间之间,始终存在一个小的延迟间隔。此延迟时间间隔表示源表中发生更改与 Debezium 可用于流式传输到 Apache Kafka 之间的差距。

理想情况下,对于必须快速响应数据更改的应用程序,您希望在源表和更改表之间保持密切同步。您可能会想像,运行捕获代理以尽可能快地连续处理变更事件可能会提高吞吐量并减少延迟 — 在事件发生后尽快用新的事件记录填充变更表,几乎是实时的。但是,情况不一定如此。追求更直接的同步需要付出性能代价。每次捕获作业代理向数据库查询新的事件记录时,都会增加数据库主机上的 CPU 负载。服务器上的额外负载可能会对整体数据库性能产生负面影响,并可能降低事务效率,尤其是在数据库使用高峰期。

监视数据库指标很重要,这样您就可以知道数据库是否达到服务器不再支持捕获代理的活动级别的程度。如果您发现性能问题,您可以修改 SQL Server 捕获代理设置,以帮助平衡数据库主机上的总体 CPU 负载和可容忍的延迟程度。



九、SQL Server 捕获作业代理配置参数

在 SQL Server 上,控制捕获作业代理行为的参数在 SQL Server 表 msdb.dbo.cdc_jobs 中定义。如果您在运行捕获作业代理时遇到性能问题,请通过运行 sys.sp_cdc_change_job 存储过程并提供新值来调整捕获作业设置以减少 CPU 负载。

注意:

  • 有关如何配置 SQL Server 捕获作业代理参数的具体指导超出了本文档的范围。

以下参数对于修改用于 Debezium SQL Server 连接器的捕获代理行为最重要:

轮询间隔:

  • 指定捕获代理在日志扫描周期之间等待的秒数。
  • 较高的值会减少数据库主机上的负载并增加延迟。
  • 值 0 指定扫描之间不等待。
  • 默认值为 5。

最大传输:

  • 指定在每个日志扫描周期内处理的最大事务数。在捕获作业处理指定数量的事务后,它会暂停 pollinginterval 指定的时间长度,然后再开始下一次扫描。
  • 较低的值会减少数据库主机上的负载并增加延迟。
  • 默认值为 500。

最大扫描数:

  • 指定捕获作业在捕获数据库事务日志的全部内容时可以尝试的扫描周期数限制。如果连续参数设置为 1,则作业会在 pollinginterval 指定的时间长度内暂停,然后再继续扫描。
  • 较低的值会减少数据库主机上的负载并增加延迟。
  • 默认值为 10。

有关捕获代理参数的详细信息,请参阅 SQL Server 文档。



十、部署

要部署 Debezium SQL Server 连接器,请安装 Debezium SQL Server 连接器存档,配置连接器,然后通过将其配置添加到 Kafka Connect 来启动连接器。

先决条件

  • 已安装 Apache ZooKeeper、Apache Kafka 和 Kafka Connect。
  • SQL Server 已安装,已针对 CDC 进行配置,并且可以与 Debezium 连接器一起使用。

程序

  • 下载 Debezium SQL Server 连接器插件存档
  • 将文件提取到您的 Kafka Connect 环境中。
  • 将包含 JAR 文件的目录添加到 Kafka Connect 的 plugin.path。
  • 配置连接器并将配置添加到您的 Kafka Connect 集群。
  • 重新启动 Kafka Connect 进程以获取新的 JAR 文件。

详细的部署过程记录在下面这篇博文中:



十一、SQL Server 连接器配置示例

以下是连接器实例的配置示例,该实例在 192.168.99.100 的端口 1433 上从 SQL Server 服务器捕获数据,我们在逻辑上将其命名为 fullfillment。通常,您通过设置可用于连接器的配置属性,在 JSON 文件中配置 Debezium SQL Server 连接器。

您可以选择为数据库中的模式和表的子集生成事件。或者,您可以忽略、屏蔽或截断包含敏感数据、大于指定大小或您不需要的列。

{
    "name": "inventory-connector",     (1)
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",     (2)
        "database.hostname": "192.168.99.100",       (3)
        "database.port": "1433",                      (4)
        "database.user": "sa",                        (5)
        "database.password": "Password!",              (6)
        "database.dbname": "testDB",                   (7)
        "database.server.name": "fullfillment",       (8)
        "table.include.list": "dbo.customers",        (9)
        "database.history.kafka.bootstrap.servers": "kafka:9092", (10)
        "database.history.kafka.topic": "dbhistory.fullfillment"    (11)
    }
}
  • 1.当我们向 Kafka Connect 服务注册连接器时的名称。
  • 2.此 SQL Server 连接器类的名称。
  • 3.SQL Server 实例的地址。
  • 4.SQL Server 实例的端口号。
  • 5.SQL Server 用户的名称
  • 6.SQL Server 用户的密码
  • 7.要从中捕获更改的数据库的名称。
  • 8.SQL Server 实例/集群的逻辑名称,形成一个命名空间,用于连接器写入的所有 Kafka 主题名称、Kafka Connect 架构名称以及 Avro 转换器时对应 Avro 架构的命名空间用来。
  • 9.Debezium 应捕获其更改的所有表的列表。
  • 10.此连接器将用于将 DDL 语句写入和恢复到数据库历史主题的 Kafka 代理列表。
  • 11.连接器将写入和恢复 DDL 语句的数据库历史主题的名称。本主题仅供内部使用,消费者不得使用。

有关可以为 Debezium SQL Server 连接器设置的配置属性的完整列表,请参阅 SQL Server 连接器属性。

您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。该服务记录配置并启动执行以下任务的一个连接器任务:

  • 连接到 SQL Server 数据库。
  • 读取事务日志。
  • 将更改事件记录到 Kafka 主题。


添加连接器配置

要开始运行 Debezium SQL Server 连接器,请创建连接器配置,并将配置添加到您的 Kafka Connect 集群。

先决条件

  • CDC 在 SQL Server 上启用。
  • Debezium SQL Server 连接器已安装。

程序

  • 为 SQL Server 连接器创建配置。
  • 使用 Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 集群。

结果

当连接器启动时,它会为连接器配置的 SQL Server 数据库执行一致的快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。



十二、连接器属性

Debezium SQL Server 连接器具有许多配置属性,您可以使用这些属性为您的应用程序实现正确的连接器行为。许多属性都有默认值。

有关属性的信息组织如下:

  • 必需的连接器配置属性
  • 高级连接器配置属性
  • 控制 Debezium 如何处理从数据库历史主题中读取的事件的数据库历史连接器配置属性。
  • 传递数据库历史记录属性
  • 控制数据库驱动程序行为的传递数据库驱动程序属性。



1.必需的 Debezium SQL Server 连接器配置属性

除非默认值可用,否则需要以下配置属性。

属性 默认值 描述
name No default 连接器的唯一名称。尝试使用相同名称再次注册将失败。 (所有 Kafka Connect 连接器都需要此属性。)
connector.class No default 连接器的 Java 类的名称。始终为 SQL Server 连接器使用 io.debezium.connector.sqlserver.SqlServerConnector 值。
tasks.max 1 应为此连接器创建的最大任务数。 SQL Server 连接器始终使用单个任务,因此不使用此值,因此默认值始终可以接受。
database.hostname No default SQL Server 数据库服务器的 IP 地址或主机名。
database.port 1433 SQL Server 数据库服务器的整数端口号。
database.user No default 连接到 SQL Server 数据库服务器时使用的用户名。使用 Kerberos 身份验证时可以省略,可以使用传递属性进行配置。
database.password No default 连接到 SQL Server 数据库服务器时使用的密码。
database.dbname No default 要从中流式传输更改的 SQL Server 数据库的名称。不得与 database.names 一起使用。
database.instance No default 指定 SQL Server 命名实例的实例名称。
database.names No default 要从中流式传输更改的 SQL Server 数据库名称的逗号分隔列表。目前,仅支持一个数据库名称。不得与 database.dbname 一起使用。此选项是实验性的,不得在生产中使用。使用它会使连接器的行为与没有升级或降级路径的默认配置不兼容:连接器将对其提交的偏移量消息使用不同的键。napshot.select.statement.overrides 中使用的 SQL 语句必须使用数据库名称作为完全限定表名称的一部分。公开的连接器指标的结构会有所不同。
database.server.name No default 为您希望 Debezium 捕获的 SQL Server 数据库服务器标识并提供命名空间的逻辑名称。逻辑名称在所有其他连接器中应该是唯一的,因为它用作从该连接器接收记录的所有 Kafka 主题名称的前缀。数据库服务器逻辑名称中只能使用字母数字字符、连字符、点和下划线。不要更改此属性的值。如果您更改名称值,在重新启动后,连接器不会继续向原始主题发出事件,而是向名称基于新值的主题发出后续事件。连接器也无法恢复其数据库历史主题。
schema.include.list No default 一个可选的、以逗号分隔的正则表达式列表,与您要捕获更改的模式名称相匹配。未包含在 schema.include.list 中的任何模式名称都将被排除在捕获其更改之外。默认情况下,所有非系统模式都会捕获其更改。不要同时设置 schema.exclude.list 属性。
schema.exclude.list No default 一个可选的、以逗号分隔的正则表达式列表,与您不想捕获更改的模式名称匹配。任何名称未包含在 schema.exclude.list 中的模式都会捕获其更改,但系统模式除外。不要同时设置 schema.include.list 属性。
table.include.list No default 一个可选的以逗号分隔的正则表达式列表,匹配您希望 Debezium 捕获的表的完全限定表标识符;未包含在 table.include.list 中的任何表都将从捕获中排除。每个标识符的格式为 schemaName.tableName。默认情况下,连接器会捕获指定模式的所有非系统表。不得与 table.exclude.list 一起使用。
table.exclude.list No default 一个可选的以逗号分隔的正则表达式列表,匹配您想要从捕获中排除的表的完全限定表标识符; Debezium 捕获未包含在 table.exclude.list 中的所有表。每个标识符的格式为 schemaName.tableName。不得与 table.include.list 一起使用。
column.include.list empty string 一个可选的以逗号分隔的正则表达式列表,匹配应包含在更改事件消息值中的列的完全限定名称。列的完全限定名称的格式为 schemaName.tableName.columnName。请注意,主键列始终包含在事件的键中,即使未包含在值中。不要同时设置 column.exclude.list 属性。
column.exclude.list empty string 一个可选的以逗号分隔的正则表达式列表,匹配应从更改事件消息值中排除的列的完全限定名称。列的完全限定名称的格式为 schemaName.tableName.columnName。请注意,主键列始终包含在事件的键中,即使从值中排除。不要同时设置 column.include.list 属性。
column.mask.hash.hashAlgorithm.with.salt.salt; column.mask.hash.v2.hashAlgorithm.with.salt.salt n/a 一个可选的、以逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。列的完全限定名称的格式为

<schemaName>.<tableName>._<columnName>

。在生成的更改事件记录中,指定列的值将替换为假名。假名由应用指定的 hashAlgorithm 和 salt 产生的散列值组成。基于所使用的散列函数,参照完整性得以保持,而列值被替换为假名。 Java Cryptography Architecture Standard Algorithm Name Documentation 的 MessageDigest 部分描述了支持的散列函数。在以下示例中,CzQMA0cB5K 是随机选择的盐。column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName如有必要,假名会自动缩短为列的长度。连接器配置可以包括指定不同哈希算法和盐的多个属性。根据使用的 hashAlgorithm、选择的 salt 和实际数据集,生成的数据集可能不会被完全屏蔽。如果值在不同的地方或系统中进行散列,则应使用散列策略版本 2 来确保保真度。
time.precision.mode adaptive 时间、日期和时间戳可以用不同类型的精度表示,包括: 自适应(默认)根据数据库列的类型使用毫秒、微秒或纳秒精度值精确捕获数据库中的时间和时间戳值;或 connect 始终使用 Kafka Connect 的内置表示时间、日期和时间戳表示时间和时间戳值,无论数据库列的精度如何,它都使用毫秒精度。
decimal.handling.mode precise 指定连接器应如何处理 DECIMAL 和 NUMERIC 列的值:精确(默认)使用以二进制形式在更改事件中表示的 java.math.BigDecimal 值精确地表示它们。double 使用 double 值表示它们,这可能会导致精度损失,但更易于使用。string 将值编码为格式化的字符串,这很容易使用,但有关真实类型的语义信息会丢失。
include.schema.changes true 布尔值,指定连接器是否应将数据库架构中的更改发布到与数据库服务器 ID 同名的 Kafka 主题。每个架构更改都记录有一个包含数据库名称的键和一个描述架构更新的 JSON 结构的值。这与连接器内部记录数据库历史的方式无关。默认值为真。
tombstones.on.delete true 控制删除事件后是否有墓碑事件。true – 删除操作由删除事件和后续的墓碑事件表示。false – 仅发出删除事件。删除源记录后,发出 tombstone 事件(默认行为)允许 Kafka 完全删除与已删除行的键相关的所有事件,以防主题启用日志压缩。
column.truncate.to.

length

.chars
n/a 一个可选的以逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称,如果字段值长于指定的字符数,则其值应在更改事件消息值中被截断。可以在单个配置中使用具有不同长度的多个属性,尽管在每个配置中长度必须是正整数。列的完全限定名称的格式为 schemaName.tableName.columnName。
column.mask.with.

length

.chars
n/a 一个可选的以逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称,其值应在更改事件消息值中替换为由指定数量的星号 (*) 字符组成的字段值。可以在单个配置中使用具有不同长度的多个属性,尽管在每个配置中长度必须是正整数或零。列的完全限定名称的格式为 schemaName.tableName.columnName。
column.propagate.source.type n/a 一个可选的以逗号分隔的正则表达式列表,匹配列的完全限定名称,其原始类型和长度应作为参数添加到发出的更改消息中的相应字段模式中。架构参数 __debezium.source.column.type、__debezium.source.column.length 和 __debezium.source.column.scale 分别用于传播原始类型名称和长度(对于可变宽度类型)。有助于在接收器数据库中正确调整相应列的大小。列的完全限定名称的格式为 schemaName.tableName.columnName。
datatype.propagate.source.type n/a 一个可选的以逗号分隔的正则表达式列表,匹配列的数据库特定数据类型名称,其原始类型和长度应作为参数添加到发出的更改消息中的相应字段模式中。架构参数 __debezium.source.column.type、__debezium.source.column.length 和 __debezium.source.column.scale 将分别用于传播原始类型名称和长度(对于可变宽度类型)。有助于在接收器数据库中正确调整相应列的大小。完全限定的数据类型名称的格式为 schemaName.tableName.typeName。有关 SQL Server 特定数据类型名称的列表,请参阅 SQL Server 数据类型。
message.key.columns n/a 一个表达式列表,指定连接器用于形成自定义消息键的列,以形成它发布到指定表的 Kafka 主题的更改事件记录。默认情况下,Debezium 使用表的主键列作为它发出的记录的消息键。代替默认值,或为缺少主键的表指定键,您可以基于一个或多个列配置自定义消息键。要为表建立自定义消息键,请列出表,然后列出用作消息键的列。每个列表条目采用以下格式:<fully-qualified_tableName>:

<keyColumn>

,<keyColumn>要将表键基于多个列名,请在列名之间插入逗号。每个完全限定的表名都是以下格式的正则表达式:<schemaName>.<tableName>该属性可以包含多个表的条目。使用分号分隔列表中的表条目。以下示例为表 inventory.customers 和 purchase.orders 设置消息键:库存.客户:pk1,pk2;(.*).purchaseorders:pk3,pk4对于表inventory.customer,列pk1 和pk2 被指定为消息键。对于任何模式中的 purchaseorders 表,pk3 和 pk4 列作为消息键。用于创建自定义消息键的列数没有限制。但是,最好使用指定唯一键所需的最小数量。
binary.handling.mode bytes 指定二进制(binary、varbinary)列在更改事件中应该如何表示,包括:bytes 表示二进制数据为字节数组(默认),base64 表示二进制数据为 base64-encoded String,hex 表示二进制数据为 hex-encoded (base16)string
schema.name.adjustment.mode avro 指定应如何调整架构名称以与连接器使用的消息转换器兼容。可能的设置:avro 将不能在 Avro 类型名称中使用的字符替换为下划线。none 不应用任何调整。



2.高级 SQL Server 连接器配置属性

以下高级配置属性具有良好的默认值,适用于大多数情况,因此很少需要在连接器的配置中指定。

属性 默认值 描述
converters No default 枚举连接器可以使用的自定义转换器实例的符号名称的逗号分隔列表。
snapshot.mode initial 一种用于获取结构的初始快照以及捕获的表的可选数据的模式。快照完成后,连接器将继续从数据库的重做日志中读取更改事件。支持以下值:initial:对捕获的表的结构和数据进行快照;如果主题应该填充来自捕获表的数据的完整表示,则很有用。initial_only:像initial一样拍摄结构和数据的快照,但是一旦快照完成就不会转换为流式更改。schema_only:仅对捕获的表的结构进行快照;如果只有从现在开始发生的更改应该传播到主题,则很有用。
snapshot.include.collection.list All tables specified in table.include.list 一个可选的、以逗号分隔的正则表达式列表,与要包含在快照中的表的完全限定名称 (..) 匹配。指定的项目必须在连接器的 table.include.list 属性中命名。仅当连接器的 snapshot.mode 属性设置为 never 以外的值时,此属性才会生效。此属性不影响增量快照的行为。
snapshot.isolation.mode repeatable_read 用于控制使用哪个事务隔离级别以及连接器锁定指定用于捕获的表的时间长度的模式。支持以下值:读未提交、已提交读、可重复读取、快照、独占(独占模式使用可重复的读取隔离级别,但是,它需要对所有要读取的表进行独占锁定)。快照、read_committed 和 read_uncommitted 模式不会阻止其他事务在初始快照期间更新表行。 Exclusive 和 repeatable_read 模式确实可以防止并发更新。模式选择也会影响数据的一致性。只有独占和快照模式才能保证完全一致,即初始快照和流式日志构成线性历史。在 repeatable_read 和 read_committed 模式的情况下,例如,添加的记录可能会出现两次 – 一次在初始快照中,一次在流阶段。尽管如此,该一致性级别应该适用于数据镜像。对于 read_uncommitted,根本没有数据一致性保证(某些数据可能会丢失或损坏)。
event.processing.failure.handling.mode fail 指定连接器在处理事件期间应如何对异常做出反应。 fail 将传播异常(指示有问题的事件的偏移量),导致连接器停止。warn 将导致有问题的事件被跳过并记录有问题的事件的偏移量。skip 将导致有问题的事件被跳过。
poll.interval.ms 1000 正整数值,指定连接器在每次迭代期间应等待新更改事件出现的毫秒数。默认为 1000 毫秒或 1 秒。
max.queue.size 8192 正整数值,指定阻塞队列可以保存的最大记录数。当 Debezium 读取从数据库流式传输的事件时,它会将事件放入阻塞队列中,然后再将它们写入 Kafka。在连接器接收消息的速度快于将消息写入 Kafka 的速度或 Kafka 不可用时,阻塞队列可以为从数据库读取更改事件提供背压。当连接器定期记录偏移量时,将忽略队列中保存的事件。始终将 max.queue.size 的值设置为大于 max.batch.size 的值。
max.queue.size.in.bytes 0 一个长整数值,指定阻塞队列的最大容量(以字节为单位)。默认情况下,没有为阻塞队列指定卷限制。要指定队列可以使用的字节数,请将此属性设置为正长值。如果还设置了 max.queue.size,则当队列的大小达到任一属性指定的限制时,将阻止写入队列。例如,如果设置 max.queue.size=1000,max.queue.size.in.bytes=5000,则在队列包含 1000 条记录后,或队列中的记录量大后,写入队列被阻塞达到 5000 字节。
max.batch.size 2048 正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。
heartbeat.interval.ms 0 控制发送心跳消息的频率。此属性包含以毫秒为单位的时间间隔,用于定义连接器向心跳主题发送消息的频率。该属性可用于确认连接器是否仍在接收来自数据库的更改事件。如果只有未捕获表中的记录在较长时间内更改,您还应该利用心跳消息。在这种情况下,连接器将继续从数据库中读取日志,但不会将任何更改消息发送到 Kafka,这反过来意味着没有偏移量更新提交给 Kafka。这可能会导致在连接器重新启动后重新发送更多更改事件。将此参数设置为 0 以完全不发送心跳消息。默认禁用。
heartbeat.topics.prefix __debezium-heartbeat 控制向其发送心跳消息的主题的命名。主题根据模式 <heartbeat.topics.prefix>.<server.name> 命名。
snapshot.delay.ms No default 连接器在启动后拍摄快照之前应等待的毫秒间隔;可用于在集群中启动多个连接器时避免快照中断,这可能导致连接器重新平衡。
snapshot.fetch.size 2000 指定在拍摄快照时应一次性从每个表中读取的最大行数。连接器将分批读取该大小的表格内容。默认为 2000。
query.fetch.size No default 指定将为给定查询的每个数据库往返获取的行数。默认为 JDBC 驱动程序的默认提取大小。
snapshot.lock.timeout.ms 10000 一个整数值,指定执行快照时等待获取表锁的最长时间(以毫秒为单位)。如果在此时间间隔内无法获取表锁,则快照将失败(另请参阅快照)。当设置为 0 时,连接器将在无法获得锁时立即失败。值 -1 表示无限等待。
snapshot.select.statement.overrides No default 指定要包含在快照中的表行。如果您希望快照仅包含表中行的子集,请使用该属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。该属性包含格式为 . 的完全限定表名称的逗号分隔列表。例如,“snapshot.select.statement.overrides”: “inventory.products,customers.orders”对于列表中的每个表,添加一个进一步的配置属性,该属性指定连接器在拍摄快照时在表上运行的 SELECT 语句。指定的 SELECT 语句确定要包含在快照中的表行的子集。使用以下格式指定此 SELECT 语句属性的名称:snapshot.select.statement.overrides..。例如,snapshot.select.statement.overrides.customers.orders。
source.struct.version v2 CDC 事件中源块的模式版本; Debezium 0.10 引入了一些突破更改源块的结构,以统一所有连接器的暴露结构。通过将此选项设置为 v1,可以生成早期版本中使用的结构。请注意,不建议使用此设置,并计划在未来的 Debezium 版本中删除此设置。
sanitize.field.names 当连接器配置明确指定 key.converter 或 value.converter 参数以使用 Avro 时为 true,否则默认为 false。 字段名称是否经过清理以符合 Avro 命名要求。有关详细信息,请参阅 Avro 命名。
provide.transaction.metadata false 当设置为 true 时,Debezium 生成带有事务边界的事件,并使用事务元数据丰富数据事件信封。
transaction.topic ${database.server.name}.transaction 控制连接器向其发送事务元数据消息的主题的名称。占位符 ${database.server.name} 可用于引用连接器的逻辑名称;默认为 ${database.server.name}.transaction,例如 dbserver1.transaction。
retriable.restart.connector.wait.ms 10000(10seconds) 发生可重试错误后重新启动连接器之前等待的毫秒数。
skipped.operations No default 流式传输期间将跳过的操作类型的逗号分隔列表。操作包括:c 用于插入/创建,u 用于更新,d 用于删除。默认情况下,不会跳过任何操作。
signal.data.collection No default value 用于向连接器发送信号的数据集合的完全限定名称。使用以下格式指定集合名称:<databaseName>.<schemaName>.<tableName>
incremental.snapshot.allow.schema.changes false 在增量快照期间允许架构更改。启用后,连接器将在增量快照期间检测架构更改并重新选择当前块以避免锁定 DDL。请注意,不支持对主键的更改,如果在增量快照期间执行,可能会导致不正确的结果。另一个限制是,如果架构更改仅影响列的默认值,则在从 binlog 流处理 DDL 之前不会检测到更改。这不会影响快照事件的值,但快照事件的架构可能具有过时的默认值。
incremental.snapshot.chunk.size 1024 连接器在增量快照块期间获取并读入内存的最大行数。增加块大小提供了更高的效率,因为快照运行的快照查询更少,但更大的大小。然而,更大的块大小也需要更多的内存来缓冲快照数据。将块大小调整为在您的环境中提供最佳性能的值。
max.iteration.transactions 0 指定每次迭代的最大事务数,用于在从数据库中的多个表流式传输更改时减少内存占用。当设置为 0(默认值)时,连接器使用当前最大 LSN 作为获取更改的范围。当设置为大于零的值时,连接器使用此设置指定的第 n 个 LSN 作为从中获取更改的范围。
incremental.snapshot.option.recompile false 对增量快照期间使用的所有 SELECT 语句使用 OPTION(RECOMPILE) 查询选项。这有助于解决能发生的参数嗅探问题,但可能会导致源数据库上的 CPU 负载增加,具体取决于查询执行的频率。



3.Debezium SQL Server 连接器数据库历史配置属性

Debezium 提供了一组 database.history.* 属性来控制连接器如何与模式历史主题交互。

下表描述了用于配置 Debezium 连接器的 database.history 属性。

属性 默认值 描述
database.history.kafka.topic 连接器存储数据库架构历史的 Kafka 主题的全名。
database.history.kafka.bootstrap.servers 连接器用于建立与 Kafka 集群的初始连接的主机/端口对列表。此连接用于检索连接器先前存储的数据库模式历史记录,并用于编写从源数据库读取的每个 DDL 语句。每对都应该指向 Kafka Connect 进程使用的同一个 Kafka 集群。
database.history.kafka.recovery.poll.interval.ms 100 一个整数值,指定连接器在启动/恢复期间轮询持久数据时应等待的最大毫秒数。默认值为 100 毫秒。
database.history.kafka.query.timeout.ms 3000 一个整数值,指定连接器在使用 Kafka 管理客户端获取集群信息时应等待的最大毫秒数。
database.history.kafka.recovery.attempts 4 在连接器恢复失败并出现错误之前连接器应尝试读取持久历史数据的最大次数。没有收到数据后等待的最长时间为 recovery.attempts x recovery.poll.interval.ms。
database.history.skip.unparseable.ddl false 一个布尔值,指定连接器是否应忽略格式错误或未知的数据库语句或停止处理,以便人们可以解决问题。安全默认值为 false。跳过应该小心使用,因为它会在处理 binlog 时导致数据丢失或损坏。
database.history.store.only.captured.tables.ddl false 一个布尔值,指定连接器是否应记录所有 DDL 语句true 仅记录那些与 Debezium 正在捕获其更改的表相关的 DDL 语句。设置为 true 时要小心,因为如果您更改捕获其更改的表,则可能需要丢失数据。安全默认值为 false。
database.history.store.only.captured.tables.ddl false 一个布尔值,指定连接器是否应记录所有 DDL 语句true 仅记录那些与 Debezium 正在捕获其更改的表相关的 DDL 语句。设置为 true 时要小心,因为如果您更改捕获其更改的表,则可能需要丢失数据。安全默认值为 false。



4.用于配置生产者和消费者客户端的直通数据库历史记录属性

Debezium 依赖 Kafka 生产者将模式更改写入数据库历史主题。同样,当连接器启动时,它依赖于 Kafka 消费者从数据库历史主题中读取。您可以通过将值分配给以 database.history.producer.* 和 database.history.consumer.* 前缀开头的一组传递配置属性来定义 Kafka 生产者和消费者客户端的配置。传递的生产者和消费者数据库历史属性控制一系列行为,例如这些客户端如何保护与 Kafka 代理的连接,如以下示例所示:

database.history.producer.security.protocol=SSL
database.history.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.producer.ssl.keystore.password=test1234
database.history.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.producer.ssl.truststore.password=test1234
database.history.producer.ssl.key.password=test1234

database.history.consumer.security.protocol=SSL
database.history.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.consumer.ssl.keystore.password=test1234
database.history.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.consumer.ssl.truststore.password=test1234
database.history.consumer.ssl.key.password=test1234

Debezium 在将属性传递给 Kafka 客户端之前从属性名称中去除前缀​​。

有关 Kafka 生产者配置属性和 Kafka 消费者配置属性的更多详细信息,请参阅 Kafka 文档。



5.Debezium SQL Server 连接器直通数据库驱动程序配置属性

Debezium 连接器提供数据库驱动程序的直通配置。直通数据库属性以前缀 database.* 开头。例如,连接器将诸如 database.foobar=false 之类的属性传递给 JDBC URL。

与数据库历史客户端的传递属性一样,Debezium 在将它们传递给数据库驱动程序之前从属性中去除前缀​​。



十三、数据库模式演变

当为 SQL Server 表启用更改数据捕获时,当表中发生更改时,事件记录将持久保存到服务器上的捕获表中。如果您在源表更改的结构中引入更改,例如,通过添加新列,则该更改不会动态反映在更改表中。只要捕获表继续使用过时的模式,Debezium 连接器就无法正确地为表发出数据更改事件。您必须进行干预以刷新捕获表,以使连接器能够恢复处理更改事件。

由于 CDC 在 SQL Server 中的实现方式,您不能使用 Debezium 更新捕获表。要刷新捕获表,必须是具有提升权限的 SQL Server 数据库操作员。作为 Debezium 用户,您必须与 SQL Server 数据库操作员协调任务,以完成架构刷新并将流恢复到 Kafka 主题。

您可以使用以下方法之一在架构更改后更新捕获表:

  • 离线模式更新要求您在更新捕获表之前停止 Debezium 连接器。
  • 在线模式更新可以在 Debezium 连接器运行时更​​新捕获表。

使用每种类型的程序都有优点和缺点。

警告:

无论您使用在线更新方法还是离线更新方法,都必须先完成整个架构更新过程,然后才能对同一个源表应用后续架构更新。最佳实践是在一个批次中执行所有 DDL,这样该过程只能运行一次。

注意:

无论您使用在线更新方法还是离线更新方法,都必须先完成整个架构更新过程,然后才能对同一个源表应用后续架构更新。最佳实践是在一个批次中执行所有 DDL,这样该过程只能运行一次。

将源表中的列从 NULL 更改为 NOT NULL 或反之亦然后,SQL Server 连接器无法正确捕获更改的信息,直到您创建新的捕获实例之后。如果您在更改列名称后没有创建新的捕获表,则连接器发出的更改事件记录不会正确指示该列是否是可选的。也就是说,以前定义为可选(或 NULL)的列继续存在,尽管现在被定义为 NOT NULL。同样,已定义为必需的列 (NOT NULL) 保留该名称,尽管它们现在定义为 NULL。



1.离线架构更新

离线模式更新为更新捕获表提供了最安全的方法。但是,离线更新可能不适用于需要高可用性的应用程序。

先决条件

  • 已将更新提交到启用了 CDC 的 SQL Server 表的架构。
  • 您是具有提升权限的 SQL Server 数据库操作员。

程序

  • 挂起更新数据库的应用程序。
  • 等待 Debezium 连接器流式传输所有未流式传输的更改事件记录。
  • 停止 Debezium 连接器。
  • 将所有更改应用于源表架构。
  • 使用带有参数@capture_instance 的唯一值的sys.sp_cdc_enable_table 过程为更新源表创建一个新的捕获表。
  • 恢复您在步骤 1 中暂停的应用程序。
  • 启动 Debezium 连接器。
  • 在 Debezium 连接器开始从新的捕获表进行流式传输后,通过运行存储过程 sys.sp_cdc_disable_table 并将参数 @capture_instance 设置为旧的捕获实例名称来删除旧的捕获表。



2.在线模式更新

完成在线模式更新的过程比运行离线模式更新的过程更简单,您可以在不需要任何应用程序和数据处理停机时间的情况下完成它。但是,对于在线模式更新,在您更新源数据库中的模式之后,但在您创建新的捕获实例之前,可能会出现潜在的处理差距。在该间隔期间,更改表的旧实例继续捕获更改事件,并且保存到旧表的更改数据保留了早期模式的结构。因此,例如,如果您向源表中添加了新列,则在新捕获表准备好之前生成的更改事件不包含新列的字段。如果您的应用程序不能容忍这样的过渡期,最好使用离线模式更新过程。

先决条件

  • 已将更新提交到启用了 CDC 的 SQL Server 表的架构。
  • 您是具有提升权限的 SQL Server 数据库操作员。

程序

  • 将所有更改应用于源表架构。
  • 通过使用参数@capture_instance 的唯一值运行sys.sp_cdc_enable_table 存储过程,为更新源表创建一个新的捕获表。
  • 当 Debezium 从新的捕获表开始流式传输时,您可以通过运行 sys.sp_cdc_disable_table 存储过程并将参数 @capture_instance 设置为旧的捕获实例名称来删除旧的捕获表。

示例:在数据库架构更改后运行在线架构更新

让我们部署基于 SQL Server 的 Debezium 教程来演示在线模式更新。

在以下示例中,将 phone_number 列添加到客户表中。

键入以下命令以启动数据库 shell:

docker-compose -f docker-compose-sqlserver.yaml exec sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -d testDB'

通过运行以下查询以添加 phone_number 字段来修改客户源表的架构:

ALTER TABLE customers ADD phone_number VARCHAR(32);

通过运行 sys.sp_cdc_enable_table 存储过程创建新的捕获实例。

EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0, @capture_instance = 'dbo_customers_v2';
GO

通过运行以下查询将新数据插入到客户表中:

INSERT INTO customers(first_name,last_name,email,phone_number) VALUES ('John','Doe','john.doe@example.com', '+1-555-123456');
GO

Kafka Connect 日志通过类似于以下消息的条目报告配置更新:

connect_1    | 2019-01-17 10:11:14,924 INFO   ||  Multiple capture instances present for the same table: Capture instance "dbo_customers" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_CT, startLsn=00000024:00000d98:0036, changeTableObjectId=1525580473, stopLsn=00000025:00000ef8:0048] and Capture instance "dbo_customers_v2" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL]   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
connect_1    | 2019-01-17 10:11:14,924 INFO   ||  Schema will be changed for ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL]   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
...
connect_1    | 2019-01-17 10:11:33,719 INFO   ||  Migrating schema to ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL]   [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]

最终,phone_number 字段被添加到模式中,并且它的值出现在写入 Kafka 主题的消息中。

...
     {
        "type": "string",
        "optional": true,
        "field": "phone_number"
     }
...
    "after": {
      "id": 1005,
      "first_name": "John",
      "last_name": "Doe",
      "email": "john.doe@example.com",
      "phone_number": "+1-555-123456"
    },

通过运行 sys.sp_cdc_disable_table 存储过程删除旧的捕获实例。

EXEC sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'dbo_customers', @capture_instance = 'dbo_customers';
GO



十四、监控

除了 Zookeeper、Kafka 和 Kafka Connect 提供的对 JMX 指标的内置支持之外,Debezium SQL Server 连接器还提供三种类型的指标。连接器提供以下指标:

用于在执行快照时监控连接器的快照指标。

读取 CDC 表数据时用于监控连接器的流式指标。

用于监控连接器架构历史状态的架构历史指标。

有关如何通过 JMX 公开上述指标的信息,请参阅 Debezium 监控文档。



1.Snapshot metrics快照指标

除非快照操作处于活动状态,或者自上次连接器启动以来发生了快照,否则不会公开快照指标。

  • MBean is debezium.sql_server:type=connector-metrics,server=<sqlserver.server.name>,task=<task.id>,context=snapshot.
属性 类型 描述
LastEvent(最后事件) string 连接器读取的最后一个快照事件。
MilliSecondsSinceLastEvent(自上次事件以来的毫秒数) long 自连接器读取并处理最新事件以来的毫秒数。
TotalNumberOfEventsSeen(事件总数) long 此连接器自上次启动或重置以来看到的事件总数。
NumberOfEventsFiltered(过滤的事件数) long 已被连接器上配置的包含/排除列表过滤规则过滤的事件数。
CapturedTables(捕获的表) String[] 连接器捕获的表列表。
QueueTotalCapacity(队列总容量) int 用于在快照程序和 Kafka Connect 主循环之间传递事件的队列长度。
QueueRemainingCapacity(队列剩余容量) int 用于在快照程序和 Kafka Connect 主循环之间传递事件的队列的可用容量
TotalTableCount(总表数) int 快照中包含的表总数。
RemainingTableCount(剩余表数) int 快照尚未复制的表数。
SnapshotRunning(快照运行) boolean 快照是否已启动。
SnapshotAborted(快照中止) boolean 快照是否中止。
SnapshotCompleted(快照完成) boolean 快照是否完成。
SnapshotDurationInSeconds(快照持续时间(以秒为单位)) long 到目前为止快照所用的总秒数,即使未完成也是如此。
RowsScanned(行扫描) Map<String, Long> 快照中包含每个表扫描的行数的映射。在处理过程中,表会逐渐添加到 Map 中。每扫描 10,000 行和完成表格时更新。
MaxQueueSizeInBytes(最大队列大小字节) long 队列的最大缓冲区(以字节为单位)。如果 max.queue.size.in.bytes 以正长值传递,它将被启用。
CurrentQueueSizeInBytes(当前队列大小字节) long 队列中记录的当前数据(以字节为单位)。

执行增量快照时,连接器还提供以下附加快照指标:

属性 类型 描述
ChunkId String 当前快照块的标识符。
ChunkFrom string 定义当前块的主键集的下限。
ChunkTo string 定义当前块的主键集的上限。
TableFrom string 当前快照表的主键集的下限。
TableTo string 当前快照表的主键集的上限。



2.Streaming metrics流指标

MBean is debezium.sql_server:type=connector-metrics,server=<sqlserver.server.name>,task=<task.id>,context=streaming.

属性 类型 描述
LastEvent(最后事件) string 连接器读取的最后一个流事件。
MilliSecondsSinceLastEvent(自上次事件以来的毫秒数) long 自连接器读取并处理最新事件以来的毫秒数。
TotalNumberOfEventsSeen(事件总数) long 此连接器自上次启动或重置以来看到的事件总数。
NumberOfEventsFiltered(过滤的事件数) long 已被连接器上配置的包含/排除列表过滤规则过滤的事件数。
CapturedTables(捕获的表) String[] 连接器捕获的表列表。
QueueTotalCapacity(队列总容量) int 用于在快照程序和 Kafka Connect 主循环之间传递事件的队列长度。
QueueRemainingCapacity(队列剩余容量) int 用于在快照程序和 Kafka Connect 主循环之间传递事件的队列的可用容量
Connected boolean 表示连接器当前是否连接到数据库服务器的标志。
MilliSecondsBehindSource(落后事件源的毫秒数) long 最后更改事件的时间戳和连接器处理该事件之间的毫秒数。 这些值将包含运行数据库服务器和连接器的机器上的时钟之间的任何差异。
NumberOfCommittedTransactions(已提交事务数) long 已提交的已处理事务数。
SourceEventPosition(源事件位置) Map<String, String> 最后接收到的事件的坐标。
LastTransactionId(最后事务编号) string 最后处理的事务的事务标识符。
MaxQueueSizeInBytes(最大队列大小字节) long 队列的最大缓冲区(以字节为单位)。如果 max.queue.size.in.bytes 以正长值传递,它将被启用。
CurrentQueueSizeInBytes(当前队列大小字节) long 队列中记录的当前数据(以字节为单位)。

Debezium SQL Server 连接器还提供以下额外的流指标:

属性 类型 描述
BinlogFilename(二进制日志文件名) string 连接器最近读取的二进制日志文件的名称。
BinlogPosition(二进制日志位置) long 连接器读取的二进制日志中的最新位置(以字节为单位)。
IsGtidModeEnabled(IsGtidMode 已启用) boolean 表示连接器当前是否正在跟踪 MySQL 服务器的 GTID 的标志。
GtidSet string 连接器在读取 binlog 时处理的最新 GTID 集的字符串表示形式。
NumberOfSkippedEvents(跳过的事件数) long MySQL 连接器已跳过的事件数。 通常,由于 MySQL 二进制日志中的错误格式或无法解析的事件,会跳过事件。
NumberOfDisconnects(断开连接数) long MySQL 连接器断开连接的次数
NumberOfRolledBackTransactions(回滚事务数) long 已回滚且未流式传输的已处理事务数。
NumberOfNotWellFormedTransactions(格式不正确的事务数) long 不符合预期的 BEGIN + COMMIT/ROLLBACK 协议的事务数。 在正常情况下,该值应为 0。
NumberOfLargeTransactions(大的事物数) long 未装入前瞻缓冲区的事务数。 为获得最佳性能,该值应明显小于 NumberOfCommittedTransactions 和 NumberOfRolledBackTransactions。



3.Schema history metrics架构历史指标

MBean is debezium.sql_server:type=connector-metrics,context=schema-history,server=<sqlserver.server.name>.

属性 类型 描述
status string STOPPED、RECOVERING(从存储中恢复历史)、RUNNING 之一,描述数据库历史的状态。
RecoveryStartTime(恢复开始时间) long 恢复开始的时间(以秒为单位)。
ChangesRecovered(恢复的变化) long 在恢复阶段读取的更改数。
ChangesApplied(已应用更改) long 在恢复和运行期间应用的架构更改总数。
MilliSecondsSinceLast​RecoveredChange(自上次恢复更改后的毫秒数) long 从历史存储中恢复自上次更改以来经过的毫秒数。
MilliSecondsSinceLast​AppliedChange long 自上次应用更改后经过的毫秒数。
LastRecoveredChange(最后恢复的更改) string 从历史存储中恢复的最后更改的字符串表示形式。
LastAppliedChange string 上次应用更改的字符串表示形式。



十五、Debezium应用延伸与扩展

Debezium系列之第100篇文章:阶段性详细总结对Debezium使用方式的优化,详细介绍对Debezium集群和Kafka集群做的一系列优化:

Debezium系列之:安装部署debezium详细步骤,并把debezium服务托管到systemctl:

Debezium系列之:多个数据库多张表或单个数据库多张表数据导入同一个kafka topic,新增字段区别数据来自哪个数据库哪张表:

更多Debezium系列文章,请参考博主Debezium分享专栏:



版权声明:本文为zhengzaifeidelushang原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。