Docker Swarm 与 Envoy Proxy 构建高基数时序数据移动端采集架构的权衡与实现


我们的技术挑战明确且棘手:需要为数百万活跃的iOS设备构建一个高吞吐、低延迟的时序数据采集后端。这些数据具备典型的高基数(High Cardinality)特征——每个设备ID都是一个独立的标签维度。同时,团队规模决定了我们必须寻求运维复杂度最低的健壮方案,避免陷入基础设施的泥潭。

定义问题与方案权衡

一个典型的现代技术栈方案会是 Kubernetes + Istio + gRPC + 一个原生TSDB客户端。这无疑是功能最强大、生态最丰富的组合。但在我们的场景下,这种方案的运维成本和认知负荷过高。Kubernetes的复杂性对于一个中小型团队来说,很可能意味着在业务交付之外,需要投入不成比例的精力去维护集群本身。

因此,我们评估了另一条路径:

  • 方案A: Kubernetes 主流方案

    • 编排: Kubernetes
    • 入口/网络: Istio/Nginx Ingress + gRPC
    • 数据访问: 官方提供的原生TSDB客户端
    • 优势: 功能强大,社区标准,可扩展性极佳。
    • 劣势: 运维复杂度极高,资源占用大,学习曲线陡峭。对于我们的目标——“稳定可靠地接收数据”,属于“杀鸡用牛刀”。
  • 方案B: Docker Swarm 简化方案

    • 编排: Docker Swarm
    • 入口/网络: Envoy Proxy (作为独立网关) + HTTP/2
    • 数据访问: ORM + 针对时序场景的适配层
    • 优势: Docker Swarm运维极简,心智负担低。Envoy作为独立网关,配置清晰,性能卓越,能满足mTLS、遥测等核心需求。ORM能统一应用层的数据模型,提高开发效率。
    • 劣势: Swarm生态不及K8s,高级调度与网络策略受限。ORM直接操作时序数据库通常是反模式,存在性能风险,需要精巧的设计来规避。

最终决策是方案B。我们愿意在“数据访问层”投入更多的设计与开发精力,以换取整个基础设施层面的“运维简单性”。这个权衡对于资源有限的团队至关重要。整个架构的核心在于,用Envoy处理所有入口流量的安全与路由,用Swarm管理无状态的应用服务,并设计一个巧妙的数据层,利用ORM处理元数据,同时用高效的方式写入时序数据。

架构概览与核心实现

我们的目标架构如下,所有后端服务都将作为Docker服务运行在Swarm集群中。

graph TD
    subgraph iOS Devices
        direction LR
        D1[iOS Device 1]
        D2[iOS Device 2]
        Dn[iOS Device n]
    end

    subgraph Internet
        I[mTLS over HTTP/2]
    end

    subgraph Docker Swarm Cluster
        direction TB
        E[Envoy Proxy Service]

        subgraph Ingestion Services
            direction LR
            S1[Ingestion API Svc 1]
            S2[Ingestion API Svc 2]
            Sn[Ingestion API Svc n]
        end

        subgraph Database
            DB[(TimescaleDB on PostgreSQL)]
        end

        E -->|Round Robin| S1
        E -->|Round Robin| S2
        E -->|Round Robin| Sn

        S1 --> DB
        S2 --> DB
        Sn --> DB
    end

    D1 --> I
    D2 --> I
    Dn --> I
    I --> E
  1. iOS客户端:通过HTTP/2与服务端建立mTLS双向认证连接,定期批量上报时序数据。
  2. Envoy Proxy:作为集群的唯一入口(Edge Proxy),终结TLS,验证客户端证书,然后将请求以轮询方式转发到后端的采集API服务。
  3. Ingestion API Service:一个无状态的Python应用,负责解析请求,并将元数据与时序数据分离,写入数据库。
  4. TimescaleDB:选择PostgreSQL的TimescaleDB扩展。它允许我们在同一个数据库中同时拥有标准的关系表(用于设备元数据)和高效的时序超表(Hypertable),这是我们能结合使用ORM的关键。

1. Envoy Proxy: 安全与路由的基石

Envoy的配置是架构的门面。我们不使用复杂的服务网格,仅将其作为一个高性能的边缘代理。以下是关键的envoy.yaml配置片段,用于处理mTLS和路由。

# envoy.yaml
static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address:
        address: 0.0.0.0
        port_value: 10000
    filter_chains:
    - filter_chain_match:
        server_names: ["ingest.your-domain.com"]
      filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          http2_protocol_options: {} # 启用HTTP/2
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              - match:
                  prefix: "/v1/ingest"
                route:
                  cluster: ingestion_service
                  # 关键:设置超时,防止慢客户端拖垮整个系统
                  timeout: 5s
          http_filters:
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
      transport_socket:
        name: envoy.transport_sockets.tls
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext
          # 服务端证书配置
          common_tls_context:
            tls_certificates:
            - certificate_chain: { filename: "/etc/envoy/certs/server.crt" }
              private_key: { filename: "/etc/envoy/certs/server.key" }
            # 关键:mTLS配置,要求并验证客户端证书
            validation_context:
              trusted_ca:
                filename: /etc/envoy/certs/ca.crt
              # 可选,但生产环境推荐,验证证书中的特定主题
              match_subject_alt_names:
              - exact: "ios-client"
          # 要求客户端必须提供证书
          require_client_certificate: true

  clusters:
  - name: ingestion_service
    connect_timeout: 0.5s
    type: STRICT_DNS # Swarm服务发现模式
    lb_policy: ROUND_ROBIN
    # Swarm内部服务名
    load_assignment:
      cluster_name: ingestion_service
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                # 使用Docker Swarm的内部DNS来解析服务
                address: ingestion_api
                port_value: 8000

这份配置的核心在于transport_socket部分,它强制要求客户端提供由我们签发的CA认证过的证书,实现了严格的设备准入控制。

2. Docker Swarm: 极简的服务编排

Swarm的魅力在于它的简单。一个docker-stack.yml文件就能定义整个后端。

# docker-stack.yml
version: '3.8'

services:
  envoy:
    image: envoyproxy/envoy:v1.24.0
    ports:
      - "10000:10000"
    volumes:
      - ./envoy.yaml:/etc/envoy/envoy.yaml:ro
      - ./certs:/etc/envoy/certs:ro
    networks:
      - backend_net
    deploy:
      replicas: 2
      placement:
        constraints: [node.role == manager] # 放置在manager节点以获得稳定的入口IP

  ingestion_api:
    image: my-ingestion-api:latest
    networks:
      - backend_net
    depends_on:
      - db
    environment:
      - DATABASE_URL=postgresql://user:password@db:5432/metricsdb
    deploy:
      replicas: 5 # 核心业务服务,可以轻松扩容
      update_config:
        parallelism: 1
        delay: 10s
      restart_policy:
        condition: on-failure

  db:
    image: timescale/timescaledb:latest-pg14
    volumes:
      - db_data:/var/lib/postgresql/data
    networks:
      - backend_net
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=metricsdb
    deploy:
      replicas: 1
      placement:
        constraints: [node.labels.storage == true] # 部署到带有持久化存储标签的节点

volumes:
  db_data:

networks:
  backend_net:
    driver: overlay

通过docker stack deploy -c docker-stack.yml myapp一条命令,整个后端服务就启动了。Swarm内置的overlay网络和DNS服务发现机制让服务间通信变得透明。我们可以通过docker service scale myapp_ingestion_api=10轻松地扩缩容采集服务。

3. ORM适配与高性能时序写入

这是整个方案的技术核心。我们使用Python、FastAPI和SQLAlchemy。挑战在于,SQLAlchemy这类ORM是为行存储的关系型数据库设计的,用于写入海量时序数据会因对象创建和状态管理的开销导致性能低下。

我们的策略是“扬长避短”:

  • 用ORM管理元数据:设备信息、版本等低频变更的数据,使用ORM能极大简化CRUD操作和模型管理。
  • 绕过ORM进行时序数据批量写入:对于高频的时序数据,我们直接使用数据库驱动的底层API进行高效的批量操作。

第一步: 定义数据模型

# models.py
import datetime
from sqlalchemy import create_engine, Column, String, DateTime, Integer, Float
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.exc import SQLAlchemyError
import logging

Base = declarative_base()

# 使用ORM管理的设备元数据表
class Device(Base):
    __tablename__ = 'devices'
    
    device_id = Column(String, primary_key=True)
    os_version = Column(String, nullable=False)
    model_name = Column(String, nullable=False)
    first_seen = Column(DateTime, default=datetime.datetime.utcnow)
    last_seen = Column(DateTime, onupdate=datetime.datetime.utcnow)

# 这个模型仅用于结构定义,我们不会用ORM实例来插入数据
class Metric(Base):
    __tablename__ = 'metrics'
    
    time = Column(DateTime(timezone=True), primary_key=True)
    device_id = Column(String, primary_key=True)
    cpu_usage = Column(Float)
    memory_usage = Column(Integer)
    # ... 其他指标

# --- 数据库初始化逻辑 ---
# 在应用启动时,确保超表存在
def initialize_database(engine):
    with engine.connect() as connection:
        # 检查超表是否存在
        result = connection.execute("SELECT 1 FROM timescaledb_information.hypertables WHERE hypertable_name = 'metrics';")
        if result.scalar() is None:
            logging.info("Hypertable 'metrics' not found. Creating...")
            # 使用原生SQL将metrics表转换为TimescaleDB超表
            # 这是一个幂等操作,只在应用首次启动时执行
            connection.execute("SELECT create_hypertable('metrics', 'time');")
            logging.info("Hypertable 'metrics' created successfully.")

第二步: 编写采集服务的核心逻辑

main.py中的采集端点会同时执行两种操作。

# main.py
import csv
import io
from typing import List, Dict
from pydantic import BaseModel
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.orm import Session
from . import models

# ... (SQLAlchemy engine, session setup) ...

app = FastAPI()

# Pydantic模型用于请求体验证
class MetricPoint(BaseModel):
    timestamp: datetime.datetime
    cpu_usage: float
    memory_usage: int

class IngestPayload(BaseModel):
    device_id: str
    os_version: str
    model_name: str
    metrics: List[MetricPoint]

# 关键:ORM与原生写入的混合数据访问层
class IngestionService:
    def __init__(self, db_session: Session):
        self.db_session = db_session

    def register_device(self, payload: IngestPayload):
        """使用ORM的UPSERT功能管理设备元数据, 这是ORM的强项"""
        stmt = pg_insert(models.Device).values(
            device_id=payload.device_id,
            os_version=payload.os_version,
            model_name=payload.model_name,
            last_seen=datetime.datetime.utcnow()
        ).on_conflict_do_update(
            index_elements=['device_id'],
            set_=dict(
                os_version=payload.os_version,
                model_name=payload.model_name,
                last_seen=datetime.datetime.utcnow()
            )
        )
        self.db_session.execute(stmt)

    def bulk_insert_metrics(self, device_id: str, metrics: List[MetricPoint]):
        """
        绕过ORM,使用psycopg2的copy_expert高效写入时序数据。
        这是性能的关键所在。
        """
        if not metrics:
            return

        # 将数据转换为CSV格式的字符串,这是COPY命令的最高效格式
        string_io = io.StringIO()
        writer = csv.writer(string_io)
        for point in metrics:
            writer.writerow([
                point.timestamp.isoformat(),
                device_id,
                point.cpu_usage,
                point.memory_usage
            ])
        string_io.seek(0)
        
        # 获取底层的DBAPI连接
        # 这里需要确保你的Session创建方式能访问到原始连接
        raw_conn = self.db_session.connection().connection
        try:
            with raw_conn.cursor() as cursor:
                # 使用COPY FROM STDIN,这是PostgreSQL最快的批量插入方法
                cursor.copy_expert(
                    sql="COPY metrics (time, device_id, cpu_usage, memory_usage) FROM STDIN WITH CSV",
                    file=string_io
                )
        except Exception as e:
            # 必须进行详尽的错误处理
            logging.error(f"Failed to bulk insert metrics for device {device_id}: {e}")
            raw_conn.rollback() # 如果COPY失败,回滚原始连接的事务
            raise
        
@app.post("/v1/ingest")
def ingest_data(payload: IngestPayload, session: Session = Depends(get_db_session)):
    service = IngestionService(session)
    try:
        # 在同一个事务中执行元数据更新和时序数据插入
        service.register_device(payload)
        service.bulk_insert_metrics(payload.device_id, payload.metrics)
        session.commit()
    except Exception as e:
        session.rollback()
        logging.error(f"Ingestion failed for device {payload.device_id}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error during data ingestion.")
    
    return {"status": "ok"}

这段代码的精髓在于IngestionServiceregister_device方法利用SQLAlchemy的PostgreSQL方言来执行INSERT ... ON CONFLICT DO UPDATE,这是一个原子且高效的元数据更新操作。而bulk_insert_metrics则完全绕开了ORM的对象层,直接获取底层psycopg2连接,并使用其copy_expert方法,通过内存中的CSV流将数据灌入数据库。这是TimescaleDB官方推荐的最高效的数据写入方式。

4. iOS客户端实现考量

在iOS端,我们需要一个可靠的方式来发送数据并处理mTLS。

// Swift Code Snippet for iOS Client
import Foundation

class IngestionClient: NSObject, URLSessionDelegate {
    
    private let endpointURL = URL(string: "https://ingest.your-domain.com:10000/v1/ingest")!
    private var session: URLSession!

    override init() {
        super.init()
        // 使用委托模式的URLSession来处理自定义认证挑战
        let configuration = URLSessionConfiguration.default
        self.session = URLSession(configuration: configuration, delegate: self, delegateQueue: nil)
    }

    func send(payload: Data) {
        var request = URLRequest(url: endpointURL)
        request.httpMethod = "POST"
        request.setValue("application/json", forHTTPHeaderField: "Content-Type")
        request.httpBody = payload

        let task = session.dataTask(with: request) { data, response, error in
            // 处理响应和错误...
            if let httpResponse = response as? HTTPURLResponse, httpResponse.statusCode == 200 {
                print("Successfully ingested data.")
            } else {
                print("Ingestion failed: \(error?.localizedDescription ?? "Unknown error")")
            }
        }
        task.resume()
    }
    
    // 关键: 实现URLSessionDelegate以响应TLS认证挑战
    func urlSession(_ session: URLSession, didReceive challenge: URLAuthenticationChallenge, completionHandler: @escaping (URLSession.AuthChallengeDisposition, URLCredential?) -> Void) {
        
        // 确保是服务器信任的认证类型
        guard challenge.protectionSpace.authenticationMethod == NSURLAuthenticationMethodServerTrust,
              let serverTrust = challenge.protectionSpace.serverTrust else {
            completionHandler(.cancelAuthenticationChallenge, nil)
            return
        }

        // --- mTLS 客户端证书提供 ---
        // 从应用的Bundle或Keychain中加载客户端证书和私钥
        // 这里的`ClientIdentity.get()`是一个假设的辅助函数
        guard let clientIdentity = ClientIdentity.get() else {
            print("Client identity (certificate and private key) not found.")
            completionHandler(.cancelAuthenticationChallenge, nil)
            return
        }

        // 使用我们的客户端身份创建凭证
        let credential = URLCredential(identity: clientIdentity, certificates: nil, persistence: .forSession)
        completionHandler(.useCredential, credential)
    }
}

此处的关键是实现URLSessionDelegateurlSession(_:didReceive:completionHandler:)方法。当Envoy请求客户端证书时,这个委托方法会被触发。我们在这里从本地加载P12文件或Keychain中的身份,并创建URLCredential对象来完成mTLS握手。

架构的局限性与未来演进

这个架构并非银弹。它的主要局限性在于:

  1. Swarm生态的局限:相比Kubernetes,Swarm在自动扩缩容(没有HPA)、高级网络策略(没有Cilium/Calico)、存储卷管理(没有CSI)等方面功能较弱。当业务复杂度进一步提升,例如需要更精细的流量控制或有状态服务时,可能会遇到瓶颈。
  2. 数据访问层的维护成本:虽然我们巧妙地结合了ORM和原生API,但这层自定义代码需要持续维护。如果数据库模式发生变化,或者需要支持新的数据类型,都需要手动修改这部分逻辑。
  3. 单点数据库:目前的架构中,TimescaleDB是一个单点。虽然可以设置主从复制,但实现真正的多活写入或自动故障转移,需要引入更复杂的数据库集群方案,这会增加运维负担。

当团队规模扩大,运维能力增强时,一个合理的演进路径可能是将编排层从Swarm迁移到Kubernetes。由于采集服务本身是无状态的,这个迁移过程相对直接。Envoy的配置也可以平滑地迁移到K8s Ingress或作为Sidecar。届时,我们将能享受到K8s更强大的生态系统,而核心的数据处理逻辑依然可以复用。


  目录