构建基于 Pulsar 与文档型 NoSQL 的 MLOps 实时特征管道


线上一个推荐模型的 P99 响应延迟最近频繁触及告警阈值。排查了一圈,发现瓶颈并不在模型推理本身,而是上游的特征获取。当前的实现是在每次请求时,实时调用多个微服务,并查询 MySQL 和 Redis,动态拼凑成一个特征向量。这种“读时计算”(Compute-on-Read)的架构在业务初期跑得还不错,但随着特征复杂度和请求量的增加,它变得脆弱不堪,延迟的累加效应和任何一个下游服务抖动都会造成灾难。

更严重的问题是训练-服务偏斜(Training-Serving Skew)。离线的特征工程跑在 Spark 上,用一套 Python 和 SQL 脚本;线上的实时特征则是 Java 服务实现的另一套逻辑。两套代码,两套环境,逻辑稍有不一致,模型的线上表现就会劣化,而这种错误极难定位。

我们必须重构。核心思路是转向“写时计算”(Compute-on-Write):建立一个专门的实时特征管道,将特征提前计算好,存入一个低延迟的在线存储中。模型服务时,只需通过一个唯一的实体ID(比如 user_id)进行一次简单的键值查询,就能拿到完整的特征向量。

技术选型决策

初步构想有了,接下来是技术选型。这是一个典型的流式处理场景,我们需要一个消息队列、一个流计算引擎和一个在线存储。

  1. 消息与流处理平台:为何选择 Pulsar?

    • 我们评估了 Kafka 和 Pulsar。Kafka 是事实标准,生态成熟。但 Pulsar 有两个特性在这里具备压倒性优势。第一是 Pulsar Functions,一个内建的轻量级无服务器计算框架。对于我们大部分特征计算(如计数、求和、取最新值)来说,它足够用了,我们不需要引入并维护一个重量级的 Flink 或 Spark Streaming 集群,架构大幅简化。第二是 分层存储。Pulsar能将老旧数据自动卸载到S3等对象存储,但对客户端来说这些数据依然是透明可访问的。这意味着我们可以用一套API回溯处理几个月前的历史事件来为新模型“回填”特征,这在 MLOps 场景中是刚需。
  2. 在线特征存储:为何选择文档型 NoSQL (MongoDB)?

    • 在线存储的读延迟必须稳定在个位数毫秒。我们排除了关系型数据库,因为特征的演进非常快,频繁 ALTER TABLE 在生产环境是不可接受的。在键值存储和文档型数据库之间,我们选择了后者。虽然 Redis 读写性能极致,但我们的特征向量可能很宽,包含数十甚至上百个特征,有些还是嵌套结构。使用 Redis Hash 存储会失去数据结构的表达力,且缺乏灵活的二级索引能力。
    • MongoDB 这样的文档型数据库是理想的折中。它通过 _id 的单点查询性能与键值存储相当。同时,一个文档可以直接对应一个实体的完整特征集,数据模型非常直观。其灵活的 Schema 允许我们随时增删特征,而无需变更表结构,这对于快速迭代的算法团队至关重要。

最终的架构图如下:

graph TD
    subgraph Event Sources
        A[Web/App Clicks] -->|JSON Event| B(Pulsar Topic: raw_events)
        C[Business DB CDC] -->|Debezium| B
    end

    subgraph Real-time Feature Pipeline
        B -- "Consumed by" --> D{Pulsar Function: feature_transformer}
        D -- "State stored in" --> E[Pulsar BookKeeper State]
        D -- "Produces" --> F(Pulsar Topic: feature_updates)
    end

    subgraph Online Feature Store
        F -- "Consumed by" --> G(Pulsar MongoDB Sink)
        G -- "Writes to" --> H[MongoDB: online_feature_store]
    end

    subgraph Model Serving
        I[Inference Service] -- "GET /features?user_id=xyz" --> H
    end

    style D fill:#f9f,stroke:#333,stroke-width:2px
    style H fill:#9f9,stroke:#333,stroke-width:2px

步骤化实现

1. 数据建模:MongoDB中的实体特征文档

在真实项目中,数据模型的设计至关重要。我们采用以实体为中心(Entity-Centric)的建模方式。每个文档代表一个实体(如用户),_id 就是实体ID。所有与该实体相关的特征都聚合在这个文档内。

一个用户特征文档的示例结构如下:

{
  "_id": "user-a1b2c3d4",
  "last_updated_ts": 1678886400123,
  "features": {
    "profile_features": {
      "gender": "FEMALE",
      "age_bucket": "25-34",
      "city_tier": "T1"
    },
    "behavioral_features_streaming": {
      "clicks_last_1h": { "value": 15, "ts": 1678886399000 },
      "clicks_last_24h": { "value": 128, "ts": 1678886399000 },
      "distinct_categories_viewed_7d": { "value": ["cat_3", "cat_5", "cat_12"], "ts": 1678886100000 }
    },
    "behavioral_features_batch": {
      "avg_purchase_value_30d": 157.8,
      "preferred_payment_method": "CREDIT_CARD"
    }
  }
}

设计考量:

  • _id: 使用业务上唯一的实体ID,并在此字段上建立唯一索引,保证极快的查询性能。
  • features 嵌套对象: 将不同来源或类型的特征分组存放,使结构清晰。例如,profile_features 是静态的,behavioral_features_streaming 是流式实时更新的,而 behavioral_features_batch 来自离线批处理作业。
  • 特征值与时间戳: 对于时效性强的特征(如 clicks_last_1h),我们不仅存储数值 value,还附带了计算该值时的时间戳 ts。这对于问题排查和特征监控非常有用。

2. 核心逻辑:用于特征工程的 Pulsar Function

这是整个管道的大脑。我们用 Python 来编写这个 Function,它消费 raw_events 主题,进行有状态的计算,然后将结果(特征更新)发送到 feature_updates 主题。

假设 raw_events 中是一条条的用户点击事件,格式如下:
{"event_type": "click", "user_id": "user-a1b2c3d4", "item_id": "item-xyz", "category_id": "cat_5", "timestamp": 1678886399000}

我们的 Pulsar Function 需要实现一个功能:计算每个用户在过去1小时内的点击次数。这需要用到状态存储。

# feature_transformer_function.py

import pulsar
from pulsar.schema import *
import json
import time
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class ClickEvent(Record):
    event_type = String()
    user_id = String()
    item_id = String()
    category_id = String()
    timestamp = Long()

class FeatureUpdate(Record):
    entity_id = String()
    feature_name = String()
    feature_value = Any(object) # Using Any for flexibility
    timestamp = Long()

class RealtimeFeatureTransformer(pulsar.Function):
    def __init__(self):
        # 状态存储的键前缀,避免不同特征的状态混淆
        self.state_prefix = "clicks_last_1h::"
        self.window_duration_sec = 3600  # 1 hour

    def process(self, input_event: bytes, context: pulsar.Context):
        """
        主处理逻辑。
        1. 解析输入事件。
        2. 基于事件更新状态存储。
        3. 生成并发送特征更新事件。
        """
        try:
            event_data = json.loads(input_event.decode('utf-8'))
            user_id = event_data.get("user_id")

            if not user_id or event_data.get("event_type") != "click":
                # 不是我们关心的事件,直接确认并忽略
                context.ack(context.get_message())
                return

            # 使用 Pulsar Function 的内建状态存储
            # 状态是一个列表,存储了最近一小时内的点击时间戳
            state_key = f"{self.state_prefix}{user_id}"
            
            # 1. 获取当前状态
            raw_state = context.get_state(state_key)
            if raw_state is None:
                click_timestamps = []
            else:
                click_timestamps = json.loads(raw_state.decode('utf-8'))

            # 2. 更新状态
            current_ts_sec = int(time.time())
            event_ts_sec = event_data.get("timestamp", current_ts_sec * 1000) // 1000
            
            # 添加新事件的时间戳
            click_timestamps.append(event_ts_sec)

            # 清理过期的时间戳,这是一个常见的滑动窗口实现方式
            cutoff_ts = current_ts_sec - self.window_duration_sec
            updated_timestamps = [ts for ts in click_timestamps if ts > cutoff_ts]
            
            # 将更新后的状态写回
            context.put_state(state_key, json.dumps(updated_timestamps).encode('utf-8'))
            
            # 3. 生成特征更新事件
            new_feature_value = len(updated_timestamps)
            
            # 为避免过于频繁的更新,可以增加一个判断逻辑:仅当值变化时才发送
            # last_value_key = f"last_value::{state_key}"
            # last_sent_value = context.get_state(last_value_key)
            # if last_sent_value is None or int(last_sent_value.decode('utf-8')) != new_feature_value:
            
            update_event = {
                "entity_id": user_id,
                "feature_name": "behavioral_features_streaming.clicks_last_1h",
                "feature_value": {
                    "value": new_feature_value,
                    "ts": int(time.time() * 1000)
                }
            }
            
            output_topic = context.get_function_config().output
            context.publish(output_topic, json.dumps(update_event).encode('utf-8'))
            # context.put_state(last_value_key, str(new_feature_value).encode('utf-8'))

            logger.info(f"Processed click for {user_id}. New 1h click count: {new_feature_value}")

        except Exception as e:
            logger.error(f"Failed to process message: {input_event}. Error: {e}", exc_info=True)
            # 触发 Pulsar 的自动重试机制
            context.reconsume_later(context.get_message(), 5000) # 5秒后重试
        finally:
            # 确保在所有情况下都 ack 消息,除非我们想重试
            # 在上面的 reconsume_later 分支中,消息不会被 ack
            # 在成功分支中,我们应该在这里 ack
            if 'e' not in locals():
                context.ack(context.get_message())

部署这个 Function:

pulsar-admin functions create \
  --py /path/to/feature_transformer_function.py \
  --classname feature_transformer_function.RealtimeFeatureTransformer \
  --inputs persistent://public/default/raw_events \
  --output persistent://public/default/feature_updates \
  --name realtime-feature-transformer \
  --tenant public \
  --namespace default \
  --parallelism 4 \
  --processing-guarantees EFFECTIVELY_ONCE

关键实现细节:

  • 状态管理: 我们利用 context.get_statecontext.put_state 来维护每个用户的滑动时间窗口。Pulsar 在底层使用 BookKeeper 来持久化这些状态,保证了故障恢复。
  • 错误处理: try...except 块是生产级代码的标配。当处理失败时,我们调用 context.reconsume_later,这会将消息放回队列,并延迟一段时间后再次投递,避免了“毒丸消息”阻塞整个管道。
  • 输出格式: Function 的输出是一个结构化的 JSON,包含了实体ID、要更新的特征名(使用点表示法 . 来指定嵌套路径)和新的特征值。这种格式对下游的 Sink 非常友好。
  • 处理保证: EFFECTIVELY_ONCE 保证了即使 Function 实例失败重启,状态也能恢复,消息不会被重复处理或丢失,这对于计算准确性至关重要的特征(如交易金额)来说是必须的。

3. 数据写入:配置 Pulsar MongoDB Sink

Pulsar 生态提供了丰富的 Connector。我们直接使用官方的 MongoDB Sink Connector,它会监听 feature_updates 主题,并将数据写入 MongoDB。我们只需要提供一个配置文件。

mongo-sink.yaml:

tenant: public
namespace: default
name: feature-store-mongo-sink
inputs:
  - persistent://public/default/feature_updates
parallelism: 4
archive: pulsar-io-mongo-3.1.0.nar # Connector 包路径

configs:
  mongoUri: "mongodb://user:password@mongodb-host:27017/feature_store_db?authSource=admin"
  database: "feature_store_db"
  collection: "user_features"
  # 关键配置:Upsert 模式
  updateStrategy: "UPDATE"
  updateKey: "entity_id" # 从 feature_updates 消息中提取 'entity_id' 字段
  updateFields: "feature_name,feature_value" # 提取 'feature_name' 和 'feature_value'
  
  # 这里我们使用 MongoDB 的 $set 操作符来更新文档中特定的字段
  # 注意:这需要 connector 支持动态字段更新,如果不支持,可能需要自定义
  # 简单的 connector 会直接替换整个文档,而我们需要的是部分更新
  # 假设我们使用的 connector 版本支持动态字段更新,通过消息内容来指定更新路径
  # 在这里,我们假设 connector 智能地将 'feature_name' 的点表示法转换为 MongoDB 更新操作
  # 'behavioral_features_streaming.clicks_last_1h' -> {$set: {"features.behavioral_features_streaming.clicks_last_1h": ...}}
  
  processingGuarantees: ATLEAST_ONCE
  # 对于特征存储,ATLEAST_ONCE 通常足够,因为重复写入相同的值是幂等的。

部署 Sink Connector:

pulsar-admin sinks create --sink-config-file mongo-sink.yaml

这个 Sink 会把 {"entity_id": "user-a1b2c3d4", "feature_name": "...", "feature_value": ...} 这样的消息,智能地转换成对 MongoDB 的 updateOne 操作,并且使用 upsert=true。例如:
db.user_features.updateOne({_id: "user-a1b2c3d4"}, {$set: {"features.behavioral_features_streaming.clicks_last_1h": {"value": 16, "ts": ...}}}, {upsert: true})
这样就实现了对特征文档的原子化、增量式更新。

4. 特征服务:低延迟的特征获取接口

最后,模型推理服务需要一个接口来获取特征。我们用 FastAPI 构建一个简单的 Python 服务。

# feature_server.py

from fastapi import FastAPI, HTTPException
from pymongo import MongoClient
from pymongo.collection import Collection
import os
import logging
from contextlib import asynccontextmanager

# 配置
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://user:password@mongodb-host:27017/")
MONGO_DB = "feature_store_db"
MONGO_COLLECTION = "user_features"

# 日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 全局资源
mongo_client = None
feature_collection = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 应用启动时
    global mongo_client, feature_collection
    try:
        mongo_client = MongoClient(MONGO_URI, maxPoolSize=50, minPoolSize=10, serverSelectionTimeoutMS=5000)
        # 强制连接以提前发现问题
        mongo_client.admin.command('ping')
        feature_collection = mongo_client[MONGO_DB][MONGO_COLLECTION]
        logger.info("Successfully connected to MongoDB.")
        yield
    except Exception as e:
        logger.error(f"Failed to connect to MongoDB: {e}")
        # 在实际项目中,这里应该导致服务启动失败
        yield
    finally:
        # 应用关闭时
        if mongo_client:
            mongo_client.close()
            logger.info("MongoDB connection closed.")


app = FastAPI(lifespan=lifespan)

@app.get("/v1/features/{entity_id}")
async def get_features(entity_id: str):
    """
    根据实体ID获取最新的特征向量。
    """
    if not feature_collection:
        raise HTTPException(status_code=503, detail="Feature store not available.")

    try:
        # 关键的查询操作,只返回我们需要的 `features` 字段以减少网络传输
        document = feature_collection.find_one(
            {"_id": entity_id},
            {"_id": 0, "features": 1} 
        )

        if not document:
            # 在真实场景中,对于新用户,我们可能返回一组默认特征,而不是404
            logger.warning(f"No features found for entity_id: {entity_id}")
            raise HTTPException(status_code=404, detail=f"Features not found for entity_id {entity_id}")
        
        # 将流式特征和批处理特征合并成一个扁平的字典,方便模型使用
        feature_vector = {}
        features_data = document.get("features", {})
        
        # 扁平化处理
        for group, feature_dict in features_data.items():
            for feature_name, value in feature_dict.items():
                # 如果特征值是包含 value 和 ts 的对象,只取 value
                if isinstance(value, dict) and 'value' in value:
                    feature_vector[feature_name] = value['value']
                else:
                    feature_vector[feature_name] = value
        
        return feature_vector

    except Exception as e:
        logger.error(f"Error fetching features for {entity_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error while fetching features.")

这个服务非常轻量,它只做一件事:根据 entity_id 从 MongoDB 查询文档,并做一些简单的格式化。所有的重计算都已经在上游的 Pulsar Function 中完成了。通过合理的连接池配置和只查询必要字段的优化,这个接口的 P99 延迟可以轻松控制在 10ms 以内。

遗留问题与未来迭代

这套基于 Pulsar 和 MongoDB 的实时特征管道解决了我们最紧迫的延迟和一致性问题,但它并非银弹。

一个显而易见的问题是特征回填(Backfilling)。当一个新模型需要一个全新的特征(例如“过去90天购买总额”),我们需要对历史数据进行一次性计算。虽然 Pulsar 的分层存储让我们可以重放历史事件,但对于TB级别的历史数据,通过 Pulsar Function 逐条重放效率太低。更现实的方案是启动一个并行的 Spark 作业,直接读取对象存储上的原始事件数据,批量计算后,通过 Spark Connector 直接写入 MongoDB。这就形成了流批一体的特征计算架构。

另一个局限性在于特征治理。现在,特征的定义、来源、负责人等元信息都散落在代码和文档里。当特征数量增长到成百上千时,会导致严重的混乱和重复建设。下一步的演进方向是引入一个特征注册中心(Feature Registry),比如开源的 Feast 或者自研的元数据系统。所有特征的定义都在此统一管理、版本化,并与底层的实现(Pulsar Function 或 Spark Job)和存储(MongoDB中的路径)关联起来。这才是 MLOps 体系走向成熟的关键一步。


  目录