Node.js 与 Hadoop 异构计算任务中 Jaeger 链路上下文的传递实践


我们面临的第一个问题,不是性能,而是黑盒。一个由Node.js编写的API网关负责接收数据处理请求,然后将任务分发给后端的Hadoop集群。当用户抱怨某个任务处理缓慢或失败时,我们的排查过程极其痛苦。Node.js服务的日志里只有一个请求ID,而Hadoop集群的YARN日志里则充斥着成千上万个MapReduce任务的记录。将这两者关联起来,无异于大海捞针。

最初的尝试是手动在Node.js日志和Hadoop任务参数中注入一个唯一的jobId。这解决了“有”和“无”的问题,但它仍然是反应式的。我们需要一种能将整个生命周期——从API接收请求,到消息入队,再到Java应用启动Hadoop任务,最后到任务完成——串联成一个完整视图的工具。这正是分布式链路追踪的用武之地。

初步构想与技术栈的再确认

我们的架构基本固定:Node.js作为BFF(Backend for Frontend)层,它轻量、高并发的特性非常适合处理API请求和与消息队列、缓存交互。Hadoop作为离线处理的核心,承载着历史遗留的、但依然稳定运行的MapReduce作业。我们需要在这个异构(JavaScript vs. Java)、异步(通过消息队列解耦)的体系中,建立起统一的可观测性。

技术选型决策如下:

  1. 链路追踪系统: Jaeger。它基于OpenTelemetry标准,生态成熟,社区活跃。
  2. 上下文传递协议: W3C Trace Context。这是行业标准,能确保不同语言、不同框架的SDK之间可以互操作。
  3. 任务状态与结果存储: 键值型NoSQL数据库,我们选择了Redis。它的高速读写能力非常适合存储任务的瞬时状态(如 PENDING, RUNNING, FAILED, COMPLETED)。
  4. Node.js服务测试: Vitest。其现代化的API、开箱即用的Mock能力以及极快的运行速度,使其成为测试这种包含大量外部依赖(消息队列、数据库)的服务的理想选择。

核心挑战在于,如何让Node.js服务产生的Trace Context,跨越消息队列的鸿沟,被一个完全独立的Java进程正确识别并继承。
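
跨进程传递的载体就是W3C Trace Context定义的traceparent(以及可选的tracestate)消息头。下面用一小段注释化的示意说明它在RabbitMQ消息headers中的形态(其中trace-id、span-id取自W3C规范中的示例值):

// traceparent 的结构:version-traceId(16字节hex)-parentSpanId(8字节hex)-traceFlags
const exampleHeaders = {
  traceparent: '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01',
  // tracestate 可携带各厂商自定义的键值对,允许为空
  tracestate: '',
};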

步骤化实现:构建可追踪的异步任务管道

1. Node.js 服务端的 OpenTelemetry 配置

首先,我们需要在Node.js服务中集成OpenTelemetry SDK,并配置它将追踪数据导出到Jaeger。

tracer.js:

// tracer.js
// 生产环境中,这个文件应该在应用启动时通过 `node -r ./tracer.js app.js` 预加载

import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http';
import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express';
import { AmqplibInstrumentation } from '@opentelemetry/instrumentation-amqplib';

const provider = new NodeTracerProvider({
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'hadoop-job-submitter',
  }),
});

// 配置Jaeger Exporter
// 确保Jaeger Collector的OTLP HTTP接收器正在运行,默认端口4318
const exporter = new OTLPTraceExporter({
  url: process.env.JAEGER_OTLP_ENDPOINT || 'http://localhost:4318/v1/traces',
});

provider.addSpanProcessor(new SimpleSpanProcessor(exporter));

provider.register();

// 自动对常用库进行插桩
registerInstrumentations({
  instrumentations: [
    new HttpInstrumentation(),
    new ExpressInstrumentation(),
    // AmqplibInstrumentation是关键,它会自动在消息头中注入和提取Trace Context
    new AmqplibInstrumentation(),
  ],
});

console.log('OpenTelemetry tracing initialized for hadoop-job-submitter');

这份配置定义了服务名称,并指定了数据上报的Jaeger地址。最关键的是AmqplibInstrumentation,它会自动处理与RabbitMQ交互时的上下文注入。
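
顺带一提,SimpleSpanProcessor每结束一个Span就同步导出一次,便于本地调试;生产环境一般换成BatchSpanProcessor做批量异步导出。以下是一个最小示意(沿用上面的exporter与provider,参数均为示例值):

import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';

provider.addSpanProcessor(
  new BatchSpanProcessor(exporter, {
    maxQueueSize: 2048,         // 内存中等待导出的Span上限(示例值)
    scheduledDelayMillis: 5000, // 每5秒批量导出一次(示例值)
  })
);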

2. 任务提交API的实现

接下来是Express应用的具体实现。API接收一个POST请求,生成一个唯一的任务ID,将任务信息和Trace Context一同发布到RabbitMQ,并在Redis中记录初始状态。

app.js:

import express from 'express';
import { randomUUID } from 'crypto';
import amqp from 'amqplib';
import { createClient } from 'redis';
import opentelemetry, { SpanStatusCode } from '@opentelemetry/api';

const app = express();
app.use(express.json());

const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://localhost';
const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379';
const JOB_QUEUE = 'hadoop-jobs';

let channel;
let redisClient;

async function connect() {
  try {
    const connection = await amqp.connect(RABBITMQ_URL);
    channel = await connection.createChannel();
    await channel.assertQueue(JOB_QUEUE, { durable: true });
    
    redisClient = createClient({ url: REDIS_URL });
    await redisClient.connect();

    console.log('Connected to RabbitMQ and Redis');
  } catch (error) {
    console.error('Failed to connect to RabbitMQ or Redis', error);
    process.exit(1);
  }
}

// 获取当前的Tracer实例
const tracer = opentelemetry.trace.getTracer('hadoop-job-submitter-tracer');

app.post('/jobs', async (req, res) => {
  const { datasetPath, outputDir } = req.body;
  if (!datasetPath || !outputDir) {
    return res.status(400).json({ error: 'datasetPath and outputDir are required' });
  }

  const jobId = randomUUID();

  // 这里的操作都在由ExpressInstrumentation创建的根Span下
  // 我们也可以手动创建一个子Span来更精细地追踪
  const span = tracer.startSpan('prepare-and-queue-job');
  
  try {
    // 将手动创建的span设为激活上下文;注意await,否则finally中的span.end()会先于回调内的异步操作执行
    await opentelemetry.context.with(opentelemetry.trace.setSpan(opentelemetry.context.active(), span), async () => {
      span.setAttribute('job.id', jobId);
      span.setAttribute('job.dataset_path', datasetPath);

      const jobPayload = {
        jobId,
        datasetPath,
        outputDir,
        submittedAt: new Date().toISOString(),
      };

      // OpenTelemetry的amqplib instrumentation会自动在这里注入traceparent和tracestate头
      channel.sendToQueue(JOB_QUEUE, Buffer.from(JSON.stringify(jobPayload)), {
        persistent: true,
      });
      span.addEvent('Job message published to RabbitMQ');
      
      // 在Redis中设置任务初始状态,设置1小时过期
      await redisClient.set(`job:${jobId}:status`, 'PENDING', { EX: 3600 });
      span.addEvent('Job status set to PENDING in Redis');
    
      res.status(202).json({ jobId, status: 'PENDING' });
    });
  } catch (error) {
    span.recordException(error);
    span.setStatus({ code: SpanStatusCode.ERROR });
    res.status(500).json({ error: 'Failed to submit job' });
  } finally {
    span.end();
  }
});

app.get('/jobs/:jobId/status', async (req, res) => {
    const { jobId } = req.params;
    const status = await redisClient.get(`job:${jobId}:status`);
    if (!status) {
        return res.status(404).json({ error: 'Job not found' });
    }
    res.status(200).json({ jobId, status });
});


connect().then(() => {
  app.listen(3000, () => {
    console.log('Server is running on port 3000');
  });
});

export default app; // For Vitest testing

注意,AmqplibInstrumentation会自动从当前活动的Span中提取上下文,并将其序列化为W3C Trace Context格式,注入到消息的headers中。我们不需要手动操作。
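
如果换用的消息库没有现成的instrumentation,也可以退而求其次,用@opentelemetry/api提供的propagation接口手动注入。下面是一个假设性的示意(publishWithTraceContext是演示用的函数名,队列与payload沿用上文):

// 手动注入的示意:没有AmqplibInstrumentation时的退路
import { context, propagation } from '@opentelemetry/api';

function publishWithTraceContext(channel, queue, payload) {
  const headers = {};
  // 将当前激活上下文序列化为traceparent/tracestate并写入carrier
  propagation.inject(context.active(), headers);
  channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), {
    persistent: true,
    headers, // Java端的TextMapGetter正是从这里读取
  });
}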

3. Java端的Hadoop任务启动器

这是连接两个世界的桥梁。我们编写一个Java程序,它作为常驻进程消费RabbitMQ队列。当收到消息时,它必须:

  1. 从消息头中提取W3C Trace Context。
  2. 使用OpenTelemetry SDK for Java,将提取的上下文作为父级,创建一个新的Span。
  3. 执行hadoop jar ...命令。
  4. 在任务开始和结束时,更新Redis中的任务状态。

为了简化,这里使用一个Maven项目。pom.xml需要包含以下依赖:

<dependencies>
    <!-- OpenTelemetry -->
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-api</artifactId>
        <version>1.31.0</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-sdk</artifactId>
        <version>1.31.0</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-otlp</artifactId>
        <version>1.31.0</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-context</artifactId>
        <version>1.31.0</version>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-semconv</artifactId>
        <version>1.31.0-alpha</version>
    </dependency>
    
    <!-- RabbitMQ Client -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.17.0</version>
    </dependency>

    <!-- Redis Client -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>4.4.3</version>
    </dependency>

    <!-- JSON Parsing -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.10.1</version>
    </dependency>
</dependencies>

HadoopJobConsumer.java:

import com.google.gson.Gson;
import com.rabbitmq.client.*;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcTraceExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import redis.clients.jedis.Jedis;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HadoopJobConsumer {

    private static final String JOB_QUEUE = "hadoop-jobs";
    private static final Gson gson = new Gson();
    private static Tracer tracer;
    private static OpenTelemetry openTelemetry;
    private static Jedis jedis;

    // 自定义TextMapGetter,用于从RabbitMQ消息头中提取上下文
    private static final TextMapGetter<AMQP.BasicProperties> getter =
        new TextMapGetter<>() {
            @Override
            public Iterable<String> keys(AMQP.BasicProperties carrier) {
                Map<String, Object> headers = carrier.getHeaders();
                return headers != null ? headers.keySet() : java.util.Collections.emptyList();
            }

            @Override
            public String get(AMQP.BasicProperties carrier, String key) {
                Map<String, Object> headers = carrier.getHeaders();
                if (headers != null && headers.containsKey(key)) {
                    return headers.get(key).toString();
                }
                return null;
            }
        };

    private static OpenTelemetry initOpenTelemetry() {
        Resource resource = Resource.getDefault().merge(
            Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "hadoop-job-runner")));

        SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
            .addSpanProcessor(BatchSpanProcessor.builder(
                OtlpGrpcTraceExporter.builder()
                    .setEndpoint(System.getenv("JAEGER_OTLP_ENDPOINT_GRPC") != null ? System.getenv("JAEGER_OTLP_ENDPOINT_GRPC") : "http://localhost:4317")
                    .build())
                .build())
            .setResource(resource)
            .build();
        
        return OpenTelemetrySdk.builder()
            .setTracerProvider(sdkTracerProvider)
            .setPropagators(io.opentelemetry.context.propagation.ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
            .buildAndRegisterGlobal();
    }
    
    public static void main(String[] args) throws Exception {
        openTelemetry = initOpenTelemetry();
        tracer = openTelemetry.getTracer("io.opentelemetry.example.HadoopJobConsumer");
        jedis = new Jedis("localhost", 6379);

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(JOB_QUEUE, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            AMQP.BasicProperties props = delivery.getProperties();
            
            // 关键步骤:提取父上下文
            Context extractedContext = openTelemetry.getPropagators().getTextMapPropagator()
                .extract(Context.current(), props, getter);

            try (Scope scope = extractedContext.makeCurrent()) {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                JobPayload payload = gson.fromJson(message, JobPayload.class);
                
                // 创建一个子Span,它会自动关联到从消息中提取的父Span
                Span span = tracer.spanBuilder("process-hadoop-job")
                    .setSpanKind(SpanKind.CONSUMER)
                    .startSpan();
                
                try (Scope spanScope = span.makeCurrent()) {
                    span.setAttribute("job.id", payload.jobId);
                    span.setAttribute("job.dataset_path", payload.datasetPath);
                    System.out.println(" [x] Received job: " + payload.jobId);

                    updateJobStatus(payload.jobId, "RUNNING");
                    span.addEvent("Job status updated to RUNNING");
                    
                    // 模拟执行Hadoop作业
                    executeHadoopJob(payload, span);

                    updateJobStatus(payload.jobId, "COMPLETED");
                    span.addEvent("Job status updated to COMPLETED");
                    span.setStatus(StatusCode.OK);

                } catch (Exception e) {
                    span.setStatus(StatusCode.ERROR, "Hadoop job failed");
                    span.recordException(e);
                    updateJobStatus(payload.jobId, "FAILED");
                } finally {
                    span.end();
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(JOB_QUEUE, false, deliverCallback, consumerTag -> {});
    }

    private static void updateJobStatus(String jobId, String status) {
        jedis.setex("job:" + jobId + ":status", 3600, status);
    }
    
    private static void executeHadoopJob(JobPayload payload, Span parentSpan) throws Exception {
        // 在真实项目中,这里会是调用Hadoop Jar的逻辑
        // ProcessBuilder pb = new ProcessBuilder("hadoop", "jar", "my-job.jar", payload.datasetPath, payload.outputDir);
        // ...
        
        // 为了演示,我们只模拟这个过程并创建子Span
        Span hadoopSpan = tracer.spanBuilder("execute-hadoop-process")
                .setParent(Context.current().with(parentSpan))
                .startSpan();
        
        try (Scope scope = hadoopSpan.makeCurrent()) {
            hadoopSpan.addEvent("Hadoop process starting");
            System.out.println("Executing Hadoop job for: " + payload.jobId);
            Thread.sleep(5000); // 模拟耗时
            hadoopSpan.addEvent("Hadoop process finished");
        } finally {
            hadoopSpan.end();
        }
    }

    static class JobPayload {
        String jobId;
        String datasetPath;
        String outputDir;
    }
}

这个Java程序是整个链路得以延续的核心。getter的实现和openTelemetry.getPropagators().getTextMapPropagator().extract(...)是魔法发生的地方。它解析了Node.js服务注入的traceparent头,并在Java世界里重建了追踪上下文。

4. 使用Vitest测试Node.js服务的健壮性

一个只负责“发射”消息的服务,如果不做充分测试,就如同一个黑盒。我们需要确保它在各种情况下都能正确地与外部依赖(Redis, RabbitMQ)交互。Vitest的vi.mock功能在这里大放异彩。

app.test.js:

import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import request from 'supertest';
import app from './app'; // 假设app.js导出了express实例

// Mock外部依赖(用vi.hoisted创建,确保被提升到文件顶部的vi.mock工厂可以安全引用)
const { mockSendToQueue, mockSet } = vi.hoisted(() => ({
  mockSendToQueue: vi.fn(),
  mockSet: vi.fn(),
}));

vi.mock('amqplib', () => ({
  default: {
    connect: vi.fn().mockResolvedValue({
      createChannel: vi.fn().mockResolvedValue({
        assertQueue: vi.fn(),
        sendToQueue: mockSendToQueue,
      }),
    }),
  },
}));

vi.mock('redis', () => ({
  createClient: vi.fn(() => ({
    connect: vi.fn().mockResolvedValue(true),
    set: mockSet,
    get: vi.fn(),
  })),
}));

// Mock OpenTelemetry,避免在测试中产生真实的trace
vi.mock('@opentelemetry/api', async () => {
    const original = await vi.importActual('@opentelemetry/api');
    const noopTracer = {
        startSpan: () => ({
            setAttribute: vi.fn(),
            addEvent: vi.fn(),
            recordException: vi.fn(),
            setStatus: vi.fn(),
            end: vi.fn(),
            isRecording: () => true,
            spanContext: () => ({ traceId: 'mock-trace-id', spanId: 'mock-span-id' }),
        }),
    };
    return {
        ...original,
        default: {
            ...original.default,
            trace: {
                getTracer: () => noopTracer,
                setSpan: (ctx, span) => ctx,
            },
            context: {
                active: () => original.context.active(),
                with: (ctx, fn) => fn(),
            }
        },
    };
});

describe('Hadoop Job Submitter API', () => {

  beforeEach(() => {
    // 在每个测试用例开始前重置mock函数的调用记录
    mockSendToQueue.mockClear();
    mockSet.mockClear();
  });

  it('POST /jobs should submit a job successfully', async () => {
    const jobData = {
      datasetPath: '/data/input',
      outputDir: '/data/output',
    };

    const response = await request(app).post('/jobs').send(jobData);

    expect(response.status).toBe(202);
    expect(response.body).toHaveProperty('jobId');
    expect(response.body.status).toBe('PENDING');

    // 1. 验证RabbitMQ交互
    expect(mockSendToQueue).toHaveBeenCalledOnce();
    const [queue, messageBuffer, options] = mockSendToQueue.mock.calls[0];
    expect(queue).toBe('hadoop-jobs');
    const message = JSON.parse(messageBuffer.toString());
    expect(message.datasetPath).toBe(jobData.datasetPath);
    expect(options.persistent).toBe(true);
    
    // 2. 验证Redis交互
    expect(mockSet).toHaveBeenCalledOnce();
    const [key, value, expiry] = mockSet.mock.calls[0];
    expect(key).toBe(`job:${response.body.jobId}:status`);
    expect(value).toBe('PENDING');
    expect(expiry).toEqual({ EX: 3600 });
  });

  it('POST /jobs should return 400 if payload is invalid', async () => {
    const response = await request(app).post('/jobs').send({ datasetPath: '/data/input' });
    expect(response.status).toBe(400);
    expect(mockSendToQueue).not.toHaveBeenCalled();
    expect(mockSet).not.toHaveBeenCalled();
  });
});

这两个测试用例分别覆盖了核心成功路径和参数校验的失败路径。它们验证了服务是否正确地调用了外部系统的API、是否传递了正确的参数,而无需在测试环境中启动真正的RabbitMQ或Redis实例。这是确保代码逻辑正确性的关键一步。
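
在此基础上,还可以在describe块中补一个broker异常时返回500的用例,验证错误分支不会误写Redis状态(以下仅为补充示意,沿用上文的mock):

  it('POST /jobs should return 500 when publishing fails', async () => {
    // 让sendToQueue抛出一次异常,模拟RabbitMQ不可用
    mockSendToQueue.mockImplementationOnce(() => {
      throw new Error('broker unavailable');
    });

    const response = await request(app)
      .post('/jobs')
      .send({ datasetPath: '/data/input', outputDir: '/data/output' });

    expect(response.status).toBe(500);
    expect(mockSet).not.toHaveBeenCalled();
  });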

最终成果的可视化

当所有组件都运行起来后,在Jaeger UI中搜索hadoop-job-submitter服务,我们会看到一条完整的链路:

sequenceDiagram
    participant Client
    participant Node.js API
    participant RabbitMQ
    participant Java Consumer
    participant Hadoop Cluster
    participant Redis

    Client->>+Node.js API: POST /jobs
    Node.js API->>Node.js API: Create Span A (submit-job)
    Node.js API->>RabbitMQ: Publish message (with TraceContext of A)
    Node.js API->>+Redis: SET job:xyz:status PENDING
    Redis-->>-Node.js API: OK
    Node.js API-->>-Client: 202 Accepted {jobId: "xyz"}

    Java Consumer->>+RabbitMQ: Consume message
    RabbitMQ-->>-Java Consumer: Message (with TraceContext of A)
    Java Consumer->>Java Consumer: Create Span B (process-hadoop-job), child of A
    Java Consumer->>+Redis: SET job:xyz:status RUNNING
    Redis-->>-Java Consumer: OK
    Java Consumer->>+Hadoop Cluster: Execute MapReduce Job
    Hadoop Cluster-->>-Java Consumer: Job Finished
    Java Consumer->>+Redis: SET job:xyz:status COMPLETED
    Redis-->>-Java Consumer: OK

在Jaeger中,这条Trace会把两个服务的Span串联在一起:由Node.js服务产生的POST /jobs(及其下的prepare-and-queue-job),以及由Java消费者产生的process-hadoop-job和execute-hadoop-process,后者被正确地识别为前者的后代Span。点击这条Trace,我们能清晰地看到整个流程的耗时分布,从API请求到Hadoop任务执行完毕,一目了然。

当前方案的局限性与未来展望

这个方案成功地解决了跨语言、异步调用的链路追踪问题,但它并非完美。

首先,我们的追踪粒度止于hadoop jar命令的执行。Span execute-hadoop-process仅仅记录了子进程的存活时间,我们无法深入到MapReduce作业内部,观察每个Mapper或Reducer的性能。要实现这一点,需要在Hadoop的TaskTracker/NodeManager层面进行更深度的OpenTelemetry插桩,这在现有Hadoop生态中并不常见,需要大量的定制开发。

其次,我们的Java消费者是一个简单的单体应用。在生产环境中,它需要考虑高可用、水平扩展和优雅停机。一个更健壮的方案可能是使用像Apache Flink或Spark Streaming这样的流处理框架来消费RabbitMQ的消息,这些框架自带了更好的容错和状态管理机制。

最后,我们只传递了traceparent。OpenTelemetry的baggage功能允许我们传递业务相关的元数据,比如userId和tenantId。在未来的迭代中,我们可以利用baggage将这些业务标识符从Node.js一直传递到Java端,从而在日志和追踪数据中实现更丰富的业务维度查询。
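
举例来说,在Node.js端可以这样附加baggage(仅为示意:user.id、tenant.id是假设的业务键;NodeTracerProvider默认注册的复合propagator通常已包含W3CBaggagePropagator,若自定义过propagators则需显式加入,Java端同样要注册W3CBaggagePropagator才能取出这些键值):

// 示意:发布消息前把业务标识放入baggage,随trace context一起跨进程传播
import { context, propagation } from '@opentelemetry/api';

function publishWithBaggage(channel, queue, payload, userId, tenantId) {
  const baggage = propagation.createBaggage({
    'user.id': { value: String(userId) },     // 假设的业务键
    'tenant.id': { value: String(tenantId) }, // 假设的业务键
  });
  const ctxWithBaggage = propagation.setBaggage(context.active(), baggage);

  return context.with(ctxWithBaggage, () => {
    // 启用W3CBaggagePropagator后,instrumentation会把baggage头与traceparent一起注入消息
    return channel.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), { persistent: true });
  });
}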

