基于 Ktor 和 DDD 实现跨越 SQS 与 Envoy 的全链路追踪


在分布式系统中,可观测性的一个常见盲点是异步消息边界。一个由 HTTP 请求触发的业务流程,其追踪上下文(Trace Context)在消息被发布到 AWS SQS 或 Kafka 时往往会中断。消费该消息的服务所产生的追踪日志(Trace)因此变成了一个“孤儿”,无法与它的触发源关联起来,这在排查生产环境问题时是致命的。我们的挑战在于,如何在一个由 Ktor、Envoy Proxy 和 SQS 构成的技术栈中,将这些断裂的追踪链条严丝合缝地连接起来。

定义问题:异步边界的追踪上下文丢失

设想一个订单处理系统。其架构包含一个基于 Ktor 并采用领域驱动设计(DDD)原则构建的订单服务。所有入口流量都经过 Envoy Proxy 进行路由和策略控制。当一个创建订单的 API 请求抵达时,服务在持久化核心订单聚合(Aggregate)后,会发布一个 OrderCreated 领域事件(Domain Event)到 SQS 队列。另一个后台处理进程(或另一个微服务)会消费此事件,执行后续的履约、通知等操作。

这个流程中,全链路追踪的目标是:在 SkyWalking UI 上,能够看到一条完整的调用链,它始于 Envoy 接收到的外部 HTTP 请求,贯穿 Ktor 订单服务的同步处理逻辑,跨越 SQS 消息队列,最终延伸到后台消费者的异步处理逻辑。

sequenceDiagram
    participant Client
    participant Envoy
    participant Ktor Order Service (API)
    participant SQS
    participant Ktor Order Service (Consumer)

    Client->>+Envoy: POST /orders
    Note over Envoy: 启动 Trace, 生成 Trace ID
    Envoy->>+Ktor Order Service (API): HTTP Request with trace headers
    Ktor Order Service (API)->>Ktor Order Service (API): DDD 业务逻辑处理 (创建订单)
    Ktor Order Service (API)->>+SQS: Publish `OrderCreated` Event
    Note right of Ktor Order Service (API): 追踪上下文在此处中断
    SQS-->>-Ktor Order Service (API): Ack
    Ktor Order Service (API)-->>-Envoy: HTTP 202 Accepted
    Envoy-->>-Client: HTTP 202 Accepted

    %% 异步流程
    Ktor Order Service (Consumer)->>+SQS: Poll message
    SQS-->>-Ktor Order Service (Consumer): `OrderCreated` Event message
    Note over Ktor Order Service (Consumer): 启动新的 Trace, 与上游无关
    Ktor Order Service (Consumer)->>Ktor Order Service (Consumer): 处理履约逻辑

上图清晰地展示了问题所在:Ktor Order Service (API) 发布消息后,追踪上下文就丢失了。消费者启动了一个全新的、无父级上下文的 Trace。

方案 A:完全依赖自动化探针(The Naive Approach)

一种直接的想法是依赖 OpenTelemetry Java Agent 或 SkyWalking Agent 这样的自动化探针。理论上,它们能自动拦截主流框架的调用,实现无侵入的追踪。

  • 优势分析:

    • 零代码侵入: 无需修改任何业务代码,部署和运维相对简单。
    • 广泛支持: 对主流的 HTTP Client、gRPC、JDBC 等都有良好的支持。
  • 劣势与实践陷阱:

    • 无法跨越消息中间件: 这是此方案的致命缺陷。自动化探针可以追踪到 Ktor 应用调用 AWS SQS SDK 的 sendMessage 方法,并将其记录为一个 Span。但是,探针本身并不知道如何、也无权将当前的 Trace Context(如 traceparent W3C 标准头)注入到 SQS 消息的内容属性中。消息一旦进入 SQS 队列,就与它的“前世”彻底失去了联系。
    • 配置黑盒: 探针的行为高度依赖其自身配置和版本,对于非标准或较新的框架(有时包括 Ktor 的某些特定用法),可能会出现兼容性问题。在真实项目中,完全依赖一个黑盒来保障核心的可观测性,风险太高。

结论是,方案 A 在处理跨进程的同步调用(如 HTTP)时非常有效,但在我们面对的异步消息边界问题上,它无能为力。

方案 B:手动传播追踪上下文(The Pragmatic Approach)

该方案的核心思想是:承认自动化探针的局限性,并在跨越异步边界的关键点进行手动干预。即在消息发送前,从当前执行上下文中提取出 Trace Context,并将其作为元数据附加到消息上;在消息消费后,先解析出这些元数据,恢复 Trace Context,再执行业务逻辑。

  • 优势分析:

    • 问题根治: 精准地解决了上下文丢失的核心问题,可以构建完整的、跨越任何异步中间件的调用链。
    • 协议无关: 这种模式不依赖于特定的消息中间件。无论是 SQS、RabbitMQ 还是 Kafka,只要其消息结构支持附加自定义元数据(SQS 的 Message Attributes, Kafka 的 Record Headers),此方案就适用。
    • 明确可控: 开发者对追踪上下文的生命周期有完全的控制,行为明确,易于调试。
  • 劣势与成本:

    • 代码侵入: 需要在消息的生产者和消费者两端编写少量与 OpenTelemetry API 耦合的代码。这违反了业务逻辑与基础设施逻辑严格分离的原则。
    • 实现复杂度: 相较于方案 A,需要开发者对 OpenTelemetry 的传播器(Propagator)和上下文(Context)API 有所了解。

最终决策: 在生产环境中,一个无法追踪的异步流程就是一个定时炸弹。其潜在的故障排查成本远高于引入少量基础设施代码的成本。因此,我们选择方案 B。我们将通过良好的代码抽象,将这种侵入性降至最低。

核心实现概览

我们将构建一个简化的 Ktor 应用来演示完整的实现。

1. 项目依赖与结构

首先,在 build.gradle.kts 中配置必要的依赖。

// build.gradle.kts
plugins {
    kotlin("jvm") version "1.9.21"
    id("io.ktor.plugin") version "2.3.6"
    id("org.jetbrains.kotlin.plugin.serialization") version "1.9.21"
}

// ... repositories and group/version definitions

dependencies {
    // Ktor
    implementation("io.ktor:ktor-server-core-jvm")
    implementation("io.ktor:ktor-server-netty-jvm")
    implementation("io.ktor:ktor-server-content-negotiation-jvm")
    implementation("io.ktor:ktor-serialization-kotlinx-json-jvm")

    // AWS SQS SDK V2
    implementation(platform("software.amazon.awssdk:bom:2.21.32"))
    implementation("software.amazon.awssdk:sqs")

    // OpenTelemetry & SkyWalking Exporter
    implementation("io.opentelemetry:opentelemetry-api:1.32.0")
    implementation("io.opentelemetry:opentelemetry-sdk:1.32.0")
    implementation("io.opentelemetry:opentelemetry-sdk-trace:1.32.0")
    implementation("io.opentelemetry:opentelemetry-exporter-otlp:1.32.0")
    // Use SkyWalking's OTLP receiver
    implementation("org.apache.skywalking:apm-opentelemetry-1-x-exporter:8.15.0") {
        exclude(group = "io.opentelemetry")
    }

    // Coroutines
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")

    // Logging
    implementation("ch.qos.logback:logback-classic:1.4.11")
}

项目结构将遵循 DDD 的分层思想:

src/main/kotlin/com/example
├── application/    # 应用服务, 协调领域对象
│   └── OrderService.kt
├── domain/         # 领域核心
│   ├── model/
│   │   └── Order.kt
│   └── port/
│       └── OrderEventPublisher.kt
├── infrastructure/ # 基础设施实现
│   ├── observability/
│   │   └── OpenTelemetryConfig.kt
│   └── queue/
│       └── SqsOrderEventPublisher.kt
├── ui/             # Web 接口层
│   └── OrderRoutes.kt
└── Application.kt    # Ktor 应用主入口

2. OpenTelemetry 与 SkyWalking 配置

我们需要一个单例来配置和初始化 OpenTelemetry SDK,使其将数据导出到 SkyWalking OAP。

// infrastructure/observability/OpenTelemetryConfig.kt
package com.example.infrastructure.observability

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes
import java.util.concurrent.TimeUnit

object OpenTelemetryConfig {

    fun initialize(serviceName: String): OpenTelemetry {
        // SkyWalking OAP OTLP gRPC receiver is typically at 11800
        val otlpExporter = OtlpGrpcSpanExporter.builder()
            .setEndpoint("http://localhost:11800")
            .setTimeout(2, TimeUnit.SECONDS)
            .build()

        val serviceNameResource = Resource.create(
            io.opentelemetry.api.common.Attributes.of(
                ResourceAttributes.SERVICE_NAME, serviceName
            )
        )

        val tracerProvider = SdkTracerProvider.builder()
            .addSpanProcessor(BatchSpanProcessor.builder(otlpExporter).build())
            .setResource(Resource.getDefault().merge(serviceNameResource))
            .build()

        val sdk = OpenTelemetrySdk.builder()
            .setTracerProvider(tracerProvider)
            .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
            .build()
        
        // Register as global OTel instance. Crucial for agent integration and library usage.
        GlobalOpenTelemetry.set(sdk)

        // Add a shutdown hook to flush spans
        Runtime.getRuntime().addShutdownHook(Thread {
            println("Shutting down OpenTelemetry SDK...")
            tracerProvider.shutdown().join(10, TimeUnit.SECONDS)
            println("OpenTelemetry SDK shut down.")
        })
        
        return sdk
    }
}

Application.kt 中调用它:

// Application.kt
fun main() {
    val openTelemetry = OpenTelemetryConfig.initialize("ktor-ddd-sqs-service")
    // ... Ktor server startup code
}

3. Envoy Proxy 配置

Envoy 的配置需要启用追踪,并指向 SkyWalking。这里的关键是 tracing 配置块。Envoy 会负责处理入口请求的追踪头,并生成初始 Span。

# envoy.yaml
static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address: { address: 0.0.0.0, port_value: 10000 }
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              - match: { prefix: "/" }
                route: { cluster: ktor_service }
          http_filters:
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
          
          # Tracing configuration for SkyWalking
          tracing:
            provider:
              name: envoy.tracers.skywalking
              typed_config:
                "@type": type.googleapis.com/envoy.config.trace.v3.SkyWalkingConfig
                grpc_service:
                  envoy_grpc:
                    cluster_name: skywalking_oap
                  # This timeout is for Envoy sending trace data to OAP
                  timeout: 0.5s
                client_config:
                  service_name: envoy-proxy
                  instance_name: local-instance

  clusters:
  - name: ktor_service
    connect_timeout: 0.25s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: ktor_service
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                # Assuming Ktor app runs on port 8080 on the host machine
                address: host.docker.internal
                port_value: 8080

  - name: skywalking_oap
    connect_timeout: 1s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    # This must be HTTP/2 for gRPC
    typed_extension_protocol_options:
      envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
        "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
        explicit_http_config:
          http2_protocol_options: {}
    load_assignment:
      cluster_name: skywalking_oap
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                # Assuming SkyWalking OAP runs on port 11800 on the host machine
                address: host.docker.internal
                port_value: 11800

4. 生产者:注入追踪上下文到 SQS 消息

这是第一个关键点。我们将创建一个 SqsOrderEventPublisher,它实现了领域层的 OrderEventPublisher 接口。

// infrastructure/queue/SqsOrderEventPublisher.kt
package com.example.infrastructure.queue

import com.example.domain.model.OrderCreatedEvent
import com.example.domain.port.OrderEventPublisher
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapSetter
import kotlinx.coroutines.future.await
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue
import software.amazon.awssdk.services.sqs.model.SendMessageRequest

class SqsOrderEventPublisher(
    private val sqsClient: SqsAsyncClient,
    private val queueUrl: String
) : OrderEventPublisher {

    // Get the W3C TextMapPropagator, which we configured as global.
    private val propagator = GlobalOpenTelemetry.getPropagators().textMapPropagator
    
    // Create a setter for SQS Message Attributes.
    // This tells the propagator HOW to write context into our carrier.
    private val sqsSetter = TextMapSetter<MutableMap<String, MessageAttributeValue>> { carrier, key, value ->
        carrier?.put(key, MessageAttributeValue.builder().dataType("String").stringValue(value).build())
    }

    override suspend fun publish(event: OrderCreatedEvent) {
        val tracer = GlobalOpenTelemetry.getTracer("sqs-publisher")
        val span = tracer.spanBuilder("SQS::Publish OrderCreatedEvent").startSpan()

        try {
            span.makeCurrent().use { // Make the new span the current one for this scope
                val messageAttributes = mutableMapOf<String, MessageAttributeValue>()

                // ** THE CORE LOGIC FOR INJECTION **
                // Inject the current context (which includes traceId, spanId) into the message attributes.
                propagator.inject(Context.current(), messageAttributes, sqsSetter)

                val messageBody = """{"orderId": "${event.orderId}", "timestamp": "${event.timestamp}"}"""

                val sendMessageRequest = SendMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .messageBody(messageBody)
                    .messageAttributes(messageAttributes)
                    .build()

                sqsClient.sendMessage(sendMessageRequest).await()
                span.setAttribute("messaging.system", "aws.sqs")
                span.setAttribute("messaging.destination", queueUrl)
                span.setAttribute("event.orderId", event.orderId)
            }
        } catch (e: Exception) {
            span.recordException(e)
            throw e
        } finally {
            span.end()
        }
    }
}

代码解析:

  1. 我们从 GlobalOpenTelemetry 获取全局配置的 TextMapPropagator
  2. TextMapSetter 是一个关键的适配器。它定义了如何将一个键值对(如 traceparent: 00-xxx-yyy-01)写入我们的载体,在这里载体是 SQS 的 MessageAttributeValue 映射。
  3. propagator.inject(Context.current(), ...) 是魔法发生的地方。它从当前执行上下文(Context.current())中提取追踪信息,并使用我们提供的 sqsSetter 将其写入 messageAttributes
  4. 最后,我们带着这些包含追踪上下文的属性发送 SQS 消息。

5. 消费者:从 SQS 消息中提取上下文并恢复追踪链

现在是另一半工作。我们需要一个后台任务来轮询 SQS 队列,并在处理消息前提取上下文。

// infrastructure/queue/SqsEventConsumer.kt (A conceptual example)
package com.example.infrastructure.queue

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapGetter
import kotlinx.coroutines.*
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest

class SqsEventConsumer(
    private val sqsClient: SqsAsyncClient,
    private val queueUrl: String
) {
    private val job = CoroutineScope(Dispatchers.IO).launch { startPolling() }

    private val propagator = GlobalOpenTelemetry.getPropagators().textMapPropagator

    // Getter adapter for SQS Message Attributes.
    // This tells the propagator HOW to read context from our carrier.
    private val sqsGetter = object : TextMapGetter<Message> {
        override fun keys(carrier: Message): Iterable<String> {
            return carrier.messageAttributes().keys
        }
        override fun get(carrier: Message?, key: String): String? {
            return carrier?.messageAttributes()?.get(key)?.stringValue()
        }
    }

    private suspend fun startPolling() {
        while (isActive) {
            try {
                val receiveRequest = ReceiveMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .messageAttributeNames("All")
                    .waitTimeSeconds(20)
                    .maxNumberOfMessages(10)
                    .build()

                val messages = sqsClient.receiveMessage(receiveRequest).await().messages()
                for (message in messages) {
                    processMessageWithTrace(message)
                }
            } catch (e: Exception) {
                // In production, add proper logging and backoff strategy
                println("Error polling SQS: ${e.message}")
                delay(5000)
            }
        }
    }
    
    private fun processMessageWithTrace(message: Message) {
        // ** THE CORE LOGIC FOR EXTRACTION **
        // Extract the parent context from message attributes.
        val parentContext = propagator.extract(Context.current(), message, sqsGetter)

        // Create a new span that is a child of the extracted parent context.
        val tracer = GlobalOpenTelemetry.getTracer("sqs-consumer")
        val spanBuilder = tracer.spanBuilder("SQS::Process OrderCreatedEvent")

        // This links the new span to the original trace.
        spanBuilder.setParent(parentContext)

        val span = spanBuilder.startSpan()

        // Execute business logic within the scope of the new, linked span.
        span.makeCurrent().use {
            try {
                println("Processing message: ${message.body()}")
                span.setAttribute("messaging.system", "aws.sqs")
                span.setAttribute("messaging.message_id", message.messageId())
                
                // Simulate business logic
                Thread.sleep(100)
                
                // Delete message after successful processing
                // sqsClient.deleteMessage(...)
            } catch (e: Exception) {
                span.recordException(e)
            } finally {
                span.end()
            }
        }
    }

    fun stop() {
        job.cancel()
    }
}

代码解析:

  1. TextMapGetterTextMapSetter 的反向操作,它告诉传播器如何从 SQS Message 对象中读取元数据。
  2. propagator.extract(Context.current(), ...) 读取消息属性,并构建一个包含父级 Span 上下文的 Context 对象。
  3. spanBuilder.setParent(parentContext) 是连接断裂追踪链条的关键一步。它明确告诉 OpenTelemetry,即将创建的这个 Span 不是一个新的根 Span,而是之前通过 SQS 传递过来的那个 Trace 的一部分。
  4. 所有后续的业务逻辑都在这个新创建的、正确链接的 Span 范围内执行,确保了整个异步流程都被归于同一个 Trace ID 之下。

架构的局限性与未来展望

当前方案虽然有效,但并非完美。最主要的局限性在于基础设施逻辑(追踪上下文的传播)与应用层代码(消息发布与消费逻辑)存在耦合。虽然我们通过 DDD 的端口与适配器模式(OrderEventPublisher 接口和 SqsOrderEventPublisher 实现)将这种耦合限制在了基础设施层,但这种耦合本身依然存在。

未来的演进方向可能包括:

  1. 更智能的客户端库: 理想情况下,AWS SQS SDK 或更高层次的消息框架(如 Spring Cloud Messaging)应原生支持 OpenTelemetry 上下文传播。开发者只需开启一个配置,所有的注入和提取操作都将由库自动完成。这需要生态系统的共同推动。
  2. 服务网格的演进: 服务网格(Service Mesh)目前主要处理同步的 L7 流量。未来,网格的数据平面(如 Envoy)可能会扩展其能力,以代理和理解消息队列协议。如果 Envoy 能够原生代理 SQS 流量,它就有可能像处理 HTTP Header 一样,自动完成追踪上下文的注入与提取,从而让应用代码彻底回归纯粹。
  3. eBPF 的可能性: 基于 eBPF 的零侵入可观测性技术提供了一个更底层的视角。理论上,可以通过 eBPF 探针监控到应用调用 SQS SDK 的系统调用,并动态地修改内存中的消息数据来注入上下文。这技术非常前沿,但实现复杂度和稳定性要求极高,目前还不适用于大多数生产场景。

  目录