项目的技术需求很直接:将一个核心业务数据库(MySQL)的某些关键表变更,实时推送给一组前端监控面板。最开始的方案是前端轮询一个API,但这很快就因为请求量巨大和延迟不可控而被否决。我们需要一个服务端推送的方案,而核心数据库是生产环境的重中之重,不允许安装任何插件或触发器,唯一可接受的数据源是它的二进制日志(Binlog)。
初步构想是使用现成的CDC工具,如Debezium,将Binlog变更推送到Kafka,再由一个Web服务消费Kafka消息并推送给前端。这个架构成熟可靠,但对于我们这个延迟敏感、资源受限的特定场景来说,显得过于笨重。引入Kafka和JVM全家桶会带来显著的运维成本和资源开销。我们的目标是构建一个轻量级、低延迟、高可控的独立服务。
最终的技术选型决策是:使用C++从零开始实现一个MySQL复制协议客户端,直接连接MySQL实例作为从库,实时拉取并解析Binlog事件。然后,在同一个进程内,构建一个基于epoll
的高性能HTTP服务器,通过Server-Sent Events (SSE)协议将解析后的数据变更直接推送给前端客户端。
这个方案的优势在于:
- 极致性能与低延迟: C++和
epoll
的组合可以最大限度地减少内存占用和上下文切换开销。数据从MySQL到客户端的路径被缩短到极致,没有中间件的延迟。 - 资源可控: 纯C++实现,无JVM等运行时依赖,内存和CPU占用可被精确控制。
- 协议级控制: 直接实现MySQL复制协议,可以精细控制要拉取的Binlog位置、过滤不关心的事件,甚至在未来实现更复杂的逻辑。
- SSE的简洁性: 相比WebSocket,SSE是一个更简单的单向推送协议,完全基于标准HTTP,带有自动重连机制,非常适合这个场景。
当然,挑战也同样巨大:需要手动处理TCP连接、实现MySQL协议的握手和事件解析、以及构建一个稳定可靠的epoll
事件循环。
第一步:实现MySQL复制协议客户端
要从MySQL获取Binlog,我们的服务必须伪装成一个MySQL从库。这涉及到遵循MySQL的复制协议,通过TCP连接发送特定命令。
核心流程是:
- 建立到MySQL服务器的TCP连接。
- 接收服务器的Greeting包,获取服务器信息。
- 发送Authentication包,完成登录握手。
- 发送
COM_REGISTER_SLAVE
命令,在主库上注册自己。 - 发送
COM_BINLOG_DUMP
命令,指定起始的Binlog文件名和位置,请求开始推送事件。 - 进入循环,持续接收和解析Binlog事件包。
下面是负责与MySQL建立连接并开始Binlog dump的核心C++代码。这里的错误处理和日志记录被简化,但在生产代码中至关重要。
// mysql_replicator.hpp
#pragma once
#include <string>
#include <functional>
#include <vector>
// 简化表示一个Binlog事件
struct BinlogEvent {
uint32_t timestamp;
uint8_t event_type;
std::vector<char> raw_data; // 未解析的原始事件数据
};
class MySQLReplicator {
public:
using EventHandler = std::function<void(const BinlogEvent&)>;
MySQLReplicator(const std::string& host, int port, const std::string& user, const std::string& password, uint32_t server_id);
~MySQLReplicator();
// 连接并开始复制
bool connectAndStart(const std::string& binlog_file, uint64_t binlog_pos, EventHandler handler);
void stop();
int getFd() const { return sock_fd_; }
private:
bool performHandshake();
bool registerAsSlave();
bool requestBinlogDump(const std::string& binlog_file, uint64_t binlog_pos);
void replicationLoop();
std::string host_;
int port_;
std::string user_;
std::string password_;
uint32_t server_id_;
int sock_fd_ = -1;
EventHandler event_handler_;
// 省略了用于解析MySQL协议包的辅助函数和成员
};
// mysql_replicator.cpp (部分实现)
#include "mysql_replicator.hpp"
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <vector>
// ... 构造函数和析构函数 ...
// 这是一个高度简化的实现,仅用于展示流程
// 生产代码需要一个完整的MySQL协议解析器
bool MySQLReplicator::connectAndStart(const std::string& binlog_file, uint64_t binlog_pos, EventHandler handler) {
event_handler_ = handler;
sock_fd_ = socket(AF_INET, SOCK_STREAM, 0);
if (sock_fd_ < 0) {
perror("socket creation failed");
return false;
}
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port_);
inet_pton(AF_INET, host_.c_str(), &server_addr.sin_addr);
if (connect(sock_fd_, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
perror("connect failed");
close(sock_fd_);
sock_fd_ = -1;
return false;
}
if (!performHandshake()) {
std::cerr << "MySQL handshake failed" << std::endl;
close(sock_fd_);
sock_fd_ = -1;
return false;
}
// 在真实项目中,这里应该先发送COM_REGISTER_SLAVE
// if (!registerAsSlave()) { ... }
if (!requestBinlogDump(binlog_file, binlog_pos)) {
std::cerr << "Requesting binlog dump failed" << std::endl;
close(sock_fd_);
sock_fd_ = -1;
return false;
}
std::cout << "Successfully connected to MySQL and started replication." << std::endl;
return true;
}
bool MySQLReplicator::requestBinlogDump(const std::string& binlog_file, uint64_t binlog_pos) {
// 构建 COM_BINLOG_DUMP 包
// 格式: [packet_length:3][packet_number:1][command:1][binlog_pos:4][flags:2][server_id:4][binlog_filename]
std::vector<char> packet;
packet.resize(4); // 先预留包头位置
packet.push_back(0x12); // COM_BINLOG_DUMP command
// binlog_pos (4 bytes)
packet.push_back(binlog_pos & 0xFF);
packet.push_back((binlog_pos >> 8) & 0xFF);
packet.push_back((binlog_pos >> 16) & 0xFF);
packet.push_back((binlog_pos >> 24) & 0xFF);
// flags (2 bytes) - 0x00 for no specific flags
packet.push_back(0x00);
packet.push_back(0x00);
// server_id (4 bytes)
packet.push_back(server_id_ & 0xFF);
packet.push_back((server_id_ >> 8) & 0xFF);
packet.push_back((server_id_ >> 16) & 0xFF);
packet.push_back((server_id_ >> 24) & 0xFF);
// binlog_filename
packet.insert(packet.end(), binlog_file.begin(), binlog_file.end());
// 填充包头
uint32_t packet_len = packet.size() - 4;
packet[0] = packet_len & 0xFF;
packet[1] = (packet_len >> 8) & 0xFF;
packet[2] = (packet_len >> 16) & 0xFF;
packet[3] = 0x00; // packet_number, 初始设为0
// 发送包
if (write(sock_fd_, packet.data(), packet.size()) != packet.size()) {
perror("write COM_BINLOG_DUMP failed");
return false;
}
return true;
}
// 在实际项目中,replicationLoop会由epoll驱动,而不是一个独立的阻塞循环
// void MySQLReplicator::replicationLoop() { ... }
这里的坑在于,MySQL的协议是二进制的,且有多个版本。一个字节的错位都可能导致连接失败或数据解析错误。实现一个健壮的解析器是整个项目中非常耗时的一部分。在真实项目中,我们会为每个事件类型(如QUERY_EVENT
, TABLE_MAP_EVENT
, UPDATE_ROWS_EVENT_V2
等)创建一个解析结构体,并严格按照官方文档进行字节对齐和解析。
第二步:构建基于epoll的SSE服务器
有了数据源,下一步是构建一个能处理大量并发长连接的HTTP服务器。epoll
是Linux下处理此场景的利器。
我们的服务器需要:
- 创建一个监听socket,绑定端口。
- 创建一个
epoll
实例。 - 将监听socket添加到
epoll
中,监听EPOLLIN
事件。 - 进入主事件循环 (
epoll_wait
)。 - 如果监听socket有事件,说明有新连接,调用
accept
,将新的客户端socket也添加到epoll
中。 - 如果客户端socket有
EPOLLIN
事件,说明客户端发送了数据(例如HTTP GET请求)。我们需要解析请求,如果是针对SSE端点的,就发送SSE头并保持连接。 - 如果客户端socket有
EPOLLHUP
或EPOLLERR
事件,或者read
返回0,说明连接已断开,需要将其从epoll
和我们的客户端列表中移除,并关闭socket。
// sse_server.hpp
#pragma once
#include <string>
#include <unordered_map>
#include <functional>
class SSEServer {
public:
SSEServer(int port);
~SSEServer();
// 启动服务器,并提供一个处理新连接的回调
bool start(std::function<void(int)> on_connect);
// 主事件循环,需要外部驱动
// 返回处理的事件数,-1表示错误
int pollEvents(int timeout_ms);
void broadcast(const std::string& message);
void disconnectClient(int client_fd);
int getEpollFd() const { return epoll_fd_; }
private:
bool createAndBind();
void handleNewConnection();
int port_;
int listen_fd_ = -1;
int epoll_fd_ = -1;
// 存储所有已连接的SSE客户端
// 在真实项目中,这里的值可能是一个包含更多客户端状态的结构体
std::unordered_map<int, bool> sse_clients_;
std::function<void(int)> on_connect_handler_;
};
// sse_server.cpp (部分实现)
#include "sse_server.hpp"
#include <iostream>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
// ... 构造/析构 ...
bool SSEServer::start(std::function<void(int)> on_connect) {
on_connect_handler_ = on_connect;
if (!createAndBind()) return false;
epoll_fd_ = epoll_create1(0);
if (epoll_fd_ == -1) {
perror("epoll_create1 failed");
return false;
}
epoll_event event;
event.data.fd = listen_fd_;
event.events = EPOLLIN | EPOLLET; // 使用边缘触发
if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, listen_fd_, &event) == -1) {
perror("epoll_ctl listen_fd failed");
return false;
}
return true;
}
int SSEServer::pollEvents(int timeout_ms) {
const int MAX_EVENTS = 64;
epoll_event events[MAX_EVENTS];
int n = epoll_wait(epoll_fd_, events, MAX_EVENTS, timeout_ms);
for (int i = 0; i < n; ++i) {
if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || !(events[i].events & EPOLLIN)) {
// 发生错误,直接关闭客户端
std::cerr << "epoll error on fd " << events[i].data.fd << std::endl;
disconnectClient(events[i].data.fd);
continue;
}
if (events[i].data.fd == listen_fd_) {
// 监听到新连接
handleNewConnection();
} else {
// 客户端socket可读,在我们的场景中主要是处理初次HTTP请求
// 这个回调可以用来解析HTTP请求
if (on_connect_handler_) {
on_connect_handler_(events[i].data.fd);
}
}
}
return n;
}
void SSEServer::handleNewConnection() {
while (true) {
sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_fd = accept(listen_fd_, (struct sockaddr*)&client_addr, &client_len);
if (client_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 所有待处理的连接都已处理
break;
} else {
perror("accept failed");
break;
}
}
// 设置为非阻塞
int flags = fcntl(client_fd, F_GETFL, 0);
fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
epoll_event event;
event.data.fd = client_fd;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &event) == -1) {
perror("epoll_ctl client_fd failed");
close(client_fd);
} else {
// 暂时不加入sse_clients_,等HTTP请求确认是SSE连接后再加入
std::cout << "Accepted new connection with fd " << client_fd << std::endl;
}
}
}
// 广播SSE消息给所有已确认的客户端
void SSEServer::broadcast(const std::string& message) {
std::string sse_payload = "data: " + message + "\n\n";
std::vector<int> disconnected_clients;
for (const auto& pair : sse_clients_) {
int fd = pair.first;
if (write(fd, sse_payload.c_str(), sse_payload.length()) < 0) {
if (errno == EPIPE || errno == ECONNRESET) {
// 客户端已断开
disconnected_clients.push_back(fd);
}
// 对于EAGAIN/EWOULDBLOCK,真实项目需要实现写缓冲
}
}
for (int fd : disconnected_clients) {
disconnectClient(fd);
}
}
void SSEServer::disconnectClient(int client_fd) {
epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, nullptr);
close(client_fd);
sse_clients_.erase(client_fd);
std::cout << "Client with fd " << client_fd << " disconnected." << std::endl;
}
第三步:整合Binlog客户端与SSE服务器
这是整个系统的核心调度逻辑。我们需要一个统一的事件循环,同时监听来自MySQL的Binlog事件和来自客户端的连接/断开事件。epoll
天然适合这个场景,我们可以把MySQL的socket句柄和SSE服务器的监听句柄都加入到同一个epoll
实例中。
graph TD subgraph C++ Service A[epoll_wait] -- event --> B{Event Dispatcher}; B -- MySQL Socket readable --> C[MySQLReplicator: Read & Parse Binlog]; C -- Parsed Event --> D[Broadcast to SSE Clients]; B -- Listen Socket readable --> E[SSEServer: Accept New Client]; B -- Client Socket readable --> F[Handle HTTP Request -> Upgrade to SSE]; B -- Client Socket error/hup --> G[SSEServer: Disconnect Client]; end subgraph External MySQL -- Binlog Stream --> C; WebClient -- HTTP GET /events --> E; D -- SSE Stream --> WebClient; end
主程序的逻辑如下:
- 初始化
SSEServer
和MySQLReplicator
。 - 启动
SSEServer
,获取其epoll_fd
。 -
MySQLReplicator
连接到MySQL,获取其sock_fd
。 - 将
MySQLReplicator
的sock_fd
也添加到SSEServer
管理的epoll
实例中。 - 进入主循环,不断调用
SSEServer::pollEvents
。 - 在
pollEvents
内部的事件分发逻辑中,判断事件来源fd:- 如果是
listen_fd
,则接受新连接。 - 如果是
mysql_fd
,则读取并解析Binlog数据包。解析出的事件被格式化(如转为JSON)后,调用SSEServer::broadcast
广播出去。 - 如果是某个
client_fd
,则处理其初次HTTP请求,验证通过后将其加入sse_clients_
列表并发送SSE响应头。
- 如果是
// main.cpp
#include "sse_server.hpp"
#include "mysql_replicator.hpp"
#include <iostream>
#include <vector>
#include <signal.h>
#include <sys/epoll.h>
volatile sig_atomic_t g_running = 1;
void sigint_handler(int sig) {
g_running = 0;
}
// 简化的Binlog事件解析和JSON转换
std::string parse_and_format_event(const BinlogEvent& event) {
// 生产环境中,这里会是一个复杂的解析器,
// 根据event_type调用不同的解析函数。
// 这里我们只做一个简单的示例。
if (event.event_type == 31) { // UPDATE_ROWS_EVENT_V2
// 伪代码:
// auto table_id = parse_u64(event.raw_data, offset);
// auto table_info = table_map.get(table_id);
// auto before_image = parse_row(event.raw_data, table_info.columns);
// auto after_image = parse_row(event.raw_data, table_info.columns);
// return format_as_json(table_info.name, before_image, after_image);
return R"({"event_type": "update", "table": "users", "id": 123, "new_value": {"name": "test"}})";
}
return "";
}
int main() {
signal(SIGINT, sigint_handler);
// ---- 配置 ----
const int SSE_PORT = 8080;
const std::string MYSQL_HOST = "127.0.0.1";
const int MYSQL_PORT = 3306;
const std::string MYSQL_USER = "replicator";
const std::string MYSQL_PASS = "your_password";
const uint32_t REPL_SERVER_ID = 101;
const std::string START_BINLOG_FILE = "mysql-bin.000003";
const uint64_t START_BINLOG_POS = 154;
// --------------
SSEServer sse_server(SSE_PORT);
MySQLReplicator replicator(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASS, REPL_SERVER_ID);
// 启动SSE服务器,并定义新连接的处理逻辑
sse_server.start([&](int client_fd){
// 在真实项目中,这里会读取并解析完整的HTTP请求
// 然后发送HTTP响应头,确认SSE连接
char buffer[1024];
read(client_fd, buffer, sizeof(buffer)); // 仅为消耗掉请求数据
const char* sse_header =
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/event-stream\r\n"
"Cache-Control: no-cache\r\n"
"Connection: keep-alive\r\n\r\n";
write(client_fd, sse_header, strlen(sse_header));
// 此时才将客户端正式加入广播列表
// sse_server.sse_clients_[client_fd] = true; (应由SSEServer内部管理)
});
// 连接MySQL并开始复制
replicator.connectAndStart(START_BINLOG_FILE, START_BINLOG_POS, [&](const BinlogEvent& event){
std::string json_event = parse_and_format_event(event);
if (!json_event.empty()) {
sse_server.broadcast(json_event);
}
});
int mysql_fd = replicator.getFd();
if (mysql_fd > 0) {
epoll_event event;
event.data.fd = mysql_fd;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(sse_server.getEpollFd(), EPOLL_CTL_ADD, mysql_fd, &event) == -1) {
perror("epoll_ctl mysql_fd failed");
return 1;
}
}
std::cout << "Server started. Press Ctrl+C to exit." << std::endl;
while (g_running) {
// 主事件循环由SSEServer的pollEvents驱动
// 内部会处理所有epoll事件
// 但我们需要自己实现mysql fd的事件处理
// (此处的示例代码结构需要重构,让事件循环更通用)
// 一个更优雅的设计是让main拥有epoll循环,server和replicator都只提供fd
sse_server.pollEvents(100); // 100ms timeout
}
std::cout << "Shutting down." << std::endl;
return 0;
}
上面main
函数的结构展示了思路,但在真实项目中,epoll
的事件分发逻辑需要更细致。当epoll_wait
返回时,你需要检查是哪个fd
触发了事件,然后调用相应的处理器。例如,一个std::unordered_map<int, std::function<void()>>
来映射fd
到处理函数会是更清晰的实现。
API设计与韧性考量
一个生产级的SSE API不仅仅是推送数据,还必须考虑客户端重连后如何恢复状态。HTTP的Last-Event-ID
头是为这个场景设计的。
我们的API可以这样设计:
- 客户端首次连接:
GET /events
- 服务器在每次推送事件时,可以附带一个
id
字段,例如:id: mysql-bin.000003:4590 data: {"event_type": "update", "table": "users", ...}
- 当客户端断线重连时,它会在HTTP请求头中携带
Last-Event-ID: mysql-bin.000003:4590
。 - 我们的C++服务器在收到带有
Last-Event-ID
头的请求时,需要解析这个ID,从中提取出Binlog文件名和位置,然后从这个指定的位置开始COM_BINLOG_DUMP
,而不是从配置的初始位置开始。
这使得我们的服务具备了断点续传的能力,大大提高了系统的健壮性。
局限性与未来迭代路径
这套从零构建的方案虽然性能卓越,但也存在明显的局限性:
单点故障与扩展性: 当前实现是单进程的。如果进程崩溃,所有客户端都会断开,且可能会丢失一小部分Binlog事件(取决于持久化Binlog位置的频率)。它也无法水平扩展以处理远超单机承受能力的客户端连接。要实现高可用,需要引入集群、状态同步(如使用Redis记录每个客户端的Binlog位点)和负载均衡。
Binlog解析的复杂性: 我们只展示了处理单一事件类型的思路。一个完整的Binlog解析器需要处理数十种事件类型、表结构变更(DDL)、字符集转换以及各种MySQL版本的兼容性问题。这部分工作量巨大,是此方案最大的技术壁垒。
反压与缓冲: 如果前端消费速度跟不上后端Binlog产生的速度,服务器的内存缓冲会持续增长,最终导致OOM。生产系统必须实现反压机制,例如当某个客户端的写缓冲区满时,暂时减慢或停止向其发送数据,甚至主动断开慢消费者。
运维复杂度: 相比现成的解决方案,自研服务的监控、日志、告警都需要从头建设。比如,监控Binlog同步的延迟(主从延迟)就是一个至关重要的指标。
尽管存在这些挑战,但通过这个实现,我们获得了一个完全可控、性能优异的底层组件。对于需要极致延迟和资源效率的特定场景,这种深度定制的投入是值得的。未来的优化方向可能包括:将其重构为多线程模型(一个I/O线程,多个工作线程),实现更完整的Binlog解析库,以及集成Prometheus指标进行深度监控。