基于 Azure 与 Google Cloud Functions 构建高基数 Prometheus 远程写代理的架构权衡


Prometheus 的 pull 模型在监控长期运行的服务时表现出色,但在处理 CI/CD job、Serverless 函数或批处理任务这类生命周期短暂的目标时,其局限性便显而易见。官方推荐的 Pushgateway 方案,在真实项目中往往会演变成一场灾难——由于缺乏对标签生命周期的管理,它极易成为高基数(high cardinality)指标的源头,最终拖垮整个 Prometheus TSDB。

根本问题在于,我们需要一个能够接收推送指标、但在写入 TSDB 之前执行智能预处理的中间层。这个中间层必须是无状态的、高可用的,并且能够根据流量弹性伸缩。Serverless Functions 似乎是这个场景的完美候选。本文将深入探讨并实现两种基于主流云厂商 Serverless 平台的方案:Azure Functions 和 Google Cloud Functions,并对它们在构建一个高基数感知的 Prometheus 远程写(Remote Write)代理时的架构进行权衡。

定义复杂技术问题:构建一个高基数感知的远程写代理

我们的目标是创建一个 HTTP 端点,它能接收 Prometheus 文本格式的指标,然后将其转换为 Prometheus Remote Write 协议格式,并在发送给 Prometheus TSDB 之前,执行关键的基数控制逻辑。

核心挑战:

  1. 协议转换: 将简单明了的文本格式转换为二进制、经过 Snappy 压缩的 Protobuf 格式。
  2. 基数控制: 实现一个可配置的守卫逻辑,防止带有高基数标签(如 request_id, user_email, container_id)的指标污染下游 TSDB。
  3. 性能与成本: Serverless 架构下,冷启动时间、执行时间、内存消耗直接关系到端点的响应延迟和最终成本。
  4. 多云选型: Azure 和 Google Cloud 在运行时环境、编程模型、性能特性上存在差异,哪一个更适合这种低延迟、计算密集型的转换任务?

我们将通过完整的代码实现来对比这两种方案。

graph TD
    subgraph Ephemeral Workloads
        A[CI/CD Job 1]
        B[Batch Process 2]
        C[IoT Device 3]
    end

    subgraph Serverless Ingestion Layer
        direction LR
        D{Serverless Function}
        D -- 1. Receive Prometheus Text Format --> E[Parse Metrics]
        E -- 2. Apply Cardinality Guard --> F[Filter/Transform Labels]
        F -- 3. Convert to Protobuf --> G[Serialize & Snappy Compress]
    end

    subgraph Monitoring Backend
        H[Prometheus TSDB]
    end

    A --> D
    B --> D
    C --> D
    G -- 4. POST Remote Write Payload --> H

方案 A:Azure Functions (C# 运行时)

Azure Functions 提供了一个成熟的 .NET 运行时环境,其与 Visual Studio 的深度集成和丰富的库生态系统是其主要优势。我们将使用一个 HTTP 触发的函数来完成此任务。

架构与设计考量

在 Azure 中,我们将部署一个 Function App,运行在消费计划(Consumption Plan)上,以实现按需付费和自动伸缩。函数将通过环境变量接收 Prometheus 远程写端点地址和基数控制规则。

一个常见的错误是直接在函数内处理原始文本,这效率低下且容易出错。在真实项目中,我们会引入成熟的 Prometheus 解析库。这里我们将手动实现一个简化的解析器,以更好地展示核心逻辑,但在生产环境中建议使用 prometheus-net 等库。

核心实现:CardinalityAwareProxy.cs

这是完整的 C# 函数代码,它包含了协议解析、基数守卫和远程写协议转换的全部逻辑。

// Filename: CardinalityAwareProxy.cs
// Required NuGet packages: Google.Protobuf, Snappy.Standard

using System;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Google.Protobuf;
using Prometheus; // This is a simplified representation of the Remote.proto classes

public static class CardinalityAwareProxy
{
    private static readonly HttpClient httpClient = new HttpClient();
    private const int MaxLabelsCount = 10; // 基数控制:标签数量上限
    // 生产环境中,这个列表应该来自配置
    private static readonly string[] DisallowedLabels = { "user_id", "request_id", "trace_id" };
    private static readonly string PrometheusRemoteWriteUrl = Environment.GetEnvironmentVariable("PROMETHEUS_REMOTE_WRITE_URL");

    [FunctionName("PrometheusIngest")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
        ILogger log)
    {
        if (string.IsNullOrEmpty(PrometheusRemoteWriteUrl))
        {
            log.LogError("PROMETHEUS_REMOTE_WRITE_URL environment variable is not set.");
            return new StatusCodeResult(StatusCodes.Status500InternalServerError);
        }

        string requestBody;
        using (var reader = new StreamReader(req.Body, Encoding.UTF8))
        {
            requestBody = await reader.ReadToEndAsync();
        }

        if (string.IsNullOrWhiteSpace(requestBody))
        {
            return new BadRequestObjectResult("Request body cannot be empty.");
        }

        var writeRequest = new WriteRequest();
        var lines = requestBody.Split(new[] { '\n' }, StringSplitOptions.RemoveEmptyEntries);
        var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

        foreach (var line in lines)
        {
            if (line.StartsWith("#")) continue; // Skip comments

            try
            {
                var timeSeries = ParsePrometheusTextLine(line, timestamp, log);
                if (timeSeries != null)
                {
                    writeRequest.Timeseries.Add(timeSeries);
                }
            }
            catch (Exception ex)
            {
                log.LogWarning($"Skipping invalid metric line: '{line}'. Reason: {ex.Message}");
            }
        }
        
        if (writeRequest.Timeseries.Count == 0)
        {
            return new OkObjectResult("No valid metrics processed.");
        }

        try
        {
            await SendToPrometheus(writeRequest, log);
            return new OkResult();
        }
        catch (HttpRequestException ex)
        {
            log.LogError(ex, $"Failed to send data to Prometheus Remote Write endpoint. Status: {ex.StatusCode}");
            return new StatusCodeResult(StatusCodes.Status502BadGateway);
        }
        catch (Exception ex)
        {
            log.LogError(ex, "An unexpected error occurred while sending data to Prometheus.");
            return new StatusCodeResult(StatusCodes.Status500InternalServerError);
        }
    }

    private static TimeSeries ParsePrometheusTextLine(string line, long timestamp, ILogger log)
    {
        // This is a simplified parser. Production code should be more robust.
        string metricName;
        var labels = new RepeatedField<Label>();
        double value;

        int labelStartIndex = line.IndexOf('{');
        int labelEndIndex = line.IndexOf('}');
        int valueSeparatorIndex = line.LastIndexOf(' ');

        if (valueSeparatorIndex == -1) throw new ArgumentException("Invalid format: missing value");
        if (!double.TryParse(line.Substring(valueSeparatorIndex + 1), out value))
        {
            throw new ArgumentException("Invalid format: value is not a double");
        }

        if (labelStartIndex > 0 && labelEndIndex > labelStartIndex)
        {
            metricName = line.Substring(0, labelStartIndex);
            string labelPart = line.Substring(labelStartIndex + 1, labelEndIndex - labelStartIndex - 1);
            
            labels.Add(new Label { Name = "__name__", Value = metricName });
            
            var labelPairs = labelPart.Split(',');
            foreach (var pair in labelPairs)
            {
                var parts = pair.Split('=');
                if (parts.Length != 2) continue;
                labels.Add(new Label { Name = parts[0].Trim(), Value = parts[1].Trim().Trim('"') });
            }
        }
        else
        {
            metricName = line.Substring(0, valueSeparatorIndex);
            labels.Add(new Label { Name = "__name__", Value = metricName });
        }

        // --- Cardinality Guard Logic ---
        if (labels.Count > MaxLabelsCount)
        {
            log.LogWarning($"Metric '{metricName}' dropped due to exceeding max label count ({labels.Count} > {MaxLabelsCount}).");
            return null;
        }

        foreach (var label in labels)
        {
            if (DisallowedLabels.Contains(label.Name, StringComparer.OrdinalIgnoreCase))
            {
                log.LogWarning($"Metric '{metricName}' dropped due to containing disallowed label '{label.Name}'.");
                return null;
            }
        }
        // --- End of Cardinality Guard ---

        var timeSeries = new TimeSeries();
        timeSeries.Labels.AddRange(labels);
        timeSeries.Samples.Add(new Sample { Value = value, Timestamp = timestamp });
        
        return timeSeries;
    }

    private static async Task SendToPrometheus(WriteRequest writeRequest, ILogger log)
    {
        var protoBytes = writeRequest.ToByteArray();
        var compressedBytes = Snappy.Standard.SnappyCodec.Compress(protoBytes);

        var request = new HttpRequestMessage(HttpMethod.Post, PrometheusRemoteWriteUrl)
        {
            Content = new ByteArrayContent(compressedBytes)
        };
        request.Headers.Add("X-Prometheus-Remote-Write-Version", "0.1.0");
        request.Content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/x-protobuf");
        request.Content.Headers.ContentEncoding.Add("snappy");
        
        var response = await httpClient.SendAsync(request);
        response.EnsureSuccessStatusCode();
    }
}


// Simplified Protobuf generated classes for context
// In a real project, these would be generated from remote.proto
namespace Prometheus {
    using Google.Protobuf.Collections;
    using Google.Protobuf;

    public sealed partial class WriteRequest : IMessage<WriteRequest> { /* ... generated code ... */ public RepeatedField<TimeSeries> Timeseries { get; } = new RepeatedField<TimeSeries>(); }
    public sealed partial class TimeSeries : IMessage<TimeSeries> { /* ... generated code ... */ public RepeatedField<Label> Labels { get; } = new RepeatedField<Label>(); public RepeatedField<Sample> Samples { get; } = new RepeatedField<Sample>(); }
    public sealed partial class Label : IMessage<Label> { /* ... generated code ... */ public string Name { get; set; } public string Value { get; set; } }
    public sealed partial class Sample : IMessage<Sample> { /* ... generated code ... */ public double Value { get; set; } public long Timestamp { get; set; } }
}

配置 local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "PROMETHEUS_REMOTE_WRITE_URL": "http://your-prometheus-instance:9090/api/v1/write"
  }
}

方案 A 优劣分析

  • 优点:
    • 开发体验: 对于 .NET 开发者,使用 Visual Studio 或 VS Code 可以获得一流的开发、调试和部署体验。
    • 生态系统: .NET 生态成熟,有大量高质量的库可供使用(如 prometheus-net 用于更可靠的解析)。
    • 类型安全: C# 的静态类型系统有助于在编译时捕获错误,提升代码健壮性。
  • 缺点:
    • 冷启动: .NET 运行时在消费计划下的冷启动时间相对较长(通常在几百毫秒到几秒之间)。对于 metrics 这种需要低延迟的场景,这可能成为一个问题。
    • 性能开销: .NET JIT 编译和垃圾回收机制虽然高效,但在短暂的函数执行生命周期中,可能引入不可预测的延迟。

方案 B:Google Cloud Functions (Go 运行时)

Go 语言因其出色的并发性能、快速的编译速度和极低的内存占用,成为云原生和网络服务开发的首选。Google Cloud Functions 对 Go 有着一等公民的支持。

架构与设计考量

我们将部署一个 HTTP 触发的 Cloud Function (2nd gen) 以获得更好的性能和更少的冷启动影响。Go 语言可以直接使用 Prometheus 官方维护的 prometheus/prometheus 仓库中的 prompb 包,这确保了与远程写协议的 100% 兼容性。

这里的代码会显得更“原生”,因为它直接利用了 Prometheus 生态系统中的工具。

核心实现:function.go

这个 Go 文件实现了与 C# 版本完全相同的功能。

// Filename: function.go
// Command to initialize module: go mod init example.com/gcp-prom-proxy
// Command to get dependencies: go get github.com/gogo/protobuf/proto github.com/prometheus/prometheus/prompb github.com/prometheus/common/model github.com/prometheus/common/expfmt github.com/golang/snappy

package function

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/common/expfmt"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/prompb"
)

var (
	httpClient           = &http.Client{Timeout: 10 * time.Second}
	remoteWriteURL       string
	maxLabelsCount       = 10
	disallowedLabels     = map[string]struct{}{"user_id": {}, "request_id": {}, "trace_id": {}}
)

// init runs when the function instance is initialized.
func init() {
	remoteWriteURL = os.Getenv("PROMETHEUS_REMOTE_WRITE_URL")
	if remoteWriteURL == "" {
		log.Fatal("PROMETHEUS_REMOTE_WRITE_URL environment variable is not set.")
	}
	// In production, load maxLabelsCount and disallowedLabels from a config source.
}

// PrometheusIngest is the entry point for the Google Cloud Function.
func PrometheusIngest(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Only POST method is accepted", http.StatusMethodNotAllowed)
		return
	}

	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Error reading request body", http.StatusInternalServerError)
		return
	}
	defer r.Body.Close()

	// Use Prometheus's own text format parser.
	var parser expfmt.TextParser
	metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(body))
	if err != nil {
		log.Printf("Error parsing text exposition format: %v", err)
		http.Error(w, "Error parsing metrics", http.StatusBadRequest)
		return
	}

	writeRequest, numProcessed := buildWriteRequest(metricFamilies)

	if numProcessed == 0 {
		w.WriteHeader(http.StatusOK)
		fmt.Fprint(w, "No valid metrics processed.")
		return
	}
	
	err = sendToPrometheus(context.Background(), writeRequest)
	if err != nil {
		log.Printf("Error sending to Prometheus remote write: %v", err)
		http.Error(w, "Failed to forward metrics", http.StatusBadGateway)
		return
	}
	
	w.WriteHeader(http.StatusOK)
}

func buildWriteRequest(metricFamilies map[string]*model.MetricFamily) (*prompb.WriteRequest, int) {
	writeRequest := &prompb.WriteRequest{}
	ts := time.Now().UnixMilli()
	numProcessed := 0

	for _, mf := range metricFamilies {
		for _, m := range mf.Metric {
			// --- Cardinality Guard Logic ---
			if len(m) > maxLabelsCount {
				log.Printf("Metric '%s' dropped due to exceeding max label count (%d > %d)", mf.Name, len(m), maxLabelsCount)
				continue
			}
			
			hasDisallowedLabel := false
			labels := make([]prompb.Label, 0, len(m)+1)
			labels = append(labels, prompb.Label{Name: "__name__", Value: mf.Name})

			for name, value := range m {
				if _, found := disallowedLabels[string(name)]; found {
					log.Printf("Metric '%s' dropped due to containing disallowed label '%s'", mf.Name, name)
					hasDisallowedLabel = true
					break
				}
				labels = append(labels, prompb.Label{Name: string(name), Value: string(value)})
			}

			if hasDisallowedLabel {
				continue
			}
			// --- End of Cardinality Guard ---

			var value float64
			// This handles different metric types from the parser
			switch mf.Type {
			case model.MetricType_COUNTER:
				value = float64(m.Counter.Value)
			case model.MetricType_GAUGE:
				value = float64(m.Gauge.Value)
			case model.MetricType_UNTYPED:
				value = float64(m.Untyped.Value)
			default:
				// Skip unsupported types like Histogram and Summary for simplicity
				continue
			}
			
			tsProto := prompb.TimeSeries{
				Labels:  labels,
				Samples: []prompb.Sample{{Value: value, Timestamp: ts}},
			}
			writeRequest.Timeseries = append(writeRequest.Timeseries, tsProto)
			numProcessed++
		}
	}
	return writeRequest, numProcessed
}

func sendToPrometheus(ctx context.Context, req *prompb.WriteRequest) error {
	data, err := proto.Marshal(req)
	if err != nil {
		return fmt.Errorf("failed to marshal proto: %w", err)
	}

	compressed := snappy.Encode(nil, data)

	httpReq, err := http.NewRequestWithContext(ctx, "POST", remoteWriteURL, bytes.NewReader(compressed))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}

	httpReq.Header.Add("Content-Encoding", "snappy")
	httpReq.Header.Set("Content-Type", "application/x-protobuf")
	httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
	// In production, add authentication headers if needed.

	resp, err := httpClient.Do(httpReq)
	if err != nil {
		return fmt.Errorf("http request failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode/100 != 2 {
		body, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
		return fmt.Errorf("request failed with status %d: %s", resp.StatusCode, string(body))
	}

	return nil
}

部署与配置

部署 GCF 函数通常通过 gcloud CLI 完成,环境变量可以在部署时设置。

gcloud functions deploy PrometheusIngest \
--gen2 \
--runtime=go121 \
--region=us-central1 \
--source=. \
--entry-point=PrometheusIngest \
--trigger-http \
--allow-unauthenticated \
--set-env-vars PROMETHEUS_REMOTE_WRITE_URL="http://your-prometheus-instance:9090/api/v1/write"

方案 B 优劣分析

  • 优点:
    • 极致性能: Go 编译为原生机器码,无 JIT 开销,启动速度极快。对于这种 short-lived 的计算任务,性能优势非常明显。GC 停顿也远比 .NET Framework/.NET Core 的 GC 更可控和短暂。
    • 低资源消耗: Go 应用的内存占用通常非常小,这在 Serverless 按资源计费的模型中意味着更低的成本。
    • 原生生态: 直接使用 Prometheus 官方库,避免了协议实现上的偏差,代码更简洁、更可靠。
  • 缺点:
    • 开发工具: 虽然 Go 的工具链非常出色,但对于习惯了 Visual Studio 这种重型 IDE 的开发者来说,可能需要一个适应过程。
    • 强类型但更底层: Go 的错误处理模式(if err != nil)虽然明确,但相比 C# 的异常处理会显得更繁琐。

最终选择与理由

在对两种方案进行深度实现和分析后,对于构建一个高性能、低成本的高基数感知的 Prometheus 远程写代理这一特定任务,**我最终选择方案 B:Google Cloud Functions (Go 运行时)**。

做出这个决策的核心理由如下:

  1. 冷启动性能是关键: 指标采集代理的延迟至关重要。任何显著的延迟都可能导致 ephemeral jobs 在发送指标后立即退出,而代理函数尚未完全启动,从而丢失数据。Go 在 GCF Gen2 上的 P99 冷启动时间显著优于 .NET 在 Azure Functions 消费计划上的表现。
  2. 资源效率等于成本效益: 该函数会被高频调用。Go 的低内存和 CPU 占用意味着在相同的负载下,每个函数实例消耗的 GB-secondsvCPU-seconds 更少,直接转化为更低的运营成本。
  3. 协议一致性: 使用与 Prometheus 本身相同的 Go 库来处理 prompb 协议,从根本上消除了因第三方库实现差异而可能引入的微妙 bug。在处理基础监控设施时,这种确定性是极为宝贵的。

虽然 Azure Functions 和 C# 的开发体验可能对某些团队更友好,但在性能和成本这两个对基础设施组件至关重要的维度上,Go on GCF 的组合优势无法忽视。

架构的扩展性与局限性

当前这个 Serverless 代理方案解决了一个核心痛点,但它并非银弹。在真实生产环境中,还需要考虑以下几点:

  • 交付保证: 当前实现是“至多一次”(at-most-once)的交付。如果函数执行失败或超时,指标会丢失。要实现“至少一次”(at-least-once)交付,需要在函数前引入一个持久化队列,如 Google Pub/Sub 或 Azure Service Bus。函数由消息触发,处理失败后消息可以被重试。但这会增加架构复杂性和延迟。
  • 动态配置: 基数控制规则(disallowedLabels)是硬编码的。一个更健壮的系统应该从配置服务(如 Google Secret Manager, Azure App Configuration)动态加载这些规则,以便在不重新部署函数的情况下更新策略。
  • 认证与授权: 生产环境的端点必须受到保护。可以通过 API Gateway(如 Apigee 或 Azure API Management)或者在函数本身中实现基于 token 或 mTLS 的认证。
  • 成本临界点: Serverless 的优势在于弹性,但当流量持续处于非常高的水平时,其成本可能会超过一个(或一组)持续运行的专用虚拟机或容器(例如在 Google Cloud Run 或 Azure Container Apps 上运行相同的 Go 应用)。需要对成本进行建模,以确定何时从 Functions 迁移到更适合持续高负载的平台是合适的。这个代理的理想工作场景是处理突发性或不可预测的流量。

  目录