基于C++与epoll实现MySQL Binlog实时同步至Server-Sent Events API的服务


项目的技术需求很直接:将一个核心业务数据库(MySQL)的某些关键表变更,实时推送给一组前端监控面板。最开始的方案是前端轮询一个API,但这很快就因为请求量巨大和延迟不可控而被否决。我们需要一个服务端推送的方案,而核心数据库是生产环境的重中之重,不允许安装任何插件或触发器,唯一可接受的数据源是它的二进制日志(Binlog)。

初步构想是使用现成的CDC工具,如Debezium,将Binlog变更推送到Kafka,再由一个Web服务消费Kafka消息并推送给前端。这个架构成熟可靠,但对于我们这个延迟敏感、资源受限的特定场景来说,显得过于笨重。引入Kafka和JVM全家桶会带来显著的运维成本和资源开销。我们的目标是构建一个轻量级、低延迟、高可控的独立服务。

最终的技术选型决策是:使用C++从零开始实现一个MySQL复制协议客户端,直接连接MySQL实例作为从库,实时拉取并解析Binlog事件。然后,在同一个进程内,构建一个基于epoll的高性能HTTP服务器,通过Server-Sent Events (SSE)协议将解析后的数据变更直接推送给前端客户端。

这个方案的优势在于:

  1. 极致性能与低延迟: C++和epoll的组合可以最大限度地减少内存占用和上下文切换开销。数据从MySQL到客户端的路径被缩短到极致,没有中间件的延迟。
  2. 资源可控: 纯C++实现,无JVM等运行时依赖,内存和CPU占用可被精确控制。
  3. 协议级控制: 直接实现MySQL复制协议,可以精细控制要拉取的Binlog位置、过滤不关心的事件,甚至在未来实现更复杂的逻辑。
  4. SSE的简洁性: 相比WebSocket,SSE是一个更简单的单向推送协议,完全基于标准HTTP,带有自动重连机制,非常适合这个场景。

当然,挑战也同样巨大:需要手动处理TCP连接、实现MySQL协议的握手和事件解析、以及构建一个稳定可靠的epoll事件循环。

第一步:实现MySQL复制协议客户端

要从MySQL获取Binlog,我们的服务必须伪装成一个MySQL从库。这涉及到遵循MySQL的复制协议,通过TCP连接发送特定命令。

核心流程是:

  1. 建立到MySQL服务器的TCP连接。
  2. 接收服务器的Greeting包,获取服务器信息。
  3. 发送Authentication包,完成登录握手。
  4. 发送COM_REGISTER_SLAVE命令,在主库上注册自己。
  5. 发送COM_BINLOG_DUMP命令,指定起始的Binlog文件名和位置,请求开始推送事件。
  6. 进入循环,持续接收和解析Binlog事件包。

下面是负责与MySQL建立连接并开始Binlog dump的核心C++代码。这里的错误处理和日志记录被简化,但在生产代码中至关重要。

// mysql_replicator.hpp
#pragma once

#include <string>
#include <functional>
#include <vector>

// 简化表示一个Binlog事件
struct BinlogEvent {
    uint32_t timestamp;
    uint8_t event_type;
    std::vector<char> raw_data; // 未解析的原始事件数据
};

class MySQLReplicator {
public:
    using EventHandler = std::function<void(const BinlogEvent&)>;

    MySQLReplicator(const std::string& host, int port, const std::string& user, const std::string& password, uint32_t server_id);
    ~MySQLReplicator();

    // 连接并开始复制
    bool connectAndStart(const std::string& binlog_file, uint64_t binlog_pos, EventHandler handler);
    void stop();
    int getFd() const { return sock_fd_; }

private:
    bool performHandshake();
    bool registerAsSlave();
    bool requestBinlogDump(const std::string& binlog_file, uint64_t binlog_pos);
    void replicationLoop();

    std::string host_;
    int port_;
    std::string user_;
    std::string password_;
    uint32_t server_id_;
    int sock_fd_ = -1;
    EventHandler event_handler_;
    
    // 省略了用于解析MySQL协议包的辅助函数和成员
};
// mysql_replicator.cpp (部分实现)
#include "mysql_replicator.hpp"
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <vector>

// ... 构造函数和析构函数 ...

// 这是一个高度简化的实现,仅用于展示流程
// 生产代码需要一个完整的MySQL协议解析器
bool MySQLReplicator::connectAndStart(const std::string& binlog_file, uint64_t binlog_pos, EventHandler handler) {
    event_handler_ = handler;
    sock_fd_ = socket(AF_INET, SOCK_STREAM, 0);
    if (sock_fd_ < 0) {
        perror("socket creation failed");
        return false;
    }

    sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port_);
    inet_pton(AF_INET, host_.c_str(), &server_addr.sin_addr);

    if (connect(sock_fd_, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        perror("connect failed");
        close(sock_fd_);
        sock_fd_ = -1;
        return false;
    }

    if (!performHandshake()) {
        std::cerr << "MySQL handshake failed" << std::endl;
        close(sock_fd_);
        sock_fd_ = -1;
        return false;
    }

    // 在真实项目中,这里应该先发送COM_REGISTER_SLAVE
    // if (!registerAsSlave()) { ... }

    if (!requestBinlogDump(binlog_file, binlog_pos)) {
        std::cerr << "Requesting binlog dump failed" << std::endl;
        close(sock_fd_);
        sock_fd_ = -1;
        return false;
    }
    
    std::cout << "Successfully connected to MySQL and started replication." << std::endl;
    return true;
}

bool MySQLReplicator::requestBinlogDump(const std::string& binlog_file, uint64_t binlog_pos) {
    // 构建 COM_BINLOG_DUMP 包
    // 格式: [packet_length:3][packet_number:1][command:1][binlog_pos:4][flags:2][server_id:4][binlog_filename]
    std::vector<char> packet;
    packet.resize(4); // 先预留包头位置

    packet.push_back(0x12); // COM_BINLOG_DUMP command
    
    // binlog_pos (4 bytes)
    packet.push_back(binlog_pos & 0xFF);
    packet.push_back((binlog_pos >> 8) & 0xFF);
    packet.push_back((binlog_pos >> 16) & 0xFF);
    packet.push_back((binlog_pos >> 24) & 0xFF);

    // flags (2 bytes) - 0x00 for no specific flags
    packet.push_back(0x00);
    packet.push_back(0x00);

    // server_id (4 bytes)
    packet.push_back(server_id_ & 0xFF);
    packet.push_back((server_id_ >> 8) & 0xFF);
    packet.push_back((server_id_ >> 16) & 0xFF);
    packet.push_back((server_id_ >> 24) & 0xFF);

    // binlog_filename
    packet.insert(packet.end(), binlog_file.begin(), binlog_file.end());

    // 填充包头
    uint32_t packet_len = packet.size() - 4;
    packet[0] = packet_len & 0xFF;
    packet[1] = (packet_len >> 8) & 0xFF;
    packet[2] = (packet_len >> 16) & 0xFF;
    packet[3] = 0x00; // packet_number, 初始设为0

    // 发送包
    if (write(sock_fd_, packet.data(), packet.size()) != packet.size()) {
        perror("write COM_BINLOG_DUMP failed");
        return false;
    }

    return true;
}

// 在实际项目中,replicationLoop会由epoll驱动,而不是一个独立的阻塞循环
// void MySQLReplicator::replicationLoop() { ... }

这里的坑在于,MySQL的协议是二进制的,且有多个版本。一个字节的错位都可能导致连接失败或数据解析错误。实现一个健壮的解析器是整个项目中非常耗时的一部分。在真实项目中,我们会为每个事件类型(如QUERY_EVENT, TABLE_MAP_EVENT, UPDATE_ROWS_EVENT_V2等)创建一个解析结构体,并严格按照官方文档进行字节对齐和解析。

第二步:构建基于epoll的SSE服务器

有了数据源,下一步是构建一个能处理大量并发长连接的HTTP服务器。epoll是Linux下处理此场景的利器。

我们的服务器需要:

  1. 创建一个监听socket,绑定端口。
  2. 创建一个epoll实例。
  3. 将监听socket添加到epoll中,监听EPOLLIN事件。
  4. 进入主事件循环 (epoll_wait)。
  5. 如果监听socket有事件,说明有新连接,调用accept,将新的客户端socket也添加到epoll中。
  6. 如果客户端socket有EPOLLIN事件,说明客户端发送了数据(例如HTTP GET请求)。我们需要解析请求,如果是针对SSE端点的,就发送SSE头并保持连接。
  7. 如果客户端socket有EPOLLHUPEPOLLERR事件,或者read返回0,说明连接已断开,需要将其从epoll和我们的客户端列表中移除,并关闭socket。
// sse_server.hpp
#pragma once

#include <string>
#include <unordered_map>
#include <functional>

class SSEServer {
public:
    SSEServer(int port);
    ~SSEServer();

    // 启动服务器,并提供一个处理新连接的回调
    bool start(std::function<void(int)> on_connect);
    
    // 主事件循环,需要外部驱动
    // 返回处理的事件数,-1表示错误
    int pollEvents(int timeout_ms);

    void broadcast(const std::string& message);
    void disconnectClient(int client_fd);

    int getEpollFd() const { return epoll_fd_; }

private:
    bool createAndBind();
    void handleNewConnection();

    int port_;
    int listen_fd_ = -1;
    int epoll_fd_ = -1;

    // 存储所有已连接的SSE客户端
    // 在真实项目中,这里的值可能是一个包含更多客户端状态的结构体
    std::unordered_map<int, bool> sse_clients_; 
    std::function<void(int)> on_connect_handler_;
};
// sse_server.cpp (部分实现)
#include "sse_server.hpp"
#include <iostream>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>

// ... 构造/析构 ...

bool SSEServer::start(std::function<void(int)> on_connect) {
    on_connect_handler_ = on_connect;
    if (!createAndBind()) return false;

    epoll_fd_ = epoll_create1(0);
    if (epoll_fd_ == -1) {
        perror("epoll_create1 failed");
        return false;
    }

    epoll_event event;
    event.data.fd = listen_fd_;
    event.events = EPOLLIN | EPOLLET; // 使用边缘触发
    if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, listen_fd_, &event) == -1) {
        perror("epoll_ctl listen_fd failed");
        return false;
    }

    return true;
}

int SSEServer::pollEvents(int timeout_ms) {
    const int MAX_EVENTS = 64;
    epoll_event events[MAX_EVENTS];
    int n = epoll_wait(epoll_fd_, events, MAX_EVENTS, timeout_ms);

    for (int i = 0; i < n; ++i) {
        if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || !(events[i].events & EPOLLIN)) {
            // 发生错误,直接关闭客户端
            std::cerr << "epoll error on fd " << events[i].data.fd << std::endl;
            disconnectClient(events[i].data.fd);
            continue;
        }

        if (events[i].data.fd == listen_fd_) {
            // 监听到新连接
            handleNewConnection();
        } else {
            // 客户端socket可读,在我们的场景中主要是处理初次HTTP请求
            // 这个回调可以用来解析HTTP请求
            if (on_connect_handler_) {
                on_connect_handler_(events[i].data.fd);
            }
        }
    }
    return n;
}

void SSEServer::handleNewConnection() {
    while (true) {
        sockaddr_in client_addr;
        socklen_t client_len = sizeof(client_addr);
        int client_fd = accept(listen_fd_, (struct sockaddr*)&client_addr, &client_len);
        
        if (client_fd == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 所有待处理的连接都已处理
                break;
            } else {
                perror("accept failed");
                break;
            }
        }

        // 设置为非阻塞
        int flags = fcntl(client_fd, F_GETFL, 0);
        fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);

        epoll_event event;
        event.data.fd = client_fd;
        event.events = EPOLLIN | EPOLLET;
        if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &event) == -1) {
            perror("epoll_ctl client_fd failed");
            close(client_fd);
        } else {
            // 暂时不加入sse_clients_,等HTTP请求确认是SSE连接后再加入
            std::cout << "Accepted new connection with fd " << client_fd << std::endl;
        }
    }
}

// 广播SSE消息给所有已确认的客户端
void SSEServer::broadcast(const std::string& message) {
    std::string sse_payload = "data: " + message + "\n\n";
    std::vector<int> disconnected_clients;

    for (const auto& pair : sse_clients_) {
        int fd = pair.first;
        if (write(fd, sse_payload.c_str(), sse_payload.length()) < 0) {
            if (errno == EPIPE || errno == ECONNRESET) {
                 // 客户端已断开
                disconnected_clients.push_back(fd);
            }
            // 对于EAGAIN/EWOULDBLOCK,真实项目需要实现写缓冲
        }
    }
    
    for (int fd : disconnected_clients) {
        disconnectClient(fd);
    }
}

void SSEServer::disconnectClient(int client_fd) {
    epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, nullptr);
    close(client_fd);
    sse_clients_.erase(client_fd);
    std::cout << "Client with fd " << client_fd << " disconnected." << std::endl;
}

第三步:整合Binlog客户端与SSE服务器

这是整个系统的核心调度逻辑。我们需要一个统一的事件循环,同时监听来自MySQL的Binlog事件和来自客户端的连接/断开事件。epoll天然适合这个场景,我们可以把MySQL的socket句柄和SSE服务器的监听句柄都加入到同一个epoll实例中。

graph TD
    subgraph C++ Service
        A[epoll_wait] -- event --> B{Event Dispatcher};
        B -- MySQL Socket readable --> C[MySQLReplicator: Read & Parse Binlog];
        C -- Parsed Event --> D[Broadcast to SSE Clients];
        B -- Listen Socket readable --> E[SSEServer: Accept New Client];
        B -- Client Socket readable --> F[Handle HTTP Request -> Upgrade to SSE];
        B -- Client Socket error/hup --> G[SSEServer: Disconnect Client];
    end

    subgraph External
        MySQL -- Binlog Stream --> C;
        WebClient -- HTTP GET /events --> E;
        D -- SSE Stream --> WebClient;
    end

主程序的逻辑如下:

  1. 初始化SSEServerMySQLReplicator
  2. 启动SSEServer,获取其epoll_fd
  3. MySQLReplicator连接到MySQL,获取其sock_fd
  4. MySQLReplicatorsock_fd也添加到SSEServer管理的epoll实例中。
  5. 进入主循环,不断调用SSEServer::pollEvents
  6. pollEvents内部的事件分发逻辑中,判断事件来源fd:
    • 如果是listen_fd,则接受新连接。
    • 如果是mysql_fd,则读取并解析Binlog数据包。解析出的事件被格式化(如转为JSON)后,调用SSEServer::broadcast广播出去。
    • 如果是某个client_fd,则处理其初次HTTP请求,验证通过后将其加入sse_clients_列表并发送SSE响应头。
// main.cpp
#include "sse_server.hpp"
#include "mysql_replicator.hpp"
#include <iostream>
#include <vector>
#include <signal.h>
#include <sys/epoll.h>

volatile sig_atomic_t g_running = 1;

void sigint_handler(int sig) {
    g_running = 0;
}

// 简化的Binlog事件解析和JSON转换
std::string parse_and_format_event(const BinlogEvent& event) {
    // 生产环境中,这里会是一个复杂的解析器,
    // 根据event_type调用不同的解析函数。
    // 这里我们只做一个简单的示例。
    if (event.event_type == 31) { // UPDATE_ROWS_EVENT_V2
        // 伪代码:
        // auto table_id = parse_u64(event.raw_data, offset);
        // auto table_info = table_map.get(table_id);
        // auto before_image = parse_row(event.raw_data, table_info.columns);
        // auto after_image = parse_row(event.raw_data, table_info.columns);
        // return format_as_json(table_info.name, before_image, after_image);
        return R"({"event_type": "update", "table": "users", "id": 123, "new_value": {"name": "test"}})";
    }
    return "";
}


int main() {
    signal(SIGINT, sigint_handler);

    // ---- 配置 ----
    const int SSE_PORT = 8080;
    const std::string MYSQL_HOST = "127.0.0.1";
    const int MYSQL_PORT = 3306;
    const std::string MYSQL_USER = "replicator";
    const std::string MYSQL_PASS = "your_password";
    const uint32_t REPL_SERVER_ID = 101;
    const std::string START_BINLOG_FILE = "mysql-bin.000003";
    const uint64_t START_BINLOG_POS = 154;
    // --------------

    SSEServer sse_server(SSE_PORT);
    MySQLReplicator replicator(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASS, REPL_SERVER_ID);
    
    // 启动SSE服务器,并定义新连接的处理逻辑
    sse_server.start([&](int client_fd){
        // 在真实项目中,这里会读取并解析完整的HTTP请求
        // 然后发送HTTP响应头,确认SSE连接
        char buffer[1024];
        read(client_fd, buffer, sizeof(buffer)); // 仅为消耗掉请求数据

        const char* sse_header = 
            "HTTP/1.1 200 OK\r\n"
            "Content-Type: text/event-stream\r\n"
            "Cache-Control: no-cache\r\n"
            "Connection: keep-alive\r\n\r\n";
        
        write(client_fd, sse_header, strlen(sse_header));

        // 此时才将客户端正式加入广播列表
        // sse_server.sse_clients_[client_fd] = true; (应由SSEServer内部管理)
    });

    // 连接MySQL并开始复制
    replicator.connectAndStart(START_BINLOG_FILE, START_BINLOG_POS, [&](const BinlogEvent& event){
        std::string json_event = parse_and_format_event(event);
        if (!json_event.empty()) {
            sse_server.broadcast(json_event);
        }
    });

    int mysql_fd = replicator.getFd();
    if (mysql_fd > 0) {
        epoll_event event;
        event.data.fd = mysql_fd;
        event.events = EPOLLIN | EPOLLET;
        if (epoll_ctl(sse_server.getEpollFd(), EPOLL_CTL_ADD, mysql_fd, &event) == -1) {
            perror("epoll_ctl mysql_fd failed");
            return 1;
        }
    }

    std::cout << "Server started. Press Ctrl+C to exit." << std::endl;

    while (g_running) {
        // 主事件循环由SSEServer的pollEvents驱动
        // 内部会处理所有epoll事件
        // 但我们需要自己实现mysql fd的事件处理
        // (此处的示例代码结构需要重构,让事件循环更通用)
        // 一个更优雅的设计是让main拥有epoll循环,server和replicator都只提供fd
        sse_server.pollEvents(100); // 100ms timeout
    }
    
    std::cout << "Shutting down." << std::endl;

    return 0;
}

上面main函数的结构展示了思路,但在真实项目中,epoll的事件分发逻辑需要更细致。当epoll_wait返回时,你需要检查是哪个fd触发了事件,然后调用相应的处理器。例如,一个std::unordered_map<int, std::function<void()>>来映射fd到处理函数会是更清晰的实现。

API设计与韧性考量

一个生产级的SSE API不仅仅是推送数据,还必须考虑客户端重连后如何恢复状态。HTTP的Last-Event-ID头是为这个场景设计的。

我们的API可以这样设计:

  • 客户端首次连接:GET /events
  • 服务器在每次推送事件时,可以附带一个id字段,例如:
    id: mysql-bin.000003:4590
    data: {"event_type": "update", "table": "users", ...}
    
  • 当客户端断线重连时,它会在HTTP请求头中携带Last-Event-ID: mysql-bin.000003:4590
  • 我们的C++服务器在收到带有Last-Event-ID头的请求时,需要解析这个ID,从中提取出Binlog文件名和位置,然后从这个指定的位置开始COM_BINLOG_DUMP,而不是从配置的初始位置开始。

这使得我们的服务具备了断点续传的能力,大大提高了系统的健壮性。

局限性与未来迭代路径

这套从零构建的方案虽然性能卓越,但也存在明显的局限性:

  1. 单点故障与扩展性: 当前实现是单进程的。如果进程崩溃,所有客户端都会断开,且可能会丢失一小部分Binlog事件(取决于持久化Binlog位置的频率)。它也无法水平扩展以处理远超单机承受能力的客户端连接。要实现高可用,需要引入集群、状态同步(如使用Redis记录每个客户端的Binlog位点)和负载均衡。

  2. Binlog解析的复杂性: 我们只展示了处理单一事件类型的思路。一个完整的Binlog解析器需要处理数十种事件类型、表结构变更(DDL)、字符集转换以及各种MySQL版本的兼容性问题。这部分工作量巨大,是此方案最大的技术壁垒。

  3. 反压与缓冲: 如果前端消费速度跟不上后端Binlog产生的速度,服务器的内存缓冲会持续增长,最终导致OOM。生产系统必须实现反压机制,例如当某个客户端的写缓冲区满时,暂时减慢或停止向其发送数据,甚至主动断开慢消费者。

  4. 运维复杂度: 相比现成的解决方案,自研服务的监控、日志、告警都需要从头建设。比如,监控Binlog同步的延迟(主从延迟)就是一个至关重要的指标。

尽管存在这些挑战,但通过这个实现,我们获得了一个完全可控、性能优异的底层组件。对于需要极致延迟和资源效率的特定场景,这种深度定制的投入是值得的。未来的优化方向可能包括:将其重构为多线程模型(一个I/O线程,多个工作线程),实现更完整的Binlog解析库,以及集成Prometheus指标进行深度监控。


  目录