构建驱动 containerd 的 Saga 协调器并使用 Vitest 实施补偿逻辑的集成测试


我们面临一个棘手的任务:一个多阶段的数据处理流程,每个阶段都必须在隔离的环境中执行,且整个流程需要具备事务性。如果任何一个阶段失败,所有已完成的阶段都必须回滚。典型的微服务编排通常依赖 Kubernetes Job 或 Step Functions 之类的重量级方案,但我们的场景要求更轻量、更贴近底层的控制,并且对启动延迟有一定要求。最终,我们决定直接基于 containerd 构建一个无状态的 Saga 模式协调器。

这个协调器的核心职责是按顺序执行定义好的任务(每个任务是一个容器),并在某个任务失败时,反向执行对应的补偿任务(也是容器)。最大的挑战并非实现正向流程,而在于如何可靠地测试补偿逻辑。这直接关系到系统的最终一致性。我们选择了 Vitest,一个以速度和现代化著称的测试框架,来为这个基于 Node.js 的协调器编写一套健壮的韧性集成测试。

技术选型决策

  1. Saga 模式: 放弃两阶段提交(2PC)的强一致性方案,是因为我们的业务场景允许短暂的不一致。Saga 模式通过补偿操作实现最终一致性,更适合长流程、跨服务的分布式事务。
  2. containerd 而非 Docker: 我们需要的是一个纯粹的容器运行时,而不是一个包含网络、卷管理等功能的完整平台。直接与 containerd 的 API (通过其 gRPC 接口或 CLI 工具如 nerdctl) 交互,能给我们带来最小的开销和最大的灵活性。
  3. Node.js: 协调器本身是 I/O 密集型任务——启动容器、等待容器结束、记录状态。Node.js 的事件循环和 async/await 模型非常适合处理这类编排逻辑。
  4. Vitest: 虽然常用于前端项目,但 Vitest 在 Node.js 后端测试中同样出色。它的 ESM-first 支持、内置的 Mocking 和 Spying 功能(兼容 Jest API),以及极快的执行速度,使其成为测试复杂异步逻辑(如我们的协调器)的理想选择。

Saga 协调器的核心实现

首先,我们定义 Saga 的数据结构。一个 Saga 由多个步骤(steps)组成,每个步骤包含一个正向操作(action)和一个补偿操作(compensation)。

// src/types.ts

/**
 * 定义一个容器化任务的规格
 */
export interface ContainerTaskSpec {
  image: string;      // 容器镜像
  command: string[];  // 执行的命令
  env?: string[];     // 环境变量, e.g., ["KEY=VALUE"]
  timeoutMs?: number; // 任务超时时间
}

/**
 * Saga 流程中的一个步骤
 */
export interface SagaStep {
  name: string;
  action: ContainerTaskSpec;
  compensation: ContainerTaskSpec;
}

/**
 * 完整的 Saga 定义
 */
export interface SagaDefinition {
  id: string;
  steps: SagaStep[];
}

/**
 * 记录已成功执行的步骤,用于失败时进行补偿
 */
export interface SagaExecutionState {
  sagaId: string;
  currentStepIndex: number;
  completedSteps: SagaStep[];
  status: 'RUNNING' | 'COMPLETED' | 'COMPENSATING' | 'FAILED';
}

接下来是与 containerd 交互的模块。为了简化,我们在这里通过 nerdctl CLI 进行交互。在生产环境中,这可能会被替换为直接使用 containerd 的 gRPC API 的客户端库,以减少 exec 开销和增强错误处理。

// src/containerd-runner.ts
import { exec } from 'child_process';
import { promisify } from 'util';
import type { ContainerTaskSpec } from './types';

const execAsync = promisify(exec);

// 日志记录器,实际项目中应使用 pino 或 winston
const logger = {
  info: (message: string) => console.log(`[INFO] ${message}`),
  error: (message: string, error?: any) => console.error(`[ERROR] ${message}`, error || ''),
};

/**
 * 负责与 containerd 交互,执行单个容器任务
 */
export class ContainerdRunner {
  private namespace: string;

  constructor(namespace: string = 'default') {
    this.namespace = namespace;
  }

  /**
   * 运行一个一次性的容器任务,并等待其完成
   * @param taskId - 一个唯一的ID,用于容器命名
   * @param spec - 容器任务的规格
   * @returns Promise<void> - 成功完成时 resolve,失败时 reject
   */
  async runTask(taskId: string, spec: ContainerTaskSpec): Promise<void> {
    const containerName = `saga-task-${taskId}-${Date.now()}`;
    const envFlags = spec.env?.map(e => `-e ${e}`).join(' ') || '';
    
    // nerdctl run --rm 会在容器退出后自动删除它
    const command = `nerdctl -n ${this.namespace} run --rm --name ${containerName} ${envFlags} ${spec.image} ${spec.command.join(' ')}`;

    logger.info(`Executing command: ${command}`);

    try {
      // 设定超时。Promise.race 会在任务或超时任意一个先完成时继续
      const taskPromise = execAsync(command, { timeout: spec.timeoutMs || 60000 });
      
      const { stdout, stderr } = await taskPromise;

      if (stdout) logger.info(`[${containerName}] STDOUT: ${stdout}`);
      if (stderr) logger.info(`[${containerName}] STDERR: ${stderr}`);
      
      logger.info(`Task ${containerName} completed successfully.`);

    } catch (error: any) {
      // execAsync 在命令返回非0退出码时会抛出错误
      logger.error(`Task ${containerName} failed. Exit code: ${error.code}.`, error.stderr);
      // 抛出结构化的错误,方便上层处理
      throw new Error(`Container task ${containerName} failed with exit code ${error.code}`);
    }
  }
}

这里的 runTask 方法是关键。它封装了执行 nerdctl run 的逻辑,并包含了超时处理。特别要注意,exec 在子进程返回非零退出码时会抛出异常,这正是我们用来判断任务成功或失败的机制。

最后是协调器 SagaExecutor 的实现。

// src/saga-executor.ts
import { ContainerdRunner } from './containerd-runner';
import type { SagaDefinition, SagaStep, SagaExecutionState } from './types';

// 日志记录器
const logger = {
  info: (message: string) => console.log(`[INFO] ${message}`),
  warn: (message: string) => console.warn(`[WARN] ${message}`),
  error: (message: string, error?: any) => console.error(`[ERROR] ${message}`, error || ''),
};

export class SagaExecutor {
  private runner: ContainerdRunner;

  constructor(runner: ContainerdRunner) {
    this.runner = runner;
  }

  public async execute(saga: SagaDefinition): Promise<boolean> {
    const state: SagaExecutionState = {
      sagaId: saga.id,
      currentStepIndex: 0,
      completedSteps: [],
      status: 'RUNNING',
    };

    logger.info(`Starting saga: ${saga.id}`);

    for (const step of saga.steps) {
      try {
        logger.info(`Executing action for step: ${step.name}`);
        await this.runner.runTask(`${saga.id}-step-${state.currentStepIndex}-action`, step.action);
        
        // 任务成功,记录到状态中
        state.completedSteps.push(step);
        state.currentStepIndex++;

      } catch (error) {
        logger.error(`Action for step ${step.name} failed. Initiating compensation.`, error);
        state.status = 'COMPENSATING';
        await this.compensate(state);
        state.status = 'FAILED';
        return false; // Saga 执行失败
      }
    }

    state.status = 'COMPLETED';
    logger.info(`Saga ${saga.id} completed successfully.`);
    return true; // Saga 执行成功
  }

  private async compensate(state: SagaExecutionState): Promise<void> {
    logger.warn(`Starting compensation for saga: ${state.sagaId}.`);
    
    // 从最后成功的步骤开始,反向执行补偿操作
    const stepsToCompensate = [...state.completedSteps].reverse();

    for (const step of stepsToCompensate) {
      try {
        logger.warn(`Executing compensation for step: ${step.name}`);
        await this.runner.runTask(`${state.sagaId}-step-${step.name}-compensation`, step.compensation);
      } catch (compensationError) {
        // 这是一个严重的问题:补偿操作失败了。
        // 在真实项目中,这里需要有重试机制、告警,甚至人工介入流程。
        logger.error(`FATAL: Compensation for step ${step.name} failed. Manual intervention required.`, compensationError);
        // 停止进一步的补偿,因为状态已经不可预测
        throw new Error(`Compensation failed for step ${step.name}`);
      }
    }
    logger.warn(`Compensation completed for saga: ${state.sagaId}.`);
  }
}

SagaExecutorexecute 方法是核心。它遍历所有步骤,执行 action。如果任何一个 action 失败(即 runTask 抛出异常),它会捕获异常,然后调用 compensate 方法。compensate 方法则反向遍历所有已经成功完成的步骤,并执行它们的 compensation 操作。

graph TD
    subgraph Saga Flow
        A[Start] --> B(Step 1: Action);
        B -- Success --> C(Step 2: Action);
        C -- Success --> D(Step 3: Action);
        D -- Success --> E[End: Success];
        C -- Failure --> F{Compensate};
        F --> G(Step 2: Compensation);
        G --> H(Step 1: Compensation);
        H --> I[End: Failed];
    end

使用 Vitest 进行韧性集成测试

现在,我们需要验证 SagaExecutor 的补偿逻辑是否如预期工作。我们不能在测试中真的去拉取镜像和启动 containerd 容器,那样太慢且不稳定。这里的关键是模拟 ContainerdRunner 的行为。

我们将创建 ContainerdRunner 的一个 mock 实现,让它在特定条件下成功,在其他条件下失败。这是 Vitest 的 vi.spyOnmockImplementation 的用武之地。

这是我们的测试文件结构:

.
├── src
│   ├── containerd-runner.ts
│   ├── saga-executor.ts
│   └── types.ts
└── test
    └── saga-executor.test.ts

下面是完整的测试代码:

// test/saga-executor.test.ts
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { SagaExecutor } from '../src/saga-executor';
import { ContainerdRunner } from '../src/containerd-runner';
import type { SagaDefinition } from '../src/types';

// 我们将模拟整个 ContainerdRunner 类
vi.mock('../src/containerd-runner');

describe('SagaExecutor', () => {
  let mockRunner: ContainerdRunner;
  let executor: SagaExecutor;

  // 定义一个用于测试的Saga
  const testSaga: SagaDefinition = {
    id: 'test-workflow',
    steps: [
      {
        name: 'create-resource',
        action: { image: 'alpine', command: ['echo', 'creating'] },
        compensation: { image: 'alpine', command: ['echo', 'deleting resource'] },
      },
      {
        name: 'process-data',
        action: { image: 'alpine', command: ['echo', 'processing'] },
        compensation: { image: 'alpine', command: ['echo', 'reverting processing'] },
      },
      {
        name: 'finalize',
        action: { image: 'alpine', command: ['echo', 'finalizing'] },
        compensation: { image: 'alpine', command: ['echo', 'reverting finalization'] },
      },
    ],
  };

  beforeEach(() => {
    // 在每个测试用例开始前,重置所有的 mock
    vi.clearAllMocks();
    
    // 创建 ContainerdRunner 的一个 mock 实例
    // 这里的构造函数参数无所谓,因为所有方法都会被 mock
    mockRunner = new ContainerdRunner(); 
    executor = new SagaExecutor(mockRunner);
  });

  it('should execute all steps successfully if all actions succeed', async () => {
    // 安排 (Arrange): Mock runTask 总是成功
    const runTaskSpy = vi.spyOn(mockRunner, 'runTask').mockResolvedValue(undefined);

    // 行动 (Act): 执行 Saga
    const result = await executor.execute(testSaga);

    // 断言 (Assert):
    // 1. Saga 最终结果应为成功
    expect(result).toBe(true);
    
    // 2. runTask 应该被调用3次 (对应3个 action)
    expect(runTaskSpy).toHaveBeenCalledTimes(3);

    // 3. 验证每次调用的都是 action 任务
    expect(runTaskSpy).toHaveBeenCalledWith(expect.stringContaining('action'), testSaga.steps[0].action);
    expect(runTaskSpy).toHaveBeenCalledWith(expect.stringContaining('action'), testSaga.steps[1].action);
    expect(runTaskSpy).toHaveBeenCalledWith(expect.stringContaining('action'), testSaga.steps[2].action);
    
    // 4. 确保没有调用任何 compensation 任务
    expect(runTaskSpy).not.toHaveBeenCalledWith(expect.stringContaining('compensation'), expect.any(Object));
  });

  it('should trigger compensation for completed steps if an action fails', async () => {
    // 安排 (Arrange):
    // 让第二个 action 失败,其他都成功
    const runTaskSpy = vi.spyOn(mockRunner, 'runTask');
    
    // 第一个 action (create-resource) 成功
    runTaskSpy
      .withArgs(expect.stringContaining('step-0-action'), testSaga.steps[0].action)
      .mockResolvedValue(undefined);
      
    // 第二个 action (process-data) 失败
    runTaskSpy
      .withArgs(expect.stringContaining('step-1-action'), testSaga.steps[1].action)
      .mockRejectedValue(new Error('Container failed'));

    // 所有补偿操作都成功
    runTaskSpy
      .withArgs(expect.stringContaining('compensation'), expect.any(Object))
      .mockResolvedValue(undefined);


    // 行动 (Act): 执行 Saga
    const result = await executor.execute(testSaga);

    // 断言 (Assert):
    // 1. Saga 最终结果为失败
    expect(result).toBe(false);

    // 2. 总共调用了 3 次 runTask: 2个 action + 1个 compensation
    expect(runTaskSpy).toHaveBeenCalledTimes(3);

    // 3. 验证正向流程的调用
    // Step 1 action 被调用
    expect(runTaskSpy).toHaveBeenCalledWith(expect.stringContaining('step-0-action'), testSaga.steps[0].action);
    // Step 2 action 被调用(并失败)
    expect(runTaskSpy).toHaveBeenCalledWith(expect.stringContaining('step-1-action'), testSaga.steps[1].action);
    // Step 3 action 从未被调用
    expect(runTaskSpy).not.toHaveBeenCalledWith(expect.stringContaining('step-2-action'), testSaga.steps[2].action);

    // 4. 验证补偿流程的调用
    // 只有 Step 1 的补偿被调用
    expect(runTaskSpy).toHaveBeenCalledWith(expect.stringContaining('compensation'), testSaga.steps[0].compensation);
    // Step 2 和 Step 3 的补偿不应被调用(Step 2 本身失败,Step 3 未执行)
    expect(runTaskSpy).not.toHaveBeenCalledWith(expect.stringContaining('compensation'), testSaga.steps[1].compensation);
    expect(runTaskSpy).not.toHaveBeenCalledWith(expect.stringContaining('compensation'), testSaga.steps[2].compensation);
  });

  it('should stop and throw if a compensation action itself fails', async () => {
    // 这是一个关键的边缘情况测试,用于验证我们的“熔断”机制
    const runTaskSpy = vi.spyOn(mockRunner, 'runTask');

    // Step 1 action 成功
    runTaskSpy
      .withArgs(expect.stringContaining('step-0-action'), testSaga.steps[0].action)
      .mockResolvedValue(undefined);

    // Step 2 action 失败,触发补偿
    runTaskSpy
      .withArgs(expect.stringContaining('step-1-action'), testSaga.steps[1].action)
      .mockRejectedValue(new Error('Container failed'));

    // Step 1 的补偿操作也失败了!
    runTaskSpy
      .withArgs(expect.stringContaining('compensation'), testSaga.steps[0].compensation)
      .mockRejectedValue(new Error('Compensation container failed'));

    // 行动 & 断言:
    // 我们期望 execute 方法抛出一个未捕获的异常,因为补偿失败是致命的
    await expect(executor.execute(testSaga)).rejects.toThrow('Compensation failed for step create-resource');
    
    // 验证调用顺序:Step 1 action -> Step 2 action (fail) -> Step 1 compensation (fail)
    expect(runTaskSpy).toHaveBeenCalledTimes(3);
  });
});

这个测试套件覆盖了三个核心场景:

  1. Happy Path: 所有步骤成功,验证所有 action 被调用,没有 compensation 被调用。
  2. Failure and Rollback: 中间步骤失败,验证失败点之前的步骤的 compensation 被正确调用,而失败点之后的步骤完全不被触及。这是对 Saga 核心逻辑的直接测试。
  3. Compensation Failure: 补偿操作本身失败。这是一个灾难性场景,我们的代码设计是向上抛出异常,停止整个流程,并明确指出问题。测试验证了这种行为,确保系统不会默默地进入一个不一致的状态。

局限性与未来展望

这个实现是一个单节点的、无状态的协调器。它非常适合那些可以将协调器本身作为短暂进程启动的场景。然而,在生产环境中,它有几个明显的局限性:

  1. 状态持久化: 当前的 SagaExecutionState 完全在内存中。如果协调器进程在 Saga 执行期间崩溃,整个流程的状态就会丢失,无法恢复。一个真正的生产级协调器需要将状态持久化到像 etcd、Redis 或数据库这样的可靠存储中,以便在重启后可以恢复并继续执行或补偿。
  2. 高可用: 作为一个单点,协调器本身是故障点。可以通过运行多个协调器实例并使用分布式锁(如基于 Redis或 Zookeeper)来确保同一时间只有一个实例在处理某个特定的 Saga。
  3. containerd 交互: 通过 nerdctl CLI 存在性能开销和解析 stdout/stderr 的脆弱性。更健壮的方法是使用 Node.js 的 gRPC 库直接与 containerdcontainerd.sock 通信。
  4. 测试深度: 当前的集成测试依赖于 mock ContainerdRunner。下一步是通过 Testcontainers 或类似的库,在测试环境中启动一个真实的 containerd 实例,进行更高保真度的端到端测试,但这会显著增加测试的复杂性和执行时间。

尽管存在这些局限,这个模型展示了如何将底层容器技术与经典的分布式事务模式相结合,并利用现代测试框架来确保其核心逻辑的正确性和韧性。这是构建更复杂、更可靠的云原生系统的坚实一步。


  目录