打通同步与异步边界 构建基于Sentry、Jaeger、PubSub与SQL Server的统一可观测性管道


系统监控的复杂性往往不在于单个组件,而在于组件之间的“缝隙”。一个请求进入API,写入SQL Server,然后发布一条消息到Google Cloud Pub/Sub,由另一个后台服务消费。当消费者服务抛出一个异常时,我们在Sentry里看到一个孤立的错误报告。在Jaeger里,我们可能看到两条断开的链路:一条是API到Pub/Sub发布的,另一条是消费者服务自己启动的。它们之间没有任何关联。这种断裂的遥测数据在排查问题时是致命的,尤其是在复杂的分布式系统中。

我们面临的真实挑战就是缝合这个缺口。目标是,无论一个业务流程如何跨越同步调用(如数据库操作)和异步消息传递,我们都能在Jaeger中看到一条完整的、连贯的分布式链路。同时,任何环节的错误,都应该能在Sentry中被捕获,并且能精确地关联到这条完整的链路上。

问题的根源在于上下文的丢失。分布式追踪的核心是上下文传播(Context Propagation),通常是通过W3C Trace Context标准实现的traceparent头。在HTTP服务间调用时,这通常是自动的。但当请求流经一个消息队列时,这个上下文必须被手动或通过特定工具库序列化到消息中,然后在消费者端再反序列化出来,才能将链路延续下去。

我们将从一个典型的.NET项目入手,一步步构建这个统一的可观测性管道。这个过程将暴露许多现实世界中的陷阱,并给出生产级的解决方案。

第一阶段:生产者服务的基础链路构建

我们从一个ASP.NET Core Web API项目开始,它负责接收外部请求,与SQL Server交互,并向Pub/Sub发布消息。我们的第一步是确保API和数据库的交互能够被正确追踪。

在真实项目中,我们会使用OpenTelemetry作为标准,因为它提供了与供应商无关的API和SDK,能够同时对接到Jaeger(用于追踪)和Sentry(用于错误和性能监控)。

首先,配置生产者服务的依赖项 (Producer.API.csproj):

<Project Sdk="Microsoft.NET.Sdk.Web">

  <PropertyGroup>
    <TargetFramework>net8.0</TargetFramework>
    <Nullable>enable</Nullable>
    <ImplicitUsings>enable</ImplicitUsings>
  </PropertyGroup>

  <ItemGroup>
    <!-- 核心 OpenTelemetry 包 -->
    <PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.7.0" />
    <PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.7.1" />
    <PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.7.1" />
    
    <!-- SQL Server 客户端追踪 -->
    <PackageReference Include="OpenTelemetry.Instrumentation.SqlClient" Version="1.6.0-beta.3" />

    <!-- Jaeger 导出器 -->
    <PackageReference Include="OpenTelemetry.Exporter.Jaeger" Version="1.6.0" />

    <!-- Sentry 与 OpenTelemetry 集成 -->
    <PackageReference Include="Sentry.OpenTelemetry" Version="4.2.0" />

    <!-- Google Cloud Pub/Sub 客户端 -->
    <PackageReference Include="Google.Cloud.PubSub.V1" Version="3.8.0" />

    <!-- 数据库驱动 -->
    <PackageReference Include="Microsoft.Data.SqlClient" Version="5.1.2" />

  </ItemGroup>

</Project>

接下来是核心的Program.cs配置。这里的关键是将所有需要的Instrumentation(检测工具)和Exporter(导出器)注册到OpenTelemetry的TracerProvider

// Program.cs in Producer.API
using System.Diagnostics;
using Google.Cloud.PubSub.V1;
using Microsoft.Data.SqlClient;
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

var builder = WebApplication.CreateBuilder(args);

// 定义服务名称,这在分布式追踪中至关重要
const string serviceName = "Producer.API";
const string serviceVersion = "1.0.0";

// 1. 配置 OpenTelemetry
builder.Services.AddOpenTelemetry()
    .ConfigureResource(resource => resource
        .AddService(serviceName: serviceName, serviceVersion: serviceVersion))
    .WithTracing(tracing => tracing
        .AddSource(serviceName) // 添加自定义 ActivitySource
        .AddAspNetCoreInstrumentation() // 自动检测 ASP.NET Core 请求
        .AddHttpClientInstrumentation()   // 自动检测 HttpClient 调用
        .AddSqlClientInstrumentation(options => // 自动检测 SQL Server 调用
        {
            options.SetDbStatementForText = true; // 记录 SQL 语句
            options.RecordException = true;
        })
        .AddJaegerExporter(options => // 配置 Jaeger 导出器
        {
            options.AgentHost = builder.Configuration.GetValue<string>("Jaeger:Host");
            options.AgentPort = builder.Configuration.GetValue<int>("Jaeger:Port");
        })
        // 关键一步: 添加 Sentry 处理器,它会把追踪信息关联到 Sentry 事件
        .AddSentry()
    );

// 2. 注册自定义的 ActivitySource,用于手动创建 Span
builder.Services.AddSingleton(new ActivitySource(serviceName));

// 3. 注册 Google Cloud Pub/Sub PublisherClient
// 在真实项目中,ProjectId 和 TopicId 会来自配置
builder.Services.AddSingleton(await new PublisherClientBuilder
{
    TopicName = new TopicName("your-gcp-project-id", "my-topic")
}.BuildAsync());


var app = builder.Build();

// Web API 端点
app.MapPost("/orders", async (
    OrderRequest request, 
    PublisherClient publisher,
    ActivitySource activitySource,
    ILogger<Program> logger) =>
{
    // 模拟数据库操作
    using (var activity = activitySource.StartActivity("Database:SaveOrder", ActivityKind.Client))
    {
        activity?.SetTag("db.system", "sqlserver");
        activity?.SetTag("order.id", request.OrderId);
        
        // 这里的连接字符串应来自配置
        const string connectionString = "Server=tcp:your_server.database.windows.net,1433;Initial Catalog=your_db;...";
        await using var connection = new SqlConnection(connectionString);
        await connection.OpenAsync();
        
        var command = new SqlCommand("INSERT INTO Orders (Id, Product, Amount) VALUES (@Id, @Product, @Amount)", connection);
        command.Parameters.AddWithValue("@Id", request.OrderId);
        command.Parameters.AddWithValue("@Product", request.Product);
        command.Parameters.AddWithValue("@Amount", request.Amount);
        await command.ExecuteNonQueryAsync();
        
        logger.LogInformation("Order {OrderId} saved to database.", request.OrderId);
    }
    
    // 此时,链路可以从 API 入口追踪到数据库操作
    // 但接下来发布消息时,链路将中断

    // 后续会在这里添加上下文传播逻辑
    var message = new PubsubMessage
    {
        Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(request))
    };

    await publisher.PublishAsync(message);
    logger.LogInformation("Message for Order {OrderId} published.", request.OrderId);
    
    return Results.Accepted();
});

// 配置 Sentry SDK
builder.WebHost.UseSentry(options =>
{
    // DSN 应该从安全配置中读取
    options.Dsn = "your-sentry-dsn"; 
    options.TracesSampleRate = 1.0; // 在生产中调整采样率
    options.UseOpenTelemetry(); // 启用 Sentry 与 OpenTelemetry 的集成
});

app.Run();

public record OrderRequest(string OrderId, string Product, decimal Amount);

此时,如果我们启动Jaeger和这个API服务,然后发送一个POST请求到/orders,我们会在Jaeger UI中看到一条链路。这条链路包含了从POST /orders开始的根Span,以及一个子Span Database:SaveOrder,甚至还有一个SqlClient自动生成的更底层的DB Span。这证明了我们的第一阶段目标达成:同步调用链是完整的。

第二阶段:暴露并解决异步边界的断裂问题

现在我们创建消费者服务。它是一个简单的后台工作进程(BackgroundService),负责监听Pub/Sub主题并处理消息。

消费者服务的项目文件 (Consumer.Worker.csproj) 与生产者类似,但不需要AspNetCore检测,而是需要一个宿主环境。

// Program.cs in Consumer.Worker
using System.Diagnostics;
using Consumer.Worker;
using Google.Cloud.PubSub.V1;
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

const string serviceName = "Consumer.Worker";
const string serviceVersion = "1.0.0";

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices((hostContext, services) =>
    {
        services.AddOpenTelemetry()
            .ConfigureResource(resource => resource
                .AddService(serviceName: serviceName, serviceVersion: serviceVersion))
            .WithTracing(tracing => tracing
                .AddSource(serviceName)
                .AddJaegerExporter(options => // 配置 Jaeger
                {
                    options.AgentHost = hostContext.Configuration.GetValue<string>("Jaeger:Host");
                    options.AgentPort = hostContext.Configuration.GetValue<int>("Jaeger:Port");
                })
                .AddSentry());
        
        services.AddSingleton(new ActivitySource(serviceName));
        
        // 注册 Pub/Sub SubscriberClient
        services.AddSingleton(async (provider) => await new SubscriberClientBuilder
        {
            SubscriptionName = new SubscriptionName("your-gcp-project-id", "my-subscription"),
            ClientCreationSettings = new ClientCreationSettings(credentials: provider.GetService<Google.Api.Gax.Grpc.ChannelCredentials>())
        }.BuildAsync());
        
        services.AddHostedService<Worker>();
    })
    .UseSentry(options =>
    {
        options.Dsn = "your-sentry-dsn";
        options.TracesSampleRate = 1.0;
        options.UseOpenTelemetry();
    })
    .Build();

host.Run();

Worker.cs 是消费逻辑的核心:

// Worker.cs in Consumer.Worker
public class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;
    private readonly SubscriberClient _subscriber;
    private readonly ActivitySource _activitySource;

    public Worker(ILogger<Worker> logger, SubscriberClient subscriber, ActivitySource activitySource)
    {
        _logger = logger;
        _subscriber = subscriber;
        _activitySource = activitySource;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await _subscriber.StartAsync((message, token) =>
        {
            // 在这里,我们创建了一个新的根 Span
            // 因为没有从消息中提取任何上下文
            using (var activity = _activitySource.StartActivity("ProcessOrderMessage", ActivityKind.Consumer))
            {
                var orderJson = message.Data.ToStringUtf8();
                _logger.LogInformation("Received message: {messageId}, Body: {body}", message.MessageId, orderJson);
                
                activity?.SetTag("messaging.system", "gcp_pubsub");
                activity?.SetTag("messaging.message_id", message.MessageId);

                // 模拟处理逻辑
                ProcessOrder(orderJson);
            }
            
            // 确认消息
            return Task.FromResult(SubscriberClient.Reply.Ack);
        });
    }

    private void ProcessOrder(string orderJson)
    {
        _logger.LogInformation("Processing order: {order}", orderJson);
        // 模拟一些工作
        Thread.Sleep(100); 
    }
}

现在,如果我们运行整个系统(生产者API和消费者Worker),然后向API发送请求,我们会观察到预期的“断裂”现象:

  1. 在Jaeger中,出现一条源自Producer.API的链路,它在PublishAsync调用后就结束了。
  2. 同时,出现另一条完全独立的、源自Consumer.Worker的新链路,它的根Span是ProcessOrderMessage
  3. 这两条链路之间没有任何父子关系。这就是上下文丢失的直接后果。

第三阶段:手动注入与提取追踪上下文

要修复这个问题,我们必须手动扮演Instrumentation库的角色:在发布消息前,将当前的追踪上下文注入到消息的属性中;在消费消息时,从属性中提取它,并用它来启动新的Span。

W3C Trace Context 标准定义了两个关键的HTTP头:traceparenttracestate。我们将它们作为Pub/Sub消息的Attributes来传递。

为此,我们创建一个辅助类TraceContextPropagator来封装这个逻辑。

// TraceContextPropagator.cs (a shared utility class)
using System.Diagnostics;
using Google.Cloud.PubSub.V1;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;

public static class TraceContextPropagator
{
    private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;

    // 将当前 Activity Context 注入到 PubsubMessage 的 Attributes 中
    public static void Inject(PubsubMessage message)
    {
        // 这是一个关键的步骤,它使用 OpenTelemetry 的默认传播器(通常是 W3C TraceContext)
        // 将当前的 Activity 上下文(traceId, spanId 等)序列化到一个字典中。
        Propagator.Inject(new PropagationContext(Activity.Current?.Context ?? default, Baggage.Current), message,
            (msg, key, value) =>
            {
                // Action<T, string, string> for setting attributes
                if (msg.Attributes.ContainsKey(key))
                {
                    msg.Attributes[key] = value;
                }
                else
                {
                    msg.Attributes.Add(key, value);
                }
            });
    }

    // 从 PubsubMessage 的 Attributes 中提取父上下文
    public static PropagationContext Extract(PubsubMessage message)
    {
        // 这是一个逆向过程,从消息的属性中读取 W3C TraceContext 信息,
        // 并将其反序列化为一个 PropagationContext 对象,这个对象可以用来建立父子Span关系。
        return Propagator.Extract(default, message, (msg, key) =>
        {
            msg.Attributes.TryGetValue(key, out var value);
            return new[] { value };
        });
    }
}

现在,我们来改造生产者和消费者的代码。

在生产者 Producer.API 中:

修改/orders端点的消息发布部分:

// ... inside app.MapPost("/orders", ...)
// ...
    var message = new PubsubMessage
    {
        Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(request))
    };

    // 关键步骤: 在发布前注入追踪上下文
    using (var activity = activitySource.StartActivity("PubSub:Publish", ActivityKind.Producer))
    {
        TraceContextPropagator.Inject(message);
        activity?.SetTag("messaging.system", "gcp_pubsub");
        activity?.SetTag("messaging.destination.name", "my-topic");
        
        await publisher.PublishAsync(message);
    }
    
    logger.LogInformation("Message for Order {OrderId} published with trace context.", request.OrderId);
// ...

在消费者 Consumer.Worker 中:

修改ExecuteAsync的消息处理回调:

// ... inside Worker.ExecuteAsync ...
await _subscriber.StartAsync((message, token) =>
{
    // 关键步骤: 提取父上下文
    var parentContext = TraceContextPropagator.Extract(message);

    // 将提取的上下文与当前的 Baggage 关联起来
    Baggage.Current = parentContext.Baggage;

    // 使用提取的父上下文来启动新的 Activity (Span)
    // 这样 OpenTelemetry 才能建立正确的父子关系
    using (var activity = _activitySource.StartActivity("ProcessOrderMessage", ActivityKind.Consumer, parentContext.ActivityContext))
    {
        var orderJson = message.Data.ToStringUtf8();
        _logger.LogInformation("Received message: {messageId}, Body: {body}", message.MessageId, orderJson);
        
        activity?.SetTag("messaging.system", "gcp_pubsub");
        activity?.SetTag("messaging.message_id", message.MessageId);

        ProcessOrder(orderJson);
    }
    
    return Task.FromResult(SubscriberClient.Reply.Ack);
});
// ...

重启整个系统并再次发送请求。现在去Jaeger UI查看,奇迹发生了:你会看到一条完整的链路图。它从Producer.API的HTTP请求开始,经过数据库操作,然后是一个PubSub:Publish Span,紧接着是Consumer.WorkerProcessOrderMessage Span。它们通过正确的父子关系连接在一起,形成了一个完整的业务流程视图。我们成功地缝合了异步边界。

第四阶段:集成Sentry,完成闭环

最后一步是验证Sentry的集成。Sentry的UseOpenTelemetry()扩展非常强大,它会自动捕获由OpenTelemetry创建的Span,并将它们作为性能事务(Transaction)发送到Sentry。更重要的是,当一个异常发生时,Sentry SDK会捕获当前的Activity.Current.Context,也就是当前的Span信息(Trace ID, Span ID),并将其附加到错误事件上。

我们在消费者中模拟一个处理失败的场景:

// Modify Worker.cs
private void ProcessOrder(string orderJson)
{
    using var activity = _activitySource.StartActivity("SimulateProcessing");
    try
    {
        _logger.LogInformation("Processing order: {order}", orderJson);
        Thread.Sleep(100);

        // 模拟一个可预测的错误
        if (orderJson.Contains("fail"))
        {
            throw new InvalidOperationException($"Failed to process order: {orderJson}");
        }
        _logger.LogInformation("Order processed successfully.");
    }
    catch (Exception ex)
    {
        // Sentry SDK 会自动捕获未处理的异常
        // 我们也可以手动捕获并记录,Sentry 仍能关联上下文
        _logger.LogError(ex, "An error occurred during order processing.");
        activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
        // 重抛异常,让 Sentry SDK 捕获
        throw;
    }
}

现在,发送一个包含"product": "fail"的请求。

  1. 在 Jaeger 中: 你会看到完整的链路,并且ProcessOrderMessageSimulateProcessing这个Span会被标记为错误状态。
  2. 在 Sentry 中: 你会看到一个新的InvalidOperationException问题。打开它,神奇之处在于:
    • 在事件详情中,你会看到Trace IDSpan ID
    • 有一个“View in Jaeger”或类似的链接(取决于Sentry的集成配置),点击它可以直接跳转到Jaeger UI中对应的、完整的分布式链路。
    • 在Sentry的”Performance”页面,你也能看到这个完整的事务,从API入口一直到消费者失败。

我们已经完成了端到端的统一可观测性管道。

sequenceDiagram
    participant Client
    participant Producer.API
    participant SQL Server
    participant Google Cloud Pub/Sub
    participant Consumer.Worker
    participant Jaeger
    participant Sentry

    Client->>+Producer.API: POST /orders
    Producer.API->>Producer.API: Start Trace (TraceID: T1)
    Producer.API->>+SQL Server: INSERT INTO Orders
    SQL Server-->>-Producer.API: Success
    Producer.API->>Producer.API: Get Current TraceContext (T1)
    Producer.API->>Google Cloud Pub/Sub: Publish(Message + TraceContext)
    Producer.API-->>-Client: 202 Accepted

    Note right of Google Cloud Pub/Sub: Message with T1 waits in topic

    Consumer.Worker->>+Google Cloud Pub/Sub: Pull Message
    Google Cloud Pub/Sub-->>-Consumer.Worker: Return Message with T1
    Consumer.Worker->>Consumer.Worker: Extract TraceContext (T1)
    Consumer.Worker->>Consumer.Worker: Start Child Span from T1
    Consumer.Worker->>Consumer.Worker: ProcessOrder() -> throws Exception

    Consumer.Worker->>Sentry: Report Exception with TraceID T1
    
    Note over Producer.API, Consumer.Worker: All spans are sent to Jaeger
    Producer.API->>Jaeger: Send Spans
    Consumer.Worker->>Jaeger: Send Spans

方案的局限性与未来迭代

这个手动注入和提取上下文的方案虽然有效,但在生产环境中存在维护成本。一个常见的错误是,开发人员在添加新的消息发布或消费逻辑时,忘记调用TraceContextPropagator,这将导致链路再次断裂。理想的长期解决方案是等待或贡献官方的OpenTelemetry.Instrumentation.GoogleCloudPubSub库,使其能像SqlClientAspNetCore一样自动完成上下文传播。

此外,本示例中为了清晰起见,将采样率设为100% (TracesSampleRate = 1.0)。在流量大的生产系统中,这会带来巨大的性能和成本开销。必须引入更智能的采样策略,例如基于头部的采样(Head-based Sampling)或更高级的尾部采样(Tail-based Sampling),后者允许系统在链路完整结束后再决定是否保留这条链路,对于定位偶发错误尤其有效。

最后,我们只传递了traceparent。OpenTelemetry的Baggage API允许在追踪上下文中携带业务数据(如userIdtenantId),这些数据会自动传播到下游所有服务。这对于在Sentry错误或Jaeger链路中快速定位到具体用户或租户的问题非常有价值,是下一步优化的重要方向。


  目录