Building a State-Driven Service Discovery Node with XState and ScyllaDB


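The weakest link in a seemingly robust service discovery system is often its vague handling of the service instance lifecycle. Most systems reduce instance state to a binary "online" or "offline", a model that falls short in real scenarios such as startup warm-up, graceful shutdown, and health-check flapping. After a brief network partition, should an instance be evicted immediately, or should it enter a short "suspected offline" observation window? Should an instance that is currently draining keep receiving new traffic? A single boolean cannot answer these questions.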

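At its core, the lifecycle of a service instance is a multi-step state process, not a simple switch. If the discovery mechanism models that process explicitly, the system becomes markedly more deterministic and resilient. This leads to the central idea: treat every service instance as an independent, managed finite state machine (FSM).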

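To that end, we drop the traditional stateless registration model based on heartbeats and TTLs and build a state-driven service discovery node instead. The node is driven by two components: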

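  1. XState: a state machine and statechart library, used to define and execute each instance's lifecycle model precisely and declaratively. It turns complex asynchronous flows into statecharts that are predictable, testable, and even visualizable.
  2. ScyllaDB: a high-performance, Cassandra-compatible NoSQL database. We use it as the backend for state persistence and registry queries because it offers the low latency and high availability needed under highly concurrent reads and writes, which matters a great deal for an infrastructure component as central as service discovery.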

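Below is the lifecycle state diagram for a service instance that we are going to implement. It is the logical foundation of the whole system.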

stateDiagram-v2
    direction LR
    [*] --> registering: REGISTER

    state registering {
        [*] --> pending_persistence
        pending_persistence --> persistence_failed: PERSISTENCE_FAILURE
        persistence_failed --> pending_persistence: RETRY_PERSIST
    }

    state active {
        direction LR
        [*] --> healthy
        healthy --> unhealthy: HEALTH_CHECK_FAILED
        unhealthy --> healthy: HEALTH_CHECK_PASSED
    }

    registering --> active: PERSISTENCE_SUCCESS

    active --> draining: GRACEFUL_SHUTDOWN
    draining --> deregistered: DRAIN_COMPLETE
    active --> deregistered: FORCE_DEREGISTER
    registering --> deregistered: REGISTRATION_FAILED

    deregistered --> [*]
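
To make the diagram concrete before XState enters the picture, the same transitions can be written as a plain lookup table. This is only an illustrative sketch in plain TypeScript, not the XState definition used later in the article. The names `Lifecycle`, `LifecycleEvent`, and `nextState` are hypothetical, and the nested states are flattened for brevity.

```typescript
type Lifecycle =
  | 'registering' | 'persistence_failed'
  | 'healthy' | 'unhealthy'
  | 'draining' | 'deregistered';

type LifecycleEvent =
  | 'PERSISTENCE_SUCCESS' | 'PERSISTENCE_FAILURE' | 'RETRY_PERSIST'
  | 'HEALTH_CHECK_PASSED' | 'HEALTH_CHECK_FAILED'
  | 'GRACEFUL_SHUTDOWN' | 'DRAIN_COMPLETE'
  | 'FORCE_DEREGISTER' | 'REGISTRATION_FAILED';

// One row per state; any event missing from a row is ignored in that state.
const transitions: Record<Lifecycle, Partial<Record<LifecycleEvent, Lifecycle>>> = {
  registering: {
    PERSISTENCE_SUCCESS: 'healthy',
    PERSISTENCE_FAILURE: 'persistence_failed',
    REGISTRATION_FAILED: 'deregistered',
  },
  persistence_failed: { RETRY_PERSIST: 'registering', REGISTRATION_FAILED: 'deregistered' },
  healthy: { HEALTH_CHECK_FAILED: 'unhealthy', GRACEFUL_SHUTDOWN: 'draining', FORCE_DEREGISTER: 'deregistered' },
  unhealthy: { HEALTH_CHECK_PASSED: 'healthy', GRACEFUL_SHUTDOWN: 'draining', FORCE_DEREGISTER: 'deregistered' },
  draining: { DRAIN_COMPLETE: 'deregistered' },
  deregistered: {},
};

// Returns the next state, or the current one when the event is not allowed.
function nextState(current: Lifecycle, event: LifecycleEvent): Lifecycle {
  return transitions[current][event] ?? current;
}
```

Unlike a boolean flag, the table makes illegal moves explicit: a draining instance that receives HEALTH_CHECK_PASSED simply stays in draining.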

Data Model and Persistence Layer (ScyllaDB)

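Before diving into the code, we need to define how the data is stored in ScyllaDB. The typical query pattern in service discovery is "find all healthy instances under a given service name", so service_name is a natural partition key. instance_id serves as the clustering key, which keeps instances unique within a service.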

Keyspace and table definitions (CQL):

-- production.cql

CREATE KEYSPACE IF NOT EXISTS service_registry
WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3 };

USE service_registry;

-- Core information and state for each service instance
CREATE TABLE IF NOT EXISTS instances (
    service_name text,
    instance_id uuid,
    node_address text, -- IP:Port
    payload map<text, text>, -- custom metadata, e.g., region, version
    current_state text, -- 'registering', 'healthy', 'unhealthy', 'draining'
    last_heartbeat timestamp,
    registration_time timestamp,
    PRIMARY KEY (service_name, instance_id)
) WITH CLUSTERING ORDER BY (instance_id ASC);

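-- A materialized view or secondary index could speed up queries by state, at a cost in write performance.
-- For now we handle this in the application layer to keep the write path simple and fast.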
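
Handling the state filter in the application layer could look roughly like the sketch below: read the whole partition for a service and discard the rows that are not healthy. The `InstanceRow` interface and the `selectHealthy` helper are hypothetical names introduced for illustration, and only a subset of the table's columns is modeled.

```typescript
// Shape of a row as read from the instances table (subset of columns).
interface InstanceRow {
  service_name: string;
  instance_id: string;
  node_address: string;
  current_state: string;
}

// Keep only the rows whose current_state is 'healthy'.
function selectHealthy(rows: InstanceRow[]): InstanceRow[] {
  return rows.filter((row) => row.current_state === 'healthy');
}

// Example input: what a single-partition read for service 'api' might return.
const sampleRows: InstanceRow[] = [
  { service_name: 'api', instance_id: 'a1', node_address: '10.0.0.1:8080', current_state: 'healthy' },
  { service_name: 'api', instance_id: 'a2', node_address: '10.0.0.2:8080', current_state: 'draining' },
];
const healthyRows = selectHealthy(sampleRows);
```

Because service_name is the partition key, the read that feeds this filter touches a single partition, so the cost grows with the number of instances of one service rather than with the size of the whole table.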

Database connection and configuration:

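We use cassandra-driver to connect to ScyllaDB. In a real project the configuration has to be robust, covering connection pooling, retry policy, and load-balancing policy.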

// src/database/scylla.ts

import { Client, auth, policies } from 'cassandra-driver';
import { Logger } from '../utils/logger'; // a simple logging wrapper

const CONTACT_POINTS = process.env.SCYLLA_CONTACT_POINTS?.split(',') || ['127.0.0.1:9042'];
const LOCAL_DATACENTER = process.env.SCYLLA_DATACENTER || 'datacenter1';
const KEYSPACE = 'service_registry';

export class ScyllaDBClient {
  private static instance: Client;

  public static getInstance(): Client {
    if (!ScyllaDBClient.instance) {
      Logger.info(`Initializing ScyllaDB client for datacenter: ${LOCAL_DATACENTER}`);
      ScyllaDBClient.instance = new Client({
        contactPoints: CONTACT_POINTS,
        localDataCenter: LOCAL_DATACENTER,
        keyspace: KEYSPACE,
        authProvider: new auth.PlainTextAuthProvider(
          process.env.SCYLLA_USER || 'cassandra',
          process.env.SCYLLA_PASS || 'cassandra'
        ),
        policies: {
          loadBalancing: new policies.loadBalancing.TokenAwarePolicy(
            new policies.loadBalancing.DCAwareRoundRobinPolicy(LOCAL_DATACENTER)
          ),
          reconnection: new policies.reconnection.ExponentialReconnectionPolicy(100, 10 * 1000, false),
        },
        queryOptions: {
          consistency: 10, // types.consistencies.localOne (LOCAL_ONE)
          prepare: true,
        },
      });

      ScyllaDBClient.instance.on('log', (level, className, message) => {
        if (level === 'error' || level === 'warning') {
          Logger.warn(`[ScyllaDB Driver] ${level}: ${className} - ${message}`);
        }
      });
    }
    return ScyllaDBClient.instance;
  }

  public static async connect() {
    try {
      await this.getInstance().connect();
      Logger.info('Successfully connected to ScyllaDB.');
    } catch (error) {
      Logger.error('Failed to connect to ScyllaDB.', error);
      process.exit(1); // the database connection is on the critical path, so exit on failure
    }
  }

  public static async shutdown() {
    Logger.info('Shutting down ScyllaDB connection...');
    await this.getInstance().shutdown();
  }
}
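
One small robustness detail in the configuration above: if SCYLLA_CONTACT_POINTS is set to an empty string, `split(',')` yields `['']`, which is truthy, so the fallback is skipped and the driver receives an empty host name. A minimal sketch of a stricter parser follows; `parseContactPoints` is a hypothetical helper, not part of cassandra-driver.

```typescript
// Parse a comma-separated list of contact points, ignoring blanks and stray spaces.
function parseContactPoints(
  raw: string | undefined,
  fallback: string[] = ['127.0.0.1:9042'],
): string[] {
  const points = (raw ?? '')
    .split(',')
    .map((point) => point.trim())
    .filter((point) => point.length > 0);
  // Fall back to the default when nothing usable was provided.
  return points.length > 0 ? points : fallback;
}
```

It could replace the inline expression, for example as `parseContactPoints(process.env.SCYLLA_CONTACT_POINTS)`.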

Core Logic: The Service Instance State Machine (XState)

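This is the heart of the system. We create a state machine factory function that produces a machine definition for each newly registered service instance.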

// src/machines/instance.machine.ts

import { createMachine, assign } from 'xstate';
import { ScyllaDBClient } from '../database/scylla';
import { v4 as uuidv4 } from 'uuid';
import { Logger } from '../utils/logger';

// Context interface: the data kept inside the state machine
export interface InstanceContext {
  serviceName: string;
  instanceId: string;
  nodeAddress: string;
  payload: Record<string, string>;
  lastError: string | null;
  healthCheckRetries: number;
}

// Events the state machine can receive
export type InstanceEvent =
  | { type: 'REGISTER'; data: { serviceName: string; nodeAddress: string; payload: Record<string, string> } }
  | { type: 'PERSISTENCE_SUCCESS' }
  | { type: 'PERSISTENCE_FAILURE'; error: string }
  | { type: 'RETRY_PERSIST' }
  | { type: 'HEALTH_CHECK_PASSED' }
  | { type: 'HEALTH_CHECK_FAILED'; error: string }
  | { type: 'GRACEFUL_SHUTDOWN' }
  | { type: 'DRAIN_COMPLETE' }
  | { type: 'FORCE_DEREGISTER' }
  | { type: 'REGISTRATION_FAILED' };


const MAX_HEALTH_CHECK_RETRIES = 3;

// Use a factory function so that every call yields a fresh, clean machine
export const createInstanceMachine = (instanceId: string = uuidv4()) => {
  return createMachine<InstanceContext, InstanceEvent>({
    id: `instance-${instanceId}`,
    initial: 'unregistered',
    context: {
      instanceId,
      serviceName: '',
      nodeAddress: '',
      payload: {},
      lastError: null,
      healthCheckRetries: 0,
    },
    states: {
      unregistered: {
        on: {
          REGISTER: {
            target: 'registering',
            actions: assign({
              serviceName: (_, event) => event.data.serviceName,
              nodeAddress: (_, event) => event.data.nodeAddress,
              payload: (_, event) => event.data.payload,
            }),
          },
        },
      },
      registering: {
        invoke: {
          id: 'persistToDb',
          src: 'persistService', // 'persistService' is a service implemented outside the machine
          onDone: { target: 'active' }, // the resolved promise plays the role of PERSISTENCE_SUCCESS
          onError: {
            target: 'persistence_failed',
            actions: assign({ lastError: (_, event) => String(event.data) }),
          },
        },
        on: {
            REGISTRATION_FAILED: 'deregistered'
        }
      },
      persistence_failed: {
        after: {
          // Simple fixed-delay retry: re-enter 'registering' after 5 seconds
          5000: { target: 'registering', actions: 'logRetry' },
        },
        on: {
            RETRY_PERSIST: 'registering'
        }
      },
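      active: {
        initial: 'unknown',
        entry: 'startHealthChecker', // start health checks on entering 'active'
        exit: 'stopHealthChecker',   // stop them on leaving 'active'
        states: {
          unknown: {
            // Transient initial state. Simplified model: assume the instance
            // is healthy right after a successful registration.
            always: 'healthy'
          },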
          healthy: {
            entry: ['resetRetries', 'updateStateInDb'],
            on: {
              HEALTH_CHECK_FAILED: {
                target: 'unhealthy',
                actions: assign({ 
                    lastError: (_, event) => event.error,
                    healthCheckRetries: (ctx) => ctx.healthCheckRetries + 1,
                }),
              },
            },
          },
          unhealthy: {
            entry: 'updateStateInDb',
            always: {
                // Once the retry count reaches the threshold, treat the instance as failed and force deregistration
                target: '#deregistered-state', // jump to the top-level state by its ID
                cond: (ctx) => ctx.healthCheckRetries >= MAX_HEALTH_CHECK_RETRIES
            },
            on: {
              HEALTH_CHECK_PASSED: 'healthy',
              HEALTH_CHECK_FAILED: {
                  // While unhealthy, keep incrementing the retry counter
                  target: 'unhealthy',
                  actions: assign({
                      lastError: (_, event) => event.error,
                      healthCheckRetries: (ctx) => ctx.healthCheckRetries + 1,
                  }),
                  internal: false // external self-transition so entry/exit actions run again
              }
            },
          },
        },
        on: {
          GRACEFUL_SHUTDOWN: 'draining',
          FORCE_DEREGISTER: 'deregistered',
        },
      },
      draining: {
        entry: 'updateStateInDb',
        invoke: {
          id: 'drainProcess',
          src: 'executeDrain',
          onDone: { target: 'deregistered' }, // the resolved promise plays the role of DRAIN_COMPLETE
          onError: {
            // A failed drain still ends in deregistration, but the error is recorded
            target: 'deregistered',
            actions: assign({ lastError: (_, event) => String(event.data) }),
          },
        },
      },
      deregistered: {
        id: 'deregistered-state',
        type: 'final',
        entry: 'cleanupResource',
      },
    },
  });
};

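The names used here are only references: src values such as persistService and executeDrain point to asynchronous services (Promises or callbacks), and action names such as startHealthChecker, updateStateInDb, and cleanupResource point to actions. All of them are implemented outside the machine and injected when the machine is configured and interpreted.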
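
The persistence_failed state above retries after a fixed 5 seconds. If exponential backoff is preferred, the delay could be computed by a function like the sketch below. In XState v4, an `after` key can be a named delay that is resolved through the `delays` machine option, so a function of this kind could be plugged in there. The helper `retryDelayMs` and the idea of tracking an attempt counter in context are assumptions for illustration; the machine above has no such counter.

```typescript
// Exponential backoff: baseMs * 2^attempt, capped at maxMs.
// attempt is zero-based: 0 for the first retry, 1 for the second, and so on.
function retryDelayMs(attempt: number, baseMs: number = 1000, maxMs: number = 30000): number {
  // Clamp the exponent so the power cannot overflow for large attempt counts.
  const exponent = Math.min(Math.max(Math.floor(attempt), 0), 30);
  return Math.min(baseMs * 2 ** exponent, maxMs);
}

// Delays for the first five retries with the default settings.
const backoffSchedule = [0, 1, 2, 3, 4].map((attempt) => retryDelayMs(attempt));
```

With the defaults, the first five delays are 1, 2, 4, 8, and 16 seconds, and every later retry waits the 30-second cap.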

Service Discovery Node Controller (The Agent)

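The Agent is the scheduling center of the system. It maintains a map of the state machines for all active service instances and exposes an HTTP interface for external callers, for which we use the lightweight web framework Fastify.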

// src/agent.ts

import { assign, interpret, InterpreterFrom } from 'xstate';
import { createInstanceMachine, InstanceContext } from './machines/instance.machine';
import { ScyllaDBClient } from './database/scylla';
import { Logger } from './utils/logger';

// Interpreter type derived from the machine factory itself
type InstanceInterpreter = InterpreterFrom<typeof createInstanceMachine>;

export class DiscoveryAgent {
  // Map<instanceId, XState interpreter>
  private instances: Map<string, InstanceInterpreter> = new Map();
  // Map<instanceId, health-check timer>, cleared by stopHealthChecker
  private healthTimers: Map<string, ReturnType<typeof setInterval>> = new Map();
  private scyllaClient = ScyllaDBClient.getInstance();

  private machineOptions = {
    services: {
      // Invoked services return a Promise so that onDone/onError can fire.
      persistService: (context: InstanceContext) => this.persistToDb(context, 'registering'),
      executeDrain: (context: InstanceContext) => {
        Logger.info(`[${context.instanceId}] Executing drain for 10s.`);
        // Simulate a graceful shutdown that takes 10 seconds.
        return new Promise(resolve => setTimeout(resolve, 10000));
      },
    },
    actions: {
      // Actions are fire-and-forget, so persistence failures are logged rather than thrown.
      updateStateInDb: (context: InstanceContext, _event: unknown, meta: { state: { toStrings(): string[] } }) => {
        // In XState v4 (without predictableActionArguments), meta.state is the state
        // reached by the transition, e.g. ['active', 'active.healthy'] -> 'healthy'.
        const stateToPersist = meta.state.toStrings().pop()?.split('.').pop() ?? 'unknown';
        // Skip the write if a transient transition already ended in 'deregistered'.
        if (stateToPersist === 'deregistered') return;
        this.persistToDb(context, stateToPersist).catch(() => undefined); // already logged in persistToDb
      },
      startHealthChecker: (context: InstanceContext) => {
        Logger.info(`[${context.instanceId}] Starting health checker.`);
        // A real project would periodically send a health-check request to
        // context.nodeAddress and send HEALTH_CHECK_PASSED or HEALTH_CHECK_FAILED
        // based on the result. Here the outcome is simulated every 5 seconds.
        const intervalId = setInterval(() => {
          const interpreter = this.instances.get(context.instanceId);
          if (!interpreter?.getSnapshot().matches('active')) return;
          const isHealthy = Math.random() > 0.1; // mock: 10% chance to fail
          interpreter.send(
            isHealthy
              ? { type: 'HEALTH_CHECK_PASSED' }
              : { type: 'HEALTH_CHECK_FAILED', error: 'Simulated failure' }
          );
        }, 5000);
        this.healthTimers.set(context.instanceId, intervalId);
      },
      stopHealthChecker: (context: InstanceContext) => {
        Logger.info(`[${context.instanceId}] Stopping health checker.`);
        const intervalId = this.healthTimers.get(context.instanceId);
        if (intervalId !== undefined) clearInterval(intervalId);
        this.healthTimers.delete(context.instanceId);
      },
      cleanupResource: (context: InstanceContext) => {
        void this.removeFromDb(context); // removeFromDb logs its own errors
      },
      logRetry: (context: InstanceContext) => {
        Logger.warn(`[${context.instanceId}] Retrying persistence...`);
      },
      resetRetries: assign({ healthCheckRetries: 0 }),
    },
  };

  private async persistToDb(context: InstanceContext, state: string) {
    const query = `
      INSERT INTO instances (service_name, instance_id, node_address, payload, current_state, last_heartbeat, registration_time)
      VALUES (?, ?, ?, ?, ?, toTimestamp(now()), toTimestamp(now()))
      IF NOT EXISTS;
    `; // IF NOT EXISTS makes the first registration idempotent
    
    const updateQuery = `
      UPDATE instances SET current_state = ?, last_heartbeat = toTimestamp(now())
      WHERE service_name = ? AND instance_id = ?;
    `;

    try {
      if (state === 'registering') {
          await this.scyllaClient.execute(query, [
            context.serviceName, context.instanceId, context.nodeAddress, context.payload, state
          ], { prepare: true });
      } else {
          await this.scyllaClient.execute(updateQuery, [
            state, context.serviceName, context.instanceId
          ], { prepare: true });
      }
      Logger.info(`[${context.instanceId}] Persisted state '${state}' to ScyllaDB.`);
    } catch (err) {
      Logger.error(`[${context.instanceId}] Failed to persist state to ScyllaDB`, err);
      throw err; // rethrow so that XState's onError can handle it
    }
  }

  private async removeFromDb(context: InstanceContext) {
      const query = `DELETE FROM instances WHERE service_name = ? AND instance_id = ?;`;
      try {
          await this.scyllaClient.execute(query, [context.serviceName, context.instanceId], { prepare: true });
          Logger.info(`[${context.instanceId}] Removed instance from ScyllaDB.`);
          this.instances.delete(context.instanceId); // drop the interpreter from memory as well
      } catch (err) {
          Logger.error(`[${context.instanceId}] Failed to remove from ScyllaDB`, err);
      }
  }

  public registerInstance(serviceName: string, nodeAddress: string, payload: Record<string, string>) {
    const machine = createInstanceMachine().withConfig(this.machineOptions);
    const interpreter = interpret(machine).onTransition((state) => {
      Logger.debug(`[${state.context.instanceId}] Transition to: ${JSON.stringify(state.value)}`);
    }).start();

    this.instances.set(interpreter.machine.context.instanceId, interpreter);

    interpreter.send({
      type: 'REGISTER',
      data: { serviceName, nodeAddress, payload },
    });

    return interpreter.machine.context.instanceId;
  }
  
  public deregisterInstance(instanceId: string, force: boolean = false) {
    const interpreter = this.instances.get(instanceId);
    if (interpreter) {
      interpreter.send({ type: force ? 'FORCE_DEREGISTER' : 'GRACEFUL_SHUTDOWN' });
      return true;
    }
    return false;
  }

  public async discover(serviceName: string): Promise<any[]> {
    const query = `SELECT instance_id, node_address, payload FROM instances WHERE service_name = ? AND current_state = 'healthy' ALLOW FILTERING;`;
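    // Warning: ALLOW FILTERING on a non-key column makes ScyllaDB read every row in the
    // partition and discard the non-matching ones, which performs poorly in production.
    // Better options are a materialized view or secondary index, or a cache of healthy
    // instances maintained in the application layer. It is used here only for brevity.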
    const result = await this.scyllaClient.execute(query, [serviceName], { prepare: true });
    return result.rows.map(row => ({
        instanceId: row.instance_id,
        nodeAddress: row.node_address,
        payload: row.payload
    }));
  }
}
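
The health checker above only simulates results. A real probe against the instance's nodeAddress could be sketched as follows. The `/health` path, the `probeHealth` helper, and the `FetchLike` type are assumptions made for illustration; the fetch function is injected so that the probe can be exercised without a network.

```typescript
// Minimal shape of the fetch function the probe needs.
type FetchLike = (url: string, init: { signal: AbortSignal }) => Promise<{ ok: boolean }>;

// Resolves to true when the instance answers with a 2xx status within timeoutMs.
async function probeHealth(
  nodeAddress: string,
  timeoutMs: number,
  fetchFn: FetchLike,
): Promise<boolean> {
  const controller = new AbortController();
  // Abort the request if it takes longer than timeoutMs.
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const response = await fetchFn(`http://${nodeAddress}/health`, { signal: controller.signal });
    return response.ok;
  } catch {
    // A network error or a timeout counts as a failed check.
    return false;
  } finally {
    clearTimeout(timer);
  }
}
```

On Node 18 or later, the global `fetch` can be passed as `fetchFn`. The boolean result would then decide whether the timer callback sends HEALTH_CHECK_PASSED or HEALTH_CHECK_FAILED.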

The Public API (Fastify)

Finally, we wrap the Agent's functionality in an HTTP interface using Fastify.

// src/server.ts

import Fastify, { FastifyInstance } from 'fastify';
import { DiscoveryAgent } from './agent';
import { ScyllaDBClient } from './database/scylla';
import { Logger } from './utils/logger';

const server: FastifyInstance = Fastify({});
const agent = new DiscoveryAgent();

// Registration endpoint
server.post('/register', async (request, reply) => {
  const { serviceName, nodeAddress, payload } = request.body as any;
  if (!serviceName || !nodeAddress) {
    return reply.status(400).send({ error: 'serviceName and nodeAddress are required' });
  }
  const instanceId = agent.registerInstance(serviceName, nodeAddress, payload || {});
  reply.status(202).send({ instanceId });
});

// Deregistration endpoint
server.post('/deregister/:instanceId', async (request, reply) => {
  const { instanceId } = request.params as any;
  const { force } = request.query as any;
  const success = agent.deregisterInstance(instanceId, force === 'true');
  if (success) {
    reply.status(202).send({ message: 'Deregistration process started.' });
  } else {
    reply.status(404).send({ error: 'Instance not found.' });
  }
});

// Service discovery endpoint
server.get('/discover/:serviceName', async (request, reply) => {
    const { serviceName } = request.params as any;
    try {
        const instances = await agent.discover(serviceName);
        reply.send({ instances });
    } catch (error) {
        Logger.error(`Discovery for ${serviceName} failed.`, error);
        reply.status(500).send({ error: 'Failed to query services' });
    }
});

const start = async () => {
  try {
    await ScyllaDBClient.connect();
    await server.listen({ port: 3000, host: '0.0.0.0' });
    Logger.info('Discovery service started on port 3000');
  } catch (err) {
    Logger.error('Failed to start discovery service.', err); // Fastify's own logger is disabled by default
    process.exit(1);
  }
};

start();
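
The registration handler above casts the request body to `any` and only checks that two fields are truthy. A stricter check could be sketched as a plain validation function; `RegisterBody` and `parseRegisterBody` are hypothetical names. Fastify also supports JSON Schema validation through a route's `schema` option, which would be an alternative to hand-written checks.

```typescript
interface RegisterBody {
  serviceName: string;
  nodeAddress: string;
  payload: Record<string, string>;
}

// Returns a validated body, or null when the input does not have the expected shape.
function parseRegisterBody(body: unknown): RegisterBody | null {
  if (typeof body !== 'object' || body === null) return null;
  const { serviceName, nodeAddress, payload } = body as Record<string, unknown>;
  if (typeof serviceName !== 'string' || serviceName === '') return null;
  if (typeof nodeAddress !== 'string' || nodeAddress === '') return null;

  // payload is optional, but when present it must map strings to strings,
  // because the table stores it as map<text, text>.
  const metadata: Record<string, string> = {};
  if (payload !== undefined) {
    if (typeof payload !== 'object' || payload === null || Array.isArray(payload)) return null;
    for (const [key, value] of Object.entries(payload)) {
      if (typeof value !== 'string') return null;
      metadata[key] = value;
    }
  }
  return { serviceName, nodeAddress, payload: metadata };
}
```

In the handler, a null result would map to the existing 400 response.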

Limitations and Future Directions

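This implementation demonstrates the core idea of state-driven service discovery, but it is not a complete production-grade system. The current architecture is a single Agent, which is a major risk in itself. If the Agent node goes down, all in-flight state-transition logic (health-check timers, graceful-shutdown timers, and so on) is lost, even though the state that was already persisted remains in ScyllaDB.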
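
Since the persisted state survives a crash, a restarted Agent could in principle rebuild its in-memory machines from the instances table. The sketch below shows one possible mapping from the persisted current_state to a recovery step. The policy itself and the names `RecoveryAction` and `planRecovery` are assumptions for illustration, not something the implementation above provides.

```typescript
type RecoveryAction = 'resume_health_checks' | 'finish_deregistration' | 'retry_registration';

// Decide what a restarted Agent should do with an instance, based on the
// current_state value it finds in ScyllaDB.
function planRecovery(currentState: string): RecoveryAction {
  switch (currentState) {
    case 'healthy':
    case 'unhealthy':
      // The instance was active: restart its health-check timer.
      return 'resume_health_checks';
    case 'draining':
      // The drain timer was lost with the old process: complete the removal.
      return 'finish_deregistration';
    default:
      // 'registering' or anything unexpected: run registration again.
      return 'retry_registration';
  }
}
```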

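A natural next step is to build a cluster of Agents. That introduces new complexity, for example: how do we ensure that the state machine for a given instanceId runs on exactly one Agent? This calls for a distributed lock or a leader-election mechanism, which can be implemented with ScyllaDB's lightweight transactions (LWT).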
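
The ownership rule can be illustrated with an in-memory model of a lease table. This is a sketch of the compare-and-set semantics only: in ScyllaDB the same check would be a conditional statement such as `INSERT ... IF NOT EXISTS USING TTL ...` to acquire and `UPDATE ... IF owner = ?` to renew, against a hypothetical leases table. `Lease` and `LeaseTable` are illustrative names.

```typescript
interface Lease {
  owner: string;      // ID of the Agent that currently runs the state machine
  expiresAt: number;  // epoch milliseconds
}

class LeaseTable {
  private leases = new Map<string, Lease>();

  // Acquire or renew the lease for instanceId. Succeeds when the lease is free,
  // expired, or already held by the same Agent.
  tryAcquire(instanceId: string, agentId: string, ttlMs: number, now: number): boolean {
    const current = this.leases.get(instanceId);
    const heldByOther =
      current !== undefined && current.expiresAt > now && current.owner !== agentId;
    if (heldByOther) return false;
    this.leases.set(instanceId, { owner: agentId, expiresAt: now + ttlMs });
    return true;
  }
}
```

An Agent would start or keep a state machine only while `tryAcquire` succeeds, and stop it as soon as a renewal fails.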

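In addition, the health-check logic is currently simple polling. A more advanced implementation could support several check types (HTTP, TCP, gRPC) and let service instances push heartbeats themselves (a push model) to reduce the load on the Agent.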
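
In a push model the Agent no longer probes instances; it only has to notice when heartbeats stop arriving. A minimal sketch of that check follows, assuming the Agent records the time of the last heartbeat per instance. The helper `findExpired` and the in-memory map are hypothetical.

```typescript
// Returns the IDs of instances whose last heartbeat is older than ttlMs.
// lastHeartbeat maps instanceId to the epoch-millisecond time of the last heartbeat.
function findExpired(lastHeartbeat: Map<string, number>, now: number, ttlMs: number): string[] {
  const expired: string[] = [];
  lastHeartbeat.forEach((timestamp, instanceId) => {
    if (now - timestamp > ttlMs) expired.push(instanceId);
  });
  return expired;
}
```

Each expired ID would then receive a HEALTH_CHECK_FAILED event, so the unhealthy state and the retry threshold keep working unchanged.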

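Finally, query performance has to be faced squarely. Filtering directly on current_state in ScyllaDB (ALLOW FILTERING) becomes unacceptable as the data set grows. A production deployment needs either a materialized view built for this query (CREATE MATERIALIZED VIEW ... AS SELECT ... WHERE current_state = 'healthy') or a cache of healthy instances kept in the Agent's memory, although the latter brings cache-consistency challenges of its own. The state-driven model does add logical complexity, but it lays a solid foundation for a more resilient distributed system that actually understands the service lifecycle.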
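
The in-memory cache of healthy instances mentioned above could be as small as the sketch below, updated from the state machines' transitions. `HealthyCache` and its method names are hypothetical, and the sketch ignores the consistency problem between several Agents.

```typescript
class HealthyCache {
  // serviceName -> (instanceId -> nodeAddress), holding healthy instances only
  private byService = new Map<string, Map<string, string>>();

  // Call on every state change of an instance.
  onStateChange(serviceName: string, instanceId: string, nodeAddress: string, state: string): void {
    let instances = this.byService.get(serviceName);
    if (state === 'healthy') {
      if (!instances) {
        instances = new Map<string, string>();
        this.byService.set(serviceName, instances);
      }
      instances.set(instanceId, nodeAddress);
    } else if (instances) {
      // Any other state removes the instance from the healthy set.
      instances.delete(instanceId);
    }
  }

  // Addresses of all healthy instances of a service.
  lookup(serviceName: string): string[] {
    const instances = this.byService.get(serviceName);
    return instances ? Array.from(instances.values()) : [];
  }
}
```

A discovery request could then be answered from `lookup` without touching ScyllaDB, at the cost of serving slightly stale data if a transition has not been applied yet.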

