基于 ArangoDB 与 Elasticsearch 构建统一的开发者平台元数据与可观测性 API


随着内部微服务数量突破三百个,团队间的技术债和信息孤岛问题开始严重拖累研发效能。服务元数据散落在 Confluence、Git 仓库的 README 和各个团队的私有文档中,服务间的依赖关系仅存在于资深工程师的脑子里。定位一次线上故障,需要在 Kibana、Grafana、ArgoCD 等多个平台之间来回跳转,上下文的丢失让问题排查效率极低。我们需要一个统一的内部开发者平台 (IDP) 来解决这个问题,而其核心,是一个能够提供服务全景视图的元数据与可观测性 API。

最初的方案是传统的:使用 PostgreSQL 存储服务的静态元数据(负责人、仓库地址、所属团队),使用 Elasticsearch 聚合所有服务的日志。这套方案技术栈成熟,团队熟悉度高。但很快我们发现了它的根本缺陷:关系型数据库难以优雅地处理微服务架构中复杂、动态且多维度的“关系”。一个服务“依赖”另一个服务,“部署在”某个环境,“属于”某个业务域,这些关系在 JOIN 查询和不断演进的 Schema 面前显得异常脆弱。尤其是要进行“查询某个服务下游三层内所有受影响的服务”这类深度遍历查询时,SQL 的表现力与性能都会成为瓶颈。

我们需要的不是一个简单的服务目录,而是一个能够描述整个研发“宇宙”的知识图谱。因此,我们放弃了传统方案,转向了一个更激进的组合:使用 ArangoDB 作为元数据核心,利用其多模型特性同时存储服务的文档数据和它们之间的图关系;继续使用 Elasticsearch 作为海量日志和事件数据的检索引擎。所有数据通过一个 Go 编写的后端服务统一暴露,并由 Tyk API 网关进行安全和流量管控,最终服务于一个基于 SSG 和 esbuild 构建的高性能前端门户。

架构决策:为何选择 ArangoDB 与 Elasticsearch 组合

  • 方案 A: PostgreSQL + Elasticsearch

    • 优势:
      • 技术成熟,团队经验丰富。
      • 关系模型对结构化数据提供强一致性保证。
      • 生态系统完善,工具链齐全。
    • 劣势:
      • 服务间的依赖、归属、部署关系是典型的图结构数据,使用关系模型表达非常笨拙,通常需要设计复杂的连接表,多层依赖查询性能低下。
      • Schema 变更成本高,难以适应快速演进的元数据模型。
  • 方案 B: ArangoDB + Elasticsearch

    • 优势:
      • 多模型统一: ArangoDB 原生支持文档、图和键值模型。服务的基础信息(如名称、代码仓库)可以作为文档存储,而服务间的依赖关系(depends_on)、部署关系(deployed_to)可以作为图的边(Edge)存储。这使得模型非常直观且易于扩展。
      • 强大的图查询能力: ArangoDB 的查询语言 AQL (ArangoDB Query Language) 能够轻松执行任意深度的图遍历、最短路径查找等复杂查询,这对于分析服务影响范围、生成架构拓扑图至关重要。
      • 关注点分离: ArangoDB 专注于处理结构化、关系复杂的元数据,而 Elasticsearch 则发挥其专长,处理半结构化、高时效性、大容量的可观测性数据(日志、事件)。两者各司其职。
    • 劣势:
      • 引入新的数据库技术栈,团队需要学习成本。
      • 维护两个分布式存储系统,运维复杂度相对更高。

最终,我们选择了方案 B。我们判断,能够精确、高效地建模和查询服务间的复杂关系,是构建 IDP 核心价值的关键。这种能力带来的长期收益,远超短期内的学习成本和运维投入。

核心架构图

整个系统的交互流程可以简化为下图:

graph TD
    subgraph "开发者门户 (SSG + esbuild)"
        A[Portal Frontend]
    end

    subgraph "API 网关 (Tyk)"
        B[Tyk Gateway]
    end

    subgraph "核心 API 服务 (Go)"
        C[Metadata & Observability API]
    end

    subgraph "数据存储层"
        D[ArangoDB Cluster]
        E[Elasticsearch Cluster]
    end

    subgraph "数据采集"
        F[CI/CD Pipeline]
        G[Log Agents]
    end

    A -->|REST/GraphQL Calls| B
    B -->|Proxied Requests| C
    C -->|AQL Queries| D
    C -->|Search Queries| E
    F -->|Update Metadata| C
    G -->|Ship Logs| E

    style D fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#f9f,stroke:#333,stroke-width:2px
  • 数据源: CI/CD 流水线在每次构建或部署后,会调用核心 API 更新 ArangoDB 中的服务元数据和部署关系。日志代理(如 Fluentd)将应用日志统一推送到 Elasticsearch。
  • 核心服务: Go 服务是唯一的数据出口,它封装了对 ArangoDB 和 Elasticsearch 的所有查询逻辑。
  • 网关层: Tyk 负责鉴权、限流、请求转换等,确保核心服务的稳定与安全。
  • 消费端: 前端门户是一个静态站点,通过 API 获取所有动态数据,esbuild 保证了其极快的构建速度。

ArangoDB 数据模型设计与实现

我们的核心模型包含两种文档集合(services, environments)和两种边集合(dependencies, deployments)。

  • services (Document Collection): 存储微服务的基础信息。

    {
        "_key": "user-service",
        "name": "User Service",
        "repo_url": "[email protected]:our-org/user-service.git",
        "language": "go",
        "owner_team": "team-alpha"
    }
  • environments (Document Collection): 存储部署环境的信息。

    {
        "_key": "production",
        "name": "Production",
        "cluster_name": "eks-prod-us-east-1"
    }
  • dependencies (Edge Collection): 描述服务间的依赖关系,从一个 services 指向另一个 services

    {
        "_from": "services/order-service",
        "_to": "services/user-service",
        "type": "sync_http", // e.g., sync_http, async_kafka
        "critical": true
    }
  • deployments (Edge Collection): 描述服务在某个环境的部署实例。

    {
        "_from": "services/user-service",
        "_to": "environments/production",
        "version": "v1.2.3",
        "deployed_at": "2023-10-27T10:00:00Z",
        "commit_sha": "a1b2c3d4"
    }

查询一个服务及其所有下游依赖

这是图数据库真正发光的场景。我们需要找出 order-service 的所有下游依赖(它直接或间接依赖的所有服务)。

使用 AQL 可以非常简洁地实现:

/*
 * Find all downstream dependencies for a given service.
 * @param startNode - The _id of the starting service, e.g., "services/order-service"
 */
FOR v, e, p IN 1..10 OUTBOUND @startNode dependencies
  OPTIONS { uniqueVertices: "global", bfs: true }
  RETURN {
    service: {
      _key: v._key,
      name: v.name,
      repo_url: v.repo_url
    },
    dependency_path: {
      // p.edges contains all edges in the path
      // p.vertices contains all vertices in the path
      // Here we just show the type of dependency on the direct edge
      type: e.type,
      depth: LENGTH(p.edges)
    }
  }

这段 AQL 从 @startNode (例如 services/order-service) 开始,沿着 dependencies 边的方向 (OUTBOUND) 进行广度优先遍历 (bfs: true),最深探索10层。uniqueVertices: "global" 确保每个服务在结果中只出现一次。这种查询在 SQL 中需要通过递归 CTE 实现,通常更复杂且性能较差。

Go API 服务核心实现

我们的 Go 服务使用官方的 ArangoDB驱动 和 Elasticsearch 驱动。以下是获取服务详情及其依赖图谱的核心逻辑。

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	driver "github.com/arangodb/go-driver"
	"github.com/arangodb/go-driver/http"
	"github.com/elastic/go-elasticsearch/v8"
	"github.com/gorilla/mux"
)

// Service represents the basic structure of a service document.
type Service struct {
	Key       string `json:"_key,omitempty"`
	Name      string `json:"name,omitempty"`
	RepoURL   string `json:"repo_url,omitempty"`
	Language  string `json:"language,omitempty"`
	OwnerTeam string `json:"owner_team,omitempty"`
}

// Dependency represents a downstream service dependency.
type Dependency struct {
	Service        Service `json:"service"`
	DependencyPath struct {
		Type  string `json:"type"`
		Depth int    `json:"depth"`
	} `json:"dependency_path"`
}

// FullServiceView combines metadata, dependencies, and recent logs.
type FullServiceView struct {
	Metadata     Service      `json:"metadata"`
	Dependencies []Dependency `json:"dependencies"`
	RecentLogs   []string     `json:"recent_logs"`
}

var db driver.Database
var esClient *elasticsearch.Client

// getServiceDetailHandler handles fetching the complete view for a service.
func getServiceDetailHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	serviceKey := vars["key"]
	if serviceKey == "" {
		http.Error(w, "Service key is required", http.StatusBadRequest)
		return
	}

	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	// 1. Fetch service metadata from ArangoDB
	var metadata Service
	servicesCol, err := db.Collection(ctx, "services")
	if err != nil {
		log.Printf("ERROR: Failed to get services collection: %v", err)
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	meta, err := servicesCol.ReadDocument(ctx, serviceKey, &metadata)
	if err != nil {
		if driver.IsNotFound(err) {
			http.NotFound(w, r)
			return
		}
		log.Printf("ERROR: Failed to read document %s: %v", serviceKey, err)
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	metadata.Key = meta.Key // Ensure key is populated

	// 2. Fetch dependencies using AQL graph traversal
	dependencyQuery := `
		FOR v, e, p IN 1..10 OUTBOUND @startNode dependencies
		  OPTIONS { uniqueVertices: "global", bfs: true }
		  RETURN {
			service: { _key: v._key, name: v.name, repo_url: v.repo_url },
			dependency_path: { type: e.type, depth: LENGTH(p.edges) }
		  }`
	bindVars := map[string]interface{}{
		"startNode": fmt.Sprintf("services/%s", serviceKey),
	}

	cursor, err := db.Query(ctx, dependencyQuery, bindVars)
	if err != nil {
		log.Printf("ERROR: AQL query failed: %v", err)
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	defer cursor.Close()

	var dependencies []Dependency
	for cursor.HasMore() {
		var dep Dependency
		_, err := cursor.ReadDocument(ctx, &dep)
		if err != nil {
			log.Printf("WARN: Failed to read dependency document: %v", err)
			continue // Skip problematic documents but don't fail the whole request
		}
		dependencies = append(dependencies, dep)
	}

	// 3. Fetch recent logs from Elasticsearch
	// In a real project, this query would be more complex, using bool queries
	// to filter by environment, time range, etc.
	res, err := esClient.Search(
		esClient.Search.WithContext(ctx),
		esClient.Search.WithIndex("logs-prod-*"),
		esClient.Search.WithQuery(fmt.Sprintf(`{"match":{"service.name":"%s"}}`, serviceKey)),
		esClient.Search.WithSize(20),
		esClient.Search.WithSort("timestamp:desc"),
	)
	if err != nil {
		log.Printf("ERROR: Elasticsearch query failed: %v", err)
		// We can still return partial data even if logs fail
	}
	defer res.Body.Close()

	var recentLogs []string
	if res.IsError() {
		log.Printf("WARN: Elasticsearch returned an error: %s", res.Status())
	} else {
		var r map[string]interface{}
		if err := json.NewDecoder(res.Body).Decode(&r); err == nil {
			hits := r["hits"].(map[string]interface{})["hits"].([]interface{})
			for _, hit := range hits {
				source := hit.(map[string]interface{})["_source"].(map[string]interface{})
				if msg, ok := source["message"].(string); ok {
					recentLogs = append(recentLogs, msg)
				}
			}
		}
	}

	// 4. Assemble and return the full view
	fullView := FullServiceView{
		Metadata:     metadata,
		Dependencies: dependencies,
		RecentLogs:   recentLogs,
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(fullView); err != nil {
		log.Printf("ERROR: Failed to encode response: %v", err)
	}
}

// A real application would have better configuration management
func main() {
	// ArangoDB Connection
	conn, err := http.NewConnection(http.ConnectionConfig{
		Endpoints: []string{os.Getenv("ARANGO_ENDPOINT")},
	})
	if err != nil {
		log.Fatalf("Failed to create ArangoDB connection: %v", err)
	}
	client, err := driver.NewClient(driver.ClientConfig{
		Connection:     conn,
		Authentication: driver.BasicAuthentication(os.Getenv("ARANGO_USER"), os.Getenv("ARANGO_PASSWORD")),
	})
	if err != nil {
		log.Fatalf("Failed to create ArangoDB client: %v", err)
	}
	db, err = client.Database(context.Background(), os.Getenv("ARANGO_DATABASE"))
	if err != nil {
		log.Fatalf("Failed to open database: %v", err)
	}

	// Elasticsearch Connection
	esClient, err = elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{os.Getenv("ES_ADDRESS")},
	})
	if err != nil {
		log.Fatalf("Error creating the Elasticsearch client: %s", err)
	}

	r := mux.NewRouter()
	// The API is versioned for future compatibility
	apiV1 := r.PathPrefix("/api/v1").Subrouter()
	apiV1.HandleFunc("/services/{key}", getServiceDetailHandler).Methods("GET")

	// ... other handlers for updating metadata, etc.

	log.Println("Starting server on :8080")
	if err := http.ListenAndServe(":8080", r); err != nil {
		log.Fatalf("Server failed to start: %v", err)
	}
}

这个代码片段展示了如何将来自两个完全不同数据源的数据聚合到一个 API 响应中。一个常见的错误是在处理这类聚合时,任何一个数据源的失败都会导致整个请求失败。在生产代码中,我们采取了更具韧性的策略:即使 Elasticsearch 查询失败,我们仍然返回从 ArangoDB 成功获取的元数据和依赖信息,只是 recent_logs 字段会为空。

Tyk 网关配置:保护与暴露 API

直接暴露内部服务是危险的。我们使用 Tyk 作为 API 网关,它以声明式 JSON 配置的方式管理 API。

以下是一个简化的 Tyk API 定义,用于代理我们的 Go 服务:

{
  "name": "idp-core-api",
  "api_id": "idp-core-api-1",
  "org_id": "default",
  "use_keyless": false,
  "auth": {
    "auth_header_name": "Authorization"
  },
  "version_data": {
    "not_versioned": true,
    "versions": {
      "Default": {
        "name": "Default",
        "expires": "",
        "paths": {
          "ignored": [],
          "white_list": [],
          "black_list": []
        },
        "use_extended_paths": true,
        "extended_paths": {}
      }
    }
  },
  "proxy": {
    "listen_path": "/idp-api/",
    "target_url": "http://idp-core-service.internal:8080/",
    "strip_listen_path": true
  },
  "enable_rate_limiting": true,
  "rate_limiting": {
    "rate": 1000,
    "per": 60,
    "quota_max": -1,
    "quota_renews": -1
  }
}

这个配置做了几件关键的事情:

  1. 路径代理: 将所有对网关 /idp-api/ 的请求,去除前缀后,转发到内部服务 http://idp-core-service.internal:8080/
  2. 强制鉴权: use_keyless: falseauth_header_name: "Authorization" 意味着所有请求必须携带一个有效的 API 密钥在 Authorization 头中。Tyk 负责密钥的验证。
  3. 速率限制: rate_limiting 配置将每个 API 密钥的请求速率限制在每分钟 1000 次,防止滥用。

在真实项目中,Tyk 配置会更复杂,可能包括用于转换请求/响应的中间件,或用于聚合多个后端服务的虚拟端点。

当前方案的局限性与未来展望

这套架构并非没有缺点。最大的挑战在于运维复杂性。我们现在需要同时维护和监控 ArangoDB 和 Elasticsearch 两个分布式集群,这对 SRE 团队提出了更高的要求。数据一致性也是一个潜在问题:当服务元数据在 ArangoDB 中更新时(例如,服务被重命名),必须确保有相应的机制来更新 Elasticsearch 中日志的关联字段,否则查询会产生偏差。目前我们通过事件驱动的方式异步处理这类更新,但这引入了最终一致性。

未来的迭代方向很明确。首先,我们将利用 Tyk 的 GraphQL 代理能力,将多个后端的 REST API(例如,ArgoCD API、Jenkins API)聚合为一个统一的 GraphQL 端点,让前端可以按需查询数据,减少请求次数。其次,我们正在探索将 ArangoDB 的图计算能力用于更深入的分析,比如自动检测架构中的“循环依赖”,或者在变更前进行影响面分析,模拟“如果这个服务宕机,哪些上游业务会受影响”。最后,我们计划引入 eBPF 等技术实现服务依赖关系的自动发现,而不是依赖 CI/CD 的手动上报,让我们的开发者平台“知识图谱”变得更加实时和准确。


  目录