一切始于一个典型的性能瓶颈。我们的核心业务系统,一个在 AWS EC2 上运行了多年的 SQL Server 数据库,承载着一个庞大的、高度规范化的订单模型。Orders
, OrderItems
, Customers
, Addresses
等十几个表通过外键紧密耦合。对于事务性操作(OLTP),它表现得无可挑剔。但随着业务发展,新的分析和查询服务需要以一种“文档”视角来审视订单——即一个包含所有相关信息的聚合视图。在 SQL Server 上执行这些深度 JOIN 和聚合查询,即使有索引优化,也开始拖慢整个系统,甚至在高并发下导致锁争用。
最初的构想是双写,即应用层在写入 SQL Server 的同时,也构造文档写入 MongoDB(我们选型的文档数据库)。这个方案在讨论阶段就被否决了。在真实项目中,双写引入的复杂性和数据不一致风险是不可接受的。任何一侧写入失败都需要复杂的补偿逻辑,这会严重污染业务代码。
下一个方案是批处理ETL,每小时从 SQL Server 抽取数据,转换后加载到 MongoDB。延迟太高,无法满足新业务近实时的要求。我们需要的是一个响应式、低延迟且对源数据库侵入性极小的数据管道。最终,我们将目光锁定在了变更数据捕获(Change Data Capture, CDC)。
技术选型与权衡
数据捕获:为什么是 SQL Server CDC?
SQL Server 从 2008 版开始就内置了 CDC 功能。它通过读取事务日志来捕获 DML (INSERT, UPDATE, DELETE) 变更,并将这些变更写入系统表中。相比于基于触发器的方案,CDC 是异步的,对源数据库的性能影响极小,这是生产环境中的关键考量。它能提供操作类型、变更前后的数据镜像,为下游消费提供了完整的上下文。消息队列:为什么是 Apache Pulsar?
Kafka 是一个常见的选项,但我们选择了 Pulsar。关键原因有二:- 分层存储 (Tiered Storage): Pulsar 的架构将计算 (Broker) 与存储 (BookKeeper) 分离。更重要的是,它原生支持将旧数据透明地卸载到 S3 等廉价对象存储上。这意味着我们可以无限期保留消息日志,用于数据重放、审计或错误恢复,而无需担心昂贵的磁盘成本。这对于一个关键数据管道来说,是极具吸引力的韧性保障。
- Pulsar IO Connectors: Pulsar 拥有一个类似 Kafka Connect 的生态。我们可以直接使用 Debezium SQL Server CDC Source Connector 来读取 SQL Server 的变更,而无需编写一行连接数据库的代码。这极大地加速了开发进程。
部署环境:AWS
整个基础设施都构建在 AWS 之上。SQL Server 运行在 EC2 上(未来可能迁移到 RDS)。Pulsar 集群也自建在 EC2 上,利用 S3 进行分层存储。目标数据库是 AWS DocumentDB(与 MongoDB API 兼容)。消费者服务则部署在 EKS (Elastic Kubernetes Service) 上,以获得更好的弹性和可观测性。
架构概览
整个数据流的设计非常直接,核心在于解耦和异步处理。
graph TD subgraph AWS VPC subgraph Source SQL_Server[SQL Server on EC2] --"Transaction Log"--> CDC_Process(Internal CDC Process) CDC_Process --"Writes to"--> CDC_Tables(CDC System Tables) end subgraph Data Pipeline Pulsar_Connector[Pulsar Debezium Connector on EC2] --"Polls"--> CDC_Tables Pulsar_Connector --"Publishes"--> Pulsar_Cluster[Pulsar Cluster on EC2] Pulsar_Cluster --"Stores data in" --> BookKeeper(BookKeeper) Pulsar_Cluster --"Offloads to" --> S3(S3 Tiered Storage) end subgraph Consumer Consumer_Service[Consumer Service on EKS] --"Subscribes to"--> Pulsar_Cluster Consumer_Service --"Transforms & Writes"--> DocumentDB(AWS DocumentDB) end end SQL_Server --> Pulsar_Connector Pulsar_Connector --> Pulsar_Cluster Pulsar_Cluster --> Consumer_Service Consumer_Service --> DocumentDB
步骤一:在 SQL Server 中启用 CDC
这是整个流程的起点,而且完全通过 T-SQL 完成。假设我们关心 dbo.Orders
和 dbo.OrderItems
这两张表。
-- 确保 SQL Server Agent 正在运行,CDC 作业依赖它
-- 1. 在数据库级别启用 CDC
USE MyBusinessDB;
GO
EXEC sys.sp_cdc_enable_db;
GO
-- 2. 为特定表启用 CDC
-- 为 Orders 表启用
-- role_name = NULL 表示只有 db_owner 可以访问变更数据
-- supports_net_changes = 1 会额外创建一张表来聚合净变更,但会增加开销,我们这里不需要
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'Orders',
@role_name = NULL,
@supports_net_changes = 0;
GO
-- 为 OrderItems 表启用
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'OrderItems',
@role_name = NULL,
@supports_net_changes = 0;
GO
-- 验证 CDC 是否已启用
-- 你应该能看到为这两张表创建的 cdc.dbo_Orders_CT 和 cdc.dbo_OrderItems_CT 等变更表
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE is_tracked_by_cdc = 1;
这里的坑在于:权限。执行这些命令的用户需要 db_owner
角色。另外,必须确保 SQL Server Agent 服务正在运行,否则 CDC 的捕获和清理作业将不会执行,导致事务日志膨胀。
步骤二:配置 Pulsar Debezium CDC 连接器
我们单独部署了一台 EC2 作为 Pulsar Connector 的运行环境。连接器的配置是核心,它决定了如何连接数据库、捕获哪些表以及消息的格式。
这是一个生产级的配置文件 sqlserver-source-config.yaml
:
# Pulsar IO Source Connector Configuration
# Type: debezium-sqlserver
# See Pulsar/Debezium documentation for all options
tenant: "public"
namespace: "default"
name: "sqlserver-cdc-source"
topicName: "persistent://public/default/sqlserver-cdc-orders"
processingGuarantees: "ATLEAST_ONCE"
configs:
# --- Debezium SQL Server Connector Properties ---
database.hostname: "your-sql-server-instance.region.compute.amazonaws.com"
database.port: "1433"
database.user: "pulsar_cdc_user" # 使用最小权限原则创建的用户
database.password: "your_secure_password"
database.dbname: "MyBusinessDB"
database.server.name: "sqlserver-prod" # 逻辑名称,会成为消息主题的前缀
# --- 表和 Schema 的包含/排除列表 ---
table.include.list: "dbo.Orders,dbo.OrderItems"
# --- CDC 核心配置 ---
# Debezium 使用的模式历史记录,Pulsar 提供了一个内置实现
schema.history.internal.pulsar.service.url: "pulsar://your-pulsar-broker:6650"
schema.history.internal.pulsar.topic: "persistent://public/default/schema-history-sqlserver"
# --- 消息格式与转换器 ---
# 我们选择 JSON 格式,因为它灵活且易于消费
key.converter: "org.apache.kafka.connect.json.JsonConverter"
key.converter.schemas.enable: "false"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"
# --- 容错与性能 ---
# 如果连接器因故停止,它会从上次记录的 LSN (Log Sequence Number) 继续,不会丢数据
# 最大批处理大小
max.batch.size: "2048"
# 拉取间隔
poll.interval.ms: "1000"
# --- 时区问题,一个常见的坑 ---
# 确保 Debezium 使用的 JVM 时区与数据库时区匹配,避免 DATETIME 类型转换错误
database.connection.timezone: "UTC"
启动连接器:./bin/pulsar-admin sources localrun --source-config-file /path/to/sqlserver-source-config.yaml
一个常见的错误是 database.server.name
配置。它不仅仅是一个名字,Debezium 会用它作为主题的前缀。例如,dbo.Orders
表的变更会发布到 persistent://public/default/sqlserver-prod.dbo.Orders
主题。为了简化消费逻辑,我们通常会使用 Pulsar 的 Topic Pattern 让一个消费者订阅多个相关主题。
步骤三:构建高韧性的消费者服务
这是整个管道的大脑,也是最复杂的代码部分。我们使用 Java 和 Pulsar 的官方客户端来实现。这个服务部署在 EKS 上,可以轻松地水平扩展。
核心职责:
- 订阅 Pulsar 中来自
Orders
和OrderItems
的变更主题。 - 解析 Debezium 格式的 JSON 消息。
- 根据消息类型 (
c
reate,u
pdate,d
elete),在内存中维护一个聚合状态。 - 将关系型数据转换为目标文档模型。
- 以幂等的方式写入 DocumentDB。
- 处理异常,并利用 Pulsar 的机制(如延迟重试和死信队列)确保数据不丢失。
以下是消费者核心逻辑的简化版 Java 代码:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import org.apache.pulsar.client.api.*;
import org.bson.Document;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class SqlServerToDocDbConsumer {
private static final String PULSAR_SERVICE_URL = "pulsar://your-pulsar-broker:6650";
private static final Pattern TOPIC_PATTERN = Pattern.compile("persistent://public/default/sqlserver-prod\\.dbo\\.(Orders|OrderItems)");
private static final String SUBSCRIPTION_NAME = "docdb-aggregator-subscription";
private static final String DEAD_LETTER_TOPIC = "persistent://public/default/sqlserver-cdc-dlq";
private static final String MONGO_CONNECTION_STRING = "mongodb://user:pass@your-docdb-cluster:27017";
private static final String MONGO_DATABASE = "OrdersDB";
private static final String MONGO_COLLECTION = "AggregatedOrders";
private final PulsarClient pulsarClient;
private final MongoClient mongoClient;
private final ObjectMapper objectMapper = new ObjectMapper();
public SqlServerToDocDbConsumer() throws PulsarClientException {
this.pulsarClient = PulsarClient.builder()
.serviceUrl(PULSAR_SERVICE_URL)
.build();
this.mongoClient = MongoClients.create(MONGO_CONNECTION_STRING);
}
public void start() throws PulsarClientException {
// 配置消费者
// 关键:启用重试和死信队列策略
DeadLetterPolicy dlqPolicy = DeadLetterPolicy.builder()
.maxRedeliverCount(10) // 最多重试10次
.deadLetterTopic(DEAD_LETTER_TOPIC)
.build();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topicsPattern(TOPIC_PATTERN)
.subscriptionName(SUBSCRIPTION_NAME)
.subscriptionType(SubscriptionType.Key_Shared) // Key_Shared 保证同一订单的变更由同一个消费者处理
.ackTimeout(60, TimeUnit.SECONDS)
.deadLetterPolicy(dlqPolicy)
.reconsumerForNegativeAcks(new DefaultRedeliverBackoff(1000, 30000, 2)) // 指数退避重试
.subscribe();
System.out.println("Consumer started. Listening for messages...");
// 主消费循环
while (true) {
Message<byte[]> msg = consumer.receive();
String sourceTopic = msg.getTopicName();
try {
processMessage(msg);
consumer.acknowledge(msg);
} catch (Exception e) {
// 这里的异常可能是暂时的(如网络问题)或永久的(如数据格式错误)
// Pulsar的重试和DLQ策略会自动处理
System.err.printf("Failed to process message %s from topic %s. Error: %s. Triggering negative acknowledgement.%n",
msg.getMessageId(), sourceTopic, e.getMessage());
consumer.negativeAcknowledge(msg);
}
}
}
private void processMessage(Message<byte[]> msg) throws IOException {
String jsonPayload = new String(msg.getData());
JsonNode root = objectMapper.readTree(jsonPayload);
JsonNode payload = root.path("payload");
// Debezium 消息体可能为 null,表示删除操作,但关键信息在 key 中
if (payload.isNull()) {
// 处理删除逻辑, 此处简化
System.out.printf("Received delete event for key %s%n", msg.getKey());
return;
}
String op = payload.path("op").asText(); // 'c' for create, 'u' for update, 'r' for read (snapshot)
JsonNode data = payload.path("after");
if (data.isNull()){
return; // 忽略没有 'after' 状态的消息
}
if (msg.getTopicName().contains("Orders")) {
handleOrderChange(op, data);
} else if (msg.getTopicName().contains("OrderItems")) {
handleOrderItemChange(op, data);
}
}
private void handleOrderChange(String op, JsonNode orderData) {
long orderId = orderData.path("OrderID").asLong();
MongoCollection<Document> collection = getMongoCollection();
// 幂等写入:使用 upsert
UpdateOptions options = new UpdateOptions().upsert(true);
collection.updateOne(
Filters.eq("_id", orderId),
Updates.combine(
Updates.set("orderDate", orderData.path("OrderDate").asText()),
Updates.set("customerId", orderData.path("CustomerID").asText()),
Updates.set("status", orderData.path("Status").asText()),
Updates.set("lastUpdatedAt", new java.util.Date())
// ... 其他 Order 字段
),
options
);
System.out.printf("Processed Order %d change.%n", orderId);
}
private void handleOrderItemChange(String op, JsonNode itemData) {
long orderId = itemData.path("OrderID").asLong();
long orderItemId = itemData.path("OrderItemID").asLong();
MongoCollection<Document> collection = getMongoCollection();
Document itemDocument = new Document()
.append("itemId", orderItemId)
.append("productId", itemData.path("ProductID").asText())
.append("quantity", itemData.path("Quantity").asInt())
.append("price", itemData.path("Price").asDouble());
// 使用 $addToSet 或更复杂的逻辑来处理子文档的增删改
// 这里简化为覆盖式更新,真实场景可能需要更精细的操作
if ("c".equals(op) || "u".equals(op)) {
// 先拉出旧的item,再添加新的,或者直接替换。这里使用 $push
// 注意:这需要一个更复杂的查询来避免重复添加。
// 生产环境中,通常会使用 $pull 删除旧项,再 $push 添加新项
collection.updateOne(
Filters.eq("_id", orderId),
Updates.push("items", itemDocument)
);
} else if ("d".equals(op)) { // Debezium delete payload 'after' is null, 'before' has data.
// 此处简化,实际删除逻辑需要读取 'before' 字段
collection.updateOne(
Filters.eq("_id", orderId),
Updates.pull("items", Filters.eq("itemId", orderItemId))
);
}
System.out.printf("Processed OrderItem change for Order %d.%n", orderId);
}
private MongoCollection<Document> getMongoCollection() {
return mongoClient.getDatabase(MONGO_DATABASE).getCollection(MONGO_COLLECTION);
}
public static void main(String[] args) {
try {
new SqlServerToDocDbConsumer().start();
} catch (PulsarClientException e) {
System.err.println("Failed to start Pulsar consumer: " + e.getMessage());
e.printStackTrace();
System.exit(1);
}
}
}
代码中的关键设计决策:
-
SubscriptionType.Key_Shared
: 这是 Pulsar 的一个强大功能。Debezium 会将表的主键作为消息的 Key。Key_Shared
模式确保了拥有相同 Key 的消息(即同一个订单的所有变更)总是被路由到同一个消费者实例。这极大地简化了并发控制,避免了多个消费者同时处理同一个订单的Orders
和OrderItems
变更而导致的竞态条件。 - 幂等写入: 目标数据库的写入操作必须是幂等的。我们使用 DocumentDB 的
updateOne
配合upsert=true
选项。无论消息被重复消费多少次,最终的结果都是一致的。这是保证最终一致性的基石。 - 韧性设计:
DeadLetterPolicy
和DefaultRedeliverBackoff
是生产级消费者必须具备的。当消息处理失败时(例如,DocumentDB 暂时不可用),Pulsar 不会立即丢弃消息,而是会进行指数退避重试。如果超过最大重试次数仍然失败(例如,消息格式永久性错误),消息会被自动发送到指定的死信主题DEAD_LETTER_TOPIC
。这样,我们就可以对这些“毒丸”消息进行离线分析和手动处理,而不会阻塞整个数据管道。
潜在的挑战与未来优化
这个架构虽然健壮,但在长期运行中仍会遇到一些问题。
首先,Schema 演进是一个持续的挑战。当源 SQL Server 表添加、删除或修改列时,Debezium 会捕获这些 DDL 变化,并更新其内部的 Schema History Topic。我们的消费者需要有足够的弹性来处理这些变化,比如忽略新增的不关心字段,或者为删除的字段提供默认值。更复杂的变更可能需要修改消费者代码并重新部署。
其次,数据回填(Backfilling)。当需要将一个全新的、已有海量数据的表加入 CDC 时,Debezium 连接器会先执行一次全量快照(snapshot)。对于几十亿行的大表,这可能会对源数据库造成压力并花费很长时间。一个优化策略是,在非高峰时段启动连接器,或者使用更底层的、绕过 Debezium 的方式(如 bcp
工具导出)完成首次全量同步,然后启动 CDC 连接器从特定 LSN 位置开始增量同步。
最后,监控与可观测性至关重要。我们需要监控 Pulsar 的端到端延迟、消费者积压的消息数(backlog)、重试和死信队列中的消息数量。通过 Prometheus 和 Grafana 监控 Pulsar 暴露的指标,可以设置告警,比如当消费者 lag 超过5分钟时,就需要介入调查。这能帮助我们主动发现问题,而不是等业务方报告数据延迟。