利用 Nginx 与 ActiveMQ 为 Go Lambda 构建解耦的长轮询网关


项目初期,我们选择 AWS Lambda 处理一个计算密集型的异步任务。客户端提交任务后,需要通过一个接口轮询任务状态,直到获取最终结果。为了降低客户端的轮询频率并提升实时性,我们采用了长轮询(Long-Polling)机制。最初的实现非常直观,一个 Go 编写的 Lambda 函数接收任务 ID,然后在一个循环里查询数据库,如果任务未完成,则 time.Sleep 一段时间后重试,直到超时或任务完成。

// WARNING: 这是一个反面教材,直接在Lambda中长时间等待是极其昂贵的。
package main

import (
	"context"
	"fmt"
	"time"
	"github.com/aws/aws-lambda-go/lambda"
)

type TaskRequest struct {
	TaskID string `json:"taskId"`
}

type TaskResponse struct {
	Status  string      `json:"status"`
	Result  interface{} `json:"result,omitempty"`
	Message string      `json:"message,omitempty"`
}

// 模拟的数据库查询
func queryTaskStatusFromDB(taskID string) (string, interface{}) {
	// 在真实场景中,这里会查询DynamoDB或RDS
	// 为了演示,我们随机返回状态
	if time.Now().Second()%10 > 2 {
		return "PROCESSING", nil
	}
	return "COMPLETED", map[string]string{"data": "some_heavy_result"}
}

func HandleRequest(ctx context.Context, request TaskRequest) (TaskResponse, error) {
	// 设置一个30秒的超时上下文
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	for {
		select {
		case <-ctx.Done():
			// Lambda执行超时或客户端断开
			return TaskResponse{Status: "TIMEOUT", Message: "Polling timed out after 30 seconds."}, nil
		default:
			status, result := queryTaskStatusFromDB(request.TaskID)
			if status == "COMPLETED" {
				return TaskResponse{Status: status, Result: result}, nil
			}
			// 任务未完成,休眠2秒继续轮询
			// 这是成本陷阱的核心:Lambda在Sleep时依然在计费
			time.Sleep(2 * time.Second)
		}
	}
}

func main() {
	lambda.Start(HandleRequest)
}

这个方案在测试环境运行良好,但上线后成本急剧飙升。AWS Lambda 的计费模型是按执行时长(每毫秒)和内存分配计费的。time.Sleep 期间,函数实例虽然空闲,但并未终止,计费仍在继续。一个30秒的长轮询请求,即使CPU几乎零消耗,也会按30秒的满额时长计费。当并发请求增多时,这很快演变成一场财务灾难。

这个痛点迫使我们重新审视架构。问题的核心在于,Lambda 的核心价值是处理短暂、无状态的事件,而不适合维持长连接或执行长时间的等待。我们需要将“等待”这个动作从昂贵的 Lambda 计算资源中剥离出去。

架构重构:引入 Nginx 和 ActiveMQ

初步构想是引入一个轻量级的代理层,专门负责处理客户端的长轮询连接。这个代理层必须能以极低的资源消耗维持大量并发连接。Nginx 因其事件驱动的异步非阻塞架构,成为处理 C10k 问题的首选。

同时,处理任务的 Lambda Worker 与返回结果的代理层需要一种通信机制。直接的 HTTP 调用会使代理层与 Worker 强耦合。一个更稳健的方案是使用消息队列。当 Worker 完成任务后,将结果发布到队列中,代理层作为消费者订阅该消息,并将结果推送给对应的客户端。我们选择了 ActiveMQ,因为它是一个成熟、稳定且支持多种协议(如 STOMP, AMQP)的开源消息代理。

最终的架构演变为:

  1. 客户端: 发起长轮询请求到 Nginx 网关。
  2. Nginx 网关: 运行在 EC2 或 ECS 上,接收请求并维持连接。它不直接处理业务逻辑。
  3. Go Bridge Service: 一个与 Nginx 部署在一起的轻量级 Go 服务。Nginx 将请求反向代理到此服务。它负责管理等待的请求,并订阅 ActiveMQ 获取任务结果。
  4. ActiveMQ: 消息中间件,用于 Lambda Worker 和 Go Bridge Service 之间的解耦通信。
  5. Go Lambda Worker: 触发后执行实际任务,完成后将结果(包含任务ID)发布到 ActiveMQ 的特定主题。

下面是这个架构的流程图:

sequenceDiagram
    participant Client
    participant Nginx
    participant GoBridge
    participant ActiveMQ
    participant GoLambda

    Client->>+Nginx: POST /poll/{task_id} (Long-Polling)
    Nginx->>+GoBridge: proxy_pass http://localhost:8081/poll/{task_id}
    GoBridge->>GoBridge: 记录 task_id 和 HTTP 请求,挂起连接

    Note over GoLambda: 任务异步触发...
    GoLambda->>GoLambda: 执行密集计算
    GoLambda->>+ActiveMQ: Publish(topic: "task_results", message: {task_id, result})
    ActiveMQ-->>-GoLambda: ACK

    ActiveMQ->>+GoBridge: Push message to subscriber
    GoBridge->>GoBridge: 收到消息,找到挂起的 {task_id} 请求
    GoBridge-->>-Nginx: HTTP 200 OK with result
    Nginx-->>-Client: HTTP 200 OK with result

核心组件实现

1. Go Lambda Worker

这个 Worker 现在只负责计算和发布消息,执行时间被压缩到最短。它不再包含任何等待逻辑。

// lambda/worker/main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/aws/aws-lambda-go/lambda"
	"github.com/go-stomp/stomp/v3"
)

var (
	// 通过环境变量配置 ActiveMQ 连接信息
	mqHost = os.Getenv("ACTIVEMQ_HOST")
	mqPort = os.Getenv("ACTIVEMQ_PORT")
	mqUser = os.Getenv("ACTIVEMQ_USER")
	mqPass = os.Getenv("ACTIVEMQ_PASS")
	mqDest = "/topic/task_results" // 使用Topic进行发布/订阅
)

type TaskInput struct {
	TaskID   string `json:"taskId"`
	Payload  string `json:"payload"`
}

type TaskResult struct {
	TaskID string      `json:"taskId"`
	Status string      `json:"status"`
	Result interface{} `json:"result,omitempty"`
}

var stompConn *stomp.Conn

// 初始化 STOMP 连接
// 在真实项目中,应考虑连接失败的重试和更复杂的生命周期管理
func init() {
	var err error
	connStr := fmt.Sprintf("%s:%s", mqHost, mqPort)
	stompConn, err = stomp.Dial("tcp", connStr,
		stomp.ConnOpt.Login(mqUser, mqPass),
		stomp.ConnOpt.HeartBeat(30*time.Second, 30*time.Second), // 保持心跳
	)
	if err != nil {
		log.Fatalf("Failed to connect to ActiveMQ: %v", err)
	}
}

func HandleRequest(ctx context.Context, input TaskInput) (string, error) {
	log.Printf("Processing task: %s", input.TaskID)

	// 模拟耗时的计算任务
	time.Sleep(5 * time.Second)
	computationResult := fmt.Sprintf("Result for payload '%s'", input.Payload)

	result := TaskResult{
		TaskID: input.TaskID,
		Status: "COMPLETED",
		Result: map[string]string{"data": computationResult},
	}

	body, err := json.Marshal(result)
	if err != nil {
		log.Printf("ERROR - Failed to marshal result for task %s: %v", input.TaskID, err)
		return "Failed", err
	}

	// 将结果发布到 ActiveMQ
	err = stompConn.Send(
		mqDest,
		"application/json",
		body,
		stomp.SendOpt.Receipt, // 请求回执,确保消息发送成功
	)

	if err != nil {
		log.Printf("ERROR - Failed to publish result for task %s to ActiveMQ: %v", input.TaskID, err)
		return "Failed", err
	}

	log.Printf("Successfully published result for task: %s", input.TaskID)
	return "Success", nil
}

func main() {
    // defer stompConn.Disconnect() 并不适合Lambda的执行模型
    // 连接在init中建立,并被后续调用复用
	lambda.Start(HandleRequest)
}

这个 Lambda 的关键在于,连接在 init() 函数中建立,可以被同一个容器实例的后续调用复用,避免了每次调用都重新建立连接的开销。

2. Go Bridge Service

这是整个架构的核心粘合剂。它是一个小型的 HTTP 服务器,内部维护一个从 task_id 到等待中的 HTTP 请求的映射。它同时作为 ActiveMQ 的消费者。

// gateway/bridge/main.go
package main

import (
	"encoding/json"
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/go-stomp/stomp/v3"
	"github.com/gorilla/mux"
)

// PendingRequest 存储挂起的HTTP请求和用于通知的channel
type PendingRequest struct {
	writer  http.ResponseWriter
	request *http.Request
	done    chan []byte // 用于传递结果的channel
}

// RequestManager 线程安全地管理所有挂起的请求
type RequestManager struct {
	sync.RWMutex
	pending map[string]*PendingRequest
}

func (rm *RequestManager) Add(taskID string, pr *PendingRequest) {
	rm.Lock()
	defer rm.Unlock()
	rm.pending[taskID] = pr
}

func (rm *RequestManager) GetAndRemove(taskID string) *PendingRequest {
	rm.Lock()
	defer rm.Unlock()
	if pr, ok := rm.pending[taskID]; ok {
		delete(rm.pending, taskID)
		return pr
	}
	return nil
}

var reqManager = RequestManager{
	pending: make(map[string]*PendingRequest),
}

// pollHandler 处理长轮询请求
func pollHandler(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	taskID := vars["taskId"]
	if taskID == "" {
		http.Error(w, "Task ID is required", http.StatusBadRequest)
		return
	}

	log.Printf("Received poll request for task: %s", taskID)
	
	// 设置一个比Nginx代理超时稍短的超时
	ctx := r.Context()
	ctx, cancel := context.WithTimeout(ctx, 55*time.Second)
	defer cancel()

	pr := &PendingRequest{
		writer:  w,
		request: r,
		done:    make(chan []byte, 1), // buffer为1,避免发送者阻塞
	}
	reqManager.Add(taskID, pr)

	select {
	case result := <-pr.done:
		log.Printf("Result found for task %s. Sending response.", taskID)
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		w.Write(result)
	case <-ctx.Done():
		log.Printf("Timeout for task %s", taskID)
		reqManager.GetAndRemove(taskID) // 超时后清理
		http.Error(w, `{"status":"TIMEOUT"}`, http.StatusRequestTimeout)
	case <-r.Context().Done():
		log.Printf("Client disconnected for task %s", taskID)
		reqManager.GetAndRemove(taskID) // 客户端断开后清理
	}
}

// listenToActiveMQ 订阅ActiveMQ并分发结果
func listenToActiveMQ() {
	mqHost := os.Getenv("ACTIVEMQ_HOST")
	mqPort := os.Getenv("ACTIVEMQ_PORT")
	mqUser := os.Getenv("ACTIVEMQ_USER")
	mqPass := os.Getenv("ACTIVEMQ_PASS")
	mqDest := "/topic/task_results"

	connStr := fmt.Sprintf("%s:%s", mqHost, mqPort)

	for { // 无限重连循环
		conn, err := stomp.Dial("tcp", connStr,
			stomp.ConnOpt.Login(mqUser, mqPass),
		)
		if err != nil {
			log.Printf("Cannot connect to ActiveMQ: %v. Retrying in 5 seconds...", err)
			time.Sleep(5 * time.Second)
			continue
		}

		sub, err := conn.Subscribe(mqDest, stomp.AckClientIndividual)
		if err != nil {
			log.Printf("Cannot subscribe to %s: %v. Retrying...", mqDest, err)
			conn.Disconnect()
			time.Sleep(5 * time.Second)
			continue
		}

		log.Println("Successfully connected and subscribed to ActiveMQ.")
		
		for msg := range sub.C {
			if msg.Err != nil {
				log.Printf("Received an error from subscription: %v", msg.Err)
				break // 跳出内层循环以触发重连
			}

			var result struct {
				TaskID string `json:"taskId"`
			}
			if err := json.Unmarshal(msg.Body, &result); err != nil {
				log.Printf("Failed to unmarshal message body: %v", err)
				conn.Nack(msg)
				continue
			}

			log.Printf("Received result for task: %s", result.TaskID)
			
			if pr := reqManager.GetAndRemove(result.TaskID); pr != nil {
				pr.done <- msg.Body
			} else {
				log.Printf("No pending request found for task: %s. Message might be ignored.", result.TaskID)
			}
			conn.Ack(msg)
		}
		log.Println("Subscription channel closed. Reconnecting...")
		conn.Disconnect()
	}
}

func main() {
	go listenToActiveMQ()

	r := mux.NewRouter()
	r.HandleFunc("/poll/{taskId}", pollHandler).Methods("POST")
	
	log.Println("Starting Go Bridge Service on :8081")
	if err := http.ListenAndServe(":8081", r); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

这里的关键点是 RequestManager,它使用 sync.RWMutex 来保证对 pending map 的并发访问安全。当 pollHandler 收到请求时,它创建一个带 done channel 的 PendingRequest 并将其存入 map。然后,请求的 goroutine 阻塞在 select 语句上。listenToActiveMQ 在一个独立的 goroutine 中运行,当它从 ActiveMQ 收到消息时,会根据 taskID 查找对应的 PendingRequest,并通过 done channel 将结果发送过去,从而唤醒被阻塞的 goroutine,完成响应。

3. Nginx 配置

Nginx 的配置相对简单,主要作用是反向代理和设置合理的超时。

# /etc/nginx/nginx.conf

worker_processes auto;

events {
    worker_connections 1024;
}

http {
    upstream go_bridge {
        server 127.0.0.1:8081;
        keepalive 16;
    }

    server {
        listen 80;
        server_name your_domain.com;

        # 为长轮询设置较长的超时时间
        proxy_connect_timeout 60s;
        proxy_send_timeout    60s;
        proxy_read_timeout    60s;
        send_timeout          60s;

        location /poll/ {
            proxy_pass http://go_bridge;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            
            # 开启HTTP/1.1并禁用缓冲以实现流式效果
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            proxy_buffering off;
            proxy_cache off;
        }
    }
}

这里的 proxy_read_timeout 至关重要,它定义了 Nginx 等待后端(Go Bridge)响应的最长时间。这个值必须大于客户端愿意等待的时间,但也要比 Go Bridge 内部的超时略长,以避免 Nginx 提前关闭连接。proxy_buffering off 也是一个好习惯,确保后端一有数据就立即转发给客户端。

4. CircleCI 自动化部署

最后,我们需要一个 CI/CD 流水线来自动化构建和部署所有组件。

# .circleci/config.yml
version: 2.1

orbs:
  aws-cli: circleci/aws-[email protected]
  aws-lambda: circleci/aws-[email protected]

executors:
  go-executor:
    docker:
      - image: cimg/go:1.20

jobs:
  build-and-deploy-lambda:
    executor: go-executor
    steps:
      - checkout
      - run:
          name: Build Go Lambda binary
          command: |
            cd lambda/worker
            GOOS=linux GOARCH=amd64 go build -o main main.go
            zip deployment.zip main
      - aws-lambda/deploy:
          function-name: my-async-worker-lambda
          zip-file: lambda/worker/deployment.zip
          aws-region: ${AWS_REGION}

  build-and-deploy-gateway:
    executor: go-executor
    steps:
      - checkout
      - aws-cli/setup
      - run:
          name: Build Go Bridge and Nginx Docker image
          command: |
            # 在这里添加构建Docker镜像的逻辑
            # 例如:
            # cd gateway
            # GOOS=linux GOARCH=amd64 go build -o bridge/bridge_server bridge/main.go
            # docker build -t my-gateway-image:${CIRCLE_SHA1} .
            # aws ecr get-login-password --region ${AWS_REGION} | docker login --username AWS --password-stdin ${AWS_ECR_REGISTRY}
            # docker push ${AWS_ECR_REGISTRY}/my-gateway-image:${CIRCLE_SHA1}
            echo "Building and pushing gateway image..."
      - run:
          name: Deploy to EC2/ECS
          command: |
            # 在这里添加部署到ECS或在EC2上更新服务的脚本
            # 例如,使用aws ecs update-service命令
            echo "Deploying gateway service..."

workflows:
  build-and-deploy-workflow:
    jobs:
      - build-and-deploy-lambda:
          context: aws-creds # 在CircleCI中配置名为aws-creds的上下文,包含AWS凭证
      - build-and-deploy-gateway:
          context: aws-creds
          requires:
            - build-and-deploy-lambda

这个 CircleCI 配置定义了两个并行的 Job。build-and-deploy-lambda 负责编译 Go Lambda 代码,打包成 zip,并使用 aws-lambda orb 进行部署。build-and-deploy-gateway 则负责构建包含 Nginx 和 Go Bridge 服务的 Docker 镜像,并将其推送到 ECR,最后触发 ECS 或 EC2 上的服务更新。在真实项目中,这部分会更加复杂,可能涉及基础设施即代码(Terraform/CloudFormation)的更新。

方案的局限性与未来展望

通过这套混合架构,我们成功地将长轮询的连接维持成本从昂贵的 Lambda 执行时间转移到了低成本的 Nginx 实例上。Lambda 现在只在需要时执行,极大地降低了运营成本。

然而,这个方案并非没有缺点。Go Bridge Service 成了有状态的单点。如果部署它的实例崩溃,所有正在等待的客户端连接都会丢失。要实现高可用,需要部署多个 Bridge 实例,并在它们之间同步或共享挂起请求的状态,例如通过 Redis Pub/Sub。当一个 Bridge 实例收到 ActiveMQ 的消息但发现对应的请求在另一个实例上时,它需要通过 Redis 将消息转发过去。

另一个可优化的方向是 Nginx 本身。使用 OpenResty (Nginx + Lua) 或者 Nginx Unit,有可能直接在 Nginx 的工作进程中实现与 ActiveMQ 的 STOMP 协议交互,从而省去独立的 Go Bridge Service,进一步简化架构。但这会增加 Nginx 配置的复杂性,并引入对 Lua 脚本或特定 Nginx 模块的依赖。技术选型总是在不同维度间的权衡,当前方案在清晰度和快速实现上具有优势,而未来的迭代可以向着更高的集成度和容错性演进。


  目录