项目初期,我们选择 AWS Lambda 处理一个计算密集型的异步任务。客户端提交任务后,需要通过一个接口轮询任务状态,直到获取最终结果。为了降低客户端的轮询频率并提升实时性,我们采用了长轮询(Long-Polling)机制。最初的实现非常直观,一个 Go 编写的 Lambda 函数接收任务 ID,然后在一个循环里查询数据库,如果任务未完成,则 time.Sleep
一段时间后重试,直到超时或任务完成。
// WARNING: 这是一个反面教材,直接在Lambda中长时间等待是极其昂贵的。
package main
import (
"context"
"fmt"
"time"
"github.com/aws/aws-lambda-go/lambda"
)
type TaskRequest struct {
TaskID string `json:"taskId"`
}
type TaskResponse struct {
Status string `json:"status"`
Result interface{} `json:"result,omitempty"`
Message string `json:"message,omitempty"`
}
// 模拟的数据库查询
func queryTaskStatusFromDB(taskID string) (string, interface{}) {
// 在真实场景中,这里会查询DynamoDB或RDS
// 为了演示,我们随机返回状态
if time.Now().Second()%10 > 2 {
return "PROCESSING", nil
}
return "COMPLETED", map[string]string{"data": "some_heavy_result"}
}
func HandleRequest(ctx context.Context, request TaskRequest) (TaskResponse, error) {
// 设置一个30秒的超时上下文
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
// Lambda执行超时或客户端断开
return TaskResponse{Status: "TIMEOUT", Message: "Polling timed out after 30 seconds."}, nil
default:
status, result := queryTaskStatusFromDB(request.TaskID)
if status == "COMPLETED" {
return TaskResponse{Status: status, Result: result}, nil
}
// 任务未完成,休眠2秒继续轮询
// 这是成本陷阱的核心:Lambda在Sleep时依然在计费
time.Sleep(2 * time.Second)
}
}
}
func main() {
lambda.Start(HandleRequest)
}
这个方案在测试环境运行良好,但上线后成本急剧飙升。AWS Lambda 的计费模型是按执行时长(每毫秒)和内存分配计费的。time.Sleep
期间,函数实例虽然空闲,但并未终止,计费仍在继续。一个30秒的长轮询请求,即使CPU几乎零消耗,也会按30秒的满额时长计费。当并发请求增多时,这很快演变成一场财务灾难。
这个痛点迫使我们重新审视架构。问题的核心在于,Lambda 的核心价值是处理短暂、无状态的事件,而不适合维持长连接或执行长时间的等待。我们需要将“等待”这个动作从昂贵的 Lambda 计算资源中剥离出去。
架构重构:引入 Nginx 和 ActiveMQ
初步构想是引入一个轻量级的代理层,专门负责处理客户端的长轮询连接。这个代理层必须能以极低的资源消耗维持大量并发连接。Nginx 因其事件驱动的异步非阻塞架构,成为处理 C10k 问题的首选。
同时,处理任务的 Lambda Worker 与返回结果的代理层需要一种通信机制。直接的 HTTP 调用会使代理层与 Worker 强耦合。一个更稳健的方案是使用消息队列。当 Worker 完成任务后,将结果发布到队列中,代理层作为消费者订阅该消息,并将结果推送给对应的客户端。我们选择了 ActiveMQ,因为它是一个成熟、稳定且支持多种协议(如 STOMP, AMQP)的开源消息代理。
最终的架构演变为:
- 客户端: 发起长轮询请求到 Nginx 网关。
- Nginx 网关: 运行在 EC2 或 ECS 上,接收请求并维持连接。它不直接处理业务逻辑。
- Go Bridge Service: 一个与 Nginx 部署在一起的轻量级 Go 服务。Nginx 将请求反向代理到此服务。它负责管理等待的请求,并订阅 ActiveMQ 获取任务结果。
- ActiveMQ: 消息中间件,用于 Lambda Worker 和 Go Bridge Service 之间的解耦通信。
- Go Lambda Worker: 触发后执行实际任务,完成后将结果(包含任务ID)发布到 ActiveMQ 的特定主题。
下面是这个架构的流程图:
sequenceDiagram participant Client participant Nginx participant GoBridge participant ActiveMQ participant GoLambda Client->>+Nginx: POST /poll/{task_id} (Long-Polling) Nginx->>+GoBridge: proxy_pass http://localhost:8081/poll/{task_id} GoBridge->>GoBridge: 记录 task_id 和 HTTP 请求,挂起连接 Note over GoLambda: 任务异步触发... GoLambda->>GoLambda: 执行密集计算 GoLambda->>+ActiveMQ: Publish(topic: "task_results", message: {task_id, result}) ActiveMQ-->>-GoLambda: ACK ActiveMQ->>+GoBridge: Push message to subscriber GoBridge->>GoBridge: 收到消息,找到挂起的 {task_id} 请求 GoBridge-->>-Nginx: HTTP 200 OK with result Nginx-->>-Client: HTTP 200 OK with result
核心组件实现
1. Go Lambda Worker
这个 Worker 现在只负责计算和发布消息,执行时间被压缩到最短。它不再包含任何等待逻辑。
// lambda/worker/main.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"time"
"github.com/aws/aws-lambda-go/lambda"
"github.com/go-stomp/stomp/v3"
)
var (
// 通过环境变量配置 ActiveMQ 连接信息
mqHost = os.Getenv("ACTIVEMQ_HOST")
mqPort = os.Getenv("ACTIVEMQ_PORT")
mqUser = os.Getenv("ACTIVEMQ_USER")
mqPass = os.Getenv("ACTIVEMQ_PASS")
mqDest = "/topic/task_results" // 使用Topic进行发布/订阅
)
type TaskInput struct {
TaskID string `json:"taskId"`
Payload string `json:"payload"`
}
type TaskResult struct {
TaskID string `json:"taskId"`
Status string `json:"status"`
Result interface{} `json:"result,omitempty"`
}
var stompConn *stomp.Conn
// 初始化 STOMP 连接
// 在真实项目中,应考虑连接失败的重试和更复杂的生命周期管理
func init() {
var err error
connStr := fmt.Sprintf("%s:%s", mqHost, mqPort)
stompConn, err = stomp.Dial("tcp", connStr,
stomp.ConnOpt.Login(mqUser, mqPass),
stomp.ConnOpt.HeartBeat(30*time.Second, 30*time.Second), // 保持心跳
)
if err != nil {
log.Fatalf("Failed to connect to ActiveMQ: %v", err)
}
}
func HandleRequest(ctx context.Context, input TaskInput) (string, error) {
log.Printf("Processing task: %s", input.TaskID)
// 模拟耗时的计算任务
time.Sleep(5 * time.Second)
computationResult := fmt.Sprintf("Result for payload '%s'", input.Payload)
result := TaskResult{
TaskID: input.TaskID,
Status: "COMPLETED",
Result: map[string]string{"data": computationResult},
}
body, err := json.Marshal(result)
if err != nil {
log.Printf("ERROR - Failed to marshal result for task %s: %v", input.TaskID, err)
return "Failed", err
}
// 将结果发布到 ActiveMQ
err = stompConn.Send(
mqDest,
"application/json",
body,
stomp.SendOpt.Receipt, // 请求回执,确保消息发送成功
)
if err != nil {
log.Printf("ERROR - Failed to publish result for task %s to ActiveMQ: %v", input.TaskID, err)
return "Failed", err
}
log.Printf("Successfully published result for task: %s", input.TaskID)
return "Success", nil
}
func main() {
// defer stompConn.Disconnect() 并不适合Lambda的执行模型
// 连接在init中建立,并被后续调用复用
lambda.Start(HandleRequest)
}
这个 Lambda 的关键在于,连接在 init()
函数中建立,可以被同一个容器实例的后续调用复用,避免了每次调用都重新建立连接的开销。
2. Go Bridge Service
这是整个架构的核心粘合剂。它是一个小型的 HTTP 服务器,内部维护一个从 task_id
到等待中的 HTTP 请求的映射。它同时作为 ActiveMQ 的消费者。
// gateway/bridge/main.go
package main
import (
"encoding/json"
"log"
"net/http"
"os"
"sync"
"time"
"github.com/go-stomp/stomp/v3"
"github.com/gorilla/mux"
)
// PendingRequest 存储挂起的HTTP请求和用于通知的channel
type PendingRequest struct {
writer http.ResponseWriter
request *http.Request
done chan []byte // 用于传递结果的channel
}
// RequestManager 线程安全地管理所有挂起的请求
type RequestManager struct {
sync.RWMutex
pending map[string]*PendingRequest
}
func (rm *RequestManager) Add(taskID string, pr *PendingRequest) {
rm.Lock()
defer rm.Unlock()
rm.pending[taskID] = pr
}
func (rm *RequestManager) GetAndRemove(taskID string) *PendingRequest {
rm.Lock()
defer rm.Unlock()
if pr, ok := rm.pending[taskID]; ok {
delete(rm.pending, taskID)
return pr
}
return nil
}
var reqManager = RequestManager{
pending: make(map[string]*PendingRequest),
}
// pollHandler 处理长轮询请求
func pollHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
taskID := vars["taskId"]
if taskID == "" {
http.Error(w, "Task ID is required", http.StatusBadRequest)
return
}
log.Printf("Received poll request for task: %s", taskID)
// 设置一个比Nginx代理超时稍短的超时
ctx := r.Context()
ctx, cancel := context.WithTimeout(ctx, 55*time.Second)
defer cancel()
pr := &PendingRequest{
writer: w,
request: r,
done: make(chan []byte, 1), // buffer为1,避免发送者阻塞
}
reqManager.Add(taskID, pr)
select {
case result := <-pr.done:
log.Printf("Result found for task %s. Sending response.", taskID)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(result)
case <-ctx.Done():
log.Printf("Timeout for task %s", taskID)
reqManager.GetAndRemove(taskID) // 超时后清理
http.Error(w, `{"status":"TIMEOUT"}`, http.StatusRequestTimeout)
case <-r.Context().Done():
log.Printf("Client disconnected for task %s", taskID)
reqManager.GetAndRemove(taskID) // 客户端断开后清理
}
}
// listenToActiveMQ 订阅ActiveMQ并分发结果
func listenToActiveMQ() {
mqHost := os.Getenv("ACTIVEMQ_HOST")
mqPort := os.Getenv("ACTIVEMQ_PORT")
mqUser := os.Getenv("ACTIVEMQ_USER")
mqPass := os.Getenv("ACTIVEMQ_PASS")
mqDest := "/topic/task_results"
connStr := fmt.Sprintf("%s:%s", mqHost, mqPort)
for { // 无限重连循环
conn, err := stomp.Dial("tcp", connStr,
stomp.ConnOpt.Login(mqUser, mqPass),
)
if err != nil {
log.Printf("Cannot connect to ActiveMQ: %v. Retrying in 5 seconds...", err)
time.Sleep(5 * time.Second)
continue
}
sub, err := conn.Subscribe(mqDest, stomp.AckClientIndividual)
if err != nil {
log.Printf("Cannot subscribe to %s: %v. Retrying...", mqDest, err)
conn.Disconnect()
time.Sleep(5 * time.Second)
continue
}
log.Println("Successfully connected and subscribed to ActiveMQ.")
for msg := range sub.C {
if msg.Err != nil {
log.Printf("Received an error from subscription: %v", msg.Err)
break // 跳出内层循环以触发重连
}
var result struct {
TaskID string `json:"taskId"`
}
if err := json.Unmarshal(msg.Body, &result); err != nil {
log.Printf("Failed to unmarshal message body: %v", err)
conn.Nack(msg)
continue
}
log.Printf("Received result for task: %s", result.TaskID)
if pr := reqManager.GetAndRemove(result.TaskID); pr != nil {
pr.done <- msg.Body
} else {
log.Printf("No pending request found for task: %s. Message might be ignored.", result.TaskID)
}
conn.Ack(msg)
}
log.Println("Subscription channel closed. Reconnecting...")
conn.Disconnect()
}
}
func main() {
go listenToActiveMQ()
r := mux.NewRouter()
r.HandleFunc("/poll/{taskId}", pollHandler).Methods("POST")
log.Println("Starting Go Bridge Service on :8081")
if err := http.ListenAndServe(":8081", r); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
}
这里的关键点是 RequestManager
,它使用 sync.RWMutex
来保证对 pending
map 的并发访问安全。当 pollHandler
收到请求时,它创建一个带 done
channel 的 PendingRequest
并将其存入 map。然后,请求的 goroutine 阻塞在 select
语句上。listenToActiveMQ
在一个独立的 goroutine 中运行,当它从 ActiveMQ 收到消息时,会根据 taskID
查找对应的 PendingRequest
,并通过 done
channel 将结果发送过去,从而唤醒被阻塞的 goroutine,完成响应。
3. Nginx 配置
Nginx 的配置相对简单,主要作用是反向代理和设置合理的超时。
# /etc/nginx/nginx.conf
worker_processes auto;
events {
worker_connections 1024;
}
http {
upstream go_bridge {
server 127.0.0.1:8081;
keepalive 16;
}
server {
listen 80;
server_name your_domain.com;
# 为长轮询设置较长的超时时间
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
send_timeout 60s;
location /poll/ {
proxy_pass http://go_bridge;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# 开启HTTP/1.1并禁用缓冲以实现流式效果
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off;
proxy_cache off;
}
}
}
这里的 proxy_read_timeout
至关重要,它定义了 Nginx 等待后端(Go Bridge)响应的最长时间。这个值必须大于客户端愿意等待的时间,但也要比 Go Bridge 内部的超时略长,以避免 Nginx 提前关闭连接。proxy_buffering off
也是一个好习惯,确保后端一有数据就立即转发给客户端。
4. CircleCI 自动化部署
最后,我们需要一个 CI/CD 流水线来自动化构建和部署所有组件。
# .circleci/config.yml
version: 2.1
orbs:
aws-cli: circleci/aws-[email protected]
aws-lambda: circleci/aws-[email protected]
executors:
go-executor:
docker:
- image: cimg/go:1.20
jobs:
build-and-deploy-lambda:
executor: go-executor
steps:
- checkout
- run:
name: Build Go Lambda binary
command: |
cd lambda/worker
GOOS=linux GOARCH=amd64 go build -o main main.go
zip deployment.zip main
- aws-lambda/deploy:
function-name: my-async-worker-lambda
zip-file: lambda/worker/deployment.zip
aws-region: ${AWS_REGION}
build-and-deploy-gateway:
executor: go-executor
steps:
- checkout
- aws-cli/setup
- run:
name: Build Go Bridge and Nginx Docker image
command: |
# 在这里添加构建Docker镜像的逻辑
# 例如:
# cd gateway
# GOOS=linux GOARCH=amd64 go build -o bridge/bridge_server bridge/main.go
# docker build -t my-gateway-image:${CIRCLE_SHA1} .
# aws ecr get-login-password --region ${AWS_REGION} | docker login --username AWS --password-stdin ${AWS_ECR_REGISTRY}
# docker push ${AWS_ECR_REGISTRY}/my-gateway-image:${CIRCLE_SHA1}
echo "Building and pushing gateway image..."
- run:
name: Deploy to EC2/ECS
command: |
# 在这里添加部署到ECS或在EC2上更新服务的脚本
# 例如,使用aws ecs update-service命令
echo "Deploying gateway service..."
workflows:
build-and-deploy-workflow:
jobs:
- build-and-deploy-lambda:
context: aws-creds # 在CircleCI中配置名为aws-creds的上下文,包含AWS凭证
- build-and-deploy-gateway:
context: aws-creds
requires:
- build-and-deploy-lambda
这个 CircleCI 配置定义了两个并行的 Job。build-and-deploy-lambda
负责编译 Go Lambda 代码,打包成 zip,并使用 aws-lambda
orb 进行部署。build-and-deploy-gateway
则负责构建包含 Nginx 和 Go Bridge 服务的 Docker 镜像,并将其推送到 ECR,最后触发 ECS 或 EC2 上的服务更新。在真实项目中,这部分会更加复杂,可能涉及基础设施即代码(Terraform/CloudFormation)的更新。
方案的局限性与未来展望
通过这套混合架构,我们成功地将长轮询的连接维持成本从昂贵的 Lambda 执行时间转移到了低成本的 Nginx 实例上。Lambda 现在只在需要时执行,极大地降低了运营成本。
然而,这个方案并非没有缺点。Go Bridge Service 成了有状态的单点。如果部署它的实例崩溃,所有正在等待的客户端连接都会丢失。要实现高可用,需要部署多个 Bridge 实例,并在它们之间同步或共享挂起请求的状态,例如通过 Redis Pub/Sub。当一个 Bridge 实例收到 ActiveMQ 的消息但发现对应的请求在另一个实例上时,它需要通过 Redis 将消息转发过去。
另一个可优化的方向是 Nginx 本身。使用 OpenResty (Nginx + Lua) 或者 Nginx Unit,有可能直接在 Nginx 的工作进程中实现与 ActiveMQ 的 STOMP 协议交互,从而省去独立的 Go Bridge Service,进一步简化架构。但这会增加 Nginx 配置的复杂性,并引入对 Lua 脚本或特定 Nginx 模块的依赖。技术选型总是在不同维度间的权衡,当前方案在清晰度和快速实现上具有优势,而未来的迭代可以向着更高的集成度和容错性演进。