团队扩张后,机器学习项目中的特征工程流程开始失控。最初,特征的定义和生成逻辑散落在各个Jupyter Notebook和独立的Python脚本里。数据科学家们各自为战,一个新特征的上线,往往意味着手动执行一段未经严格测试的SQL,更新一个共享的数据表,然后祈祷不要破坏下游的模型训练或在线预测。版本控制基本靠文件名后缀 _v2
, _final
, _fixed
,特征的血缘关系和可复现性成了一笔糊涂账。当我们需要回滚某个特征,或者想对比不同版本特征对模型性能的影响时,整个过程充满了猜测和风险。
核心痛点非常明确:我们缺乏一套将特征工程纳入软件工程最佳实践的机制。特征的定义本身(通常是SQL转换逻辑)就应该是代码,需要被版本化、评审和自动化部署。这个想法催生了一个内部平台的初步构想:一个基于GitOps理念的版本化特征存储系统。
整个系统的设计目标是:
- 特征定义即代码:所有特征生成逻辑(SQL脚本)必须在Git仓库中管理。
- 版本化与可复现:使用DVC(Data Version Control)来追踪SQL脚本与其生成的具体数据版本之间的关系。
- 自动化流程:合并到主分支的特征定义,应自动触发执行、验证,并注册到中心化的特征注册表。
- 灵活的在线服务:通过服务代理,能够根据请求头,将流量路由到不同版本的特征服务上,以支持A/B测试和灰度发布。
技术选型基于轻量、高效和云原生的原则。我们最终确定了如下技术栈:
- 控制平面API: 使用Koa (Node.js) 构建。它足够轻量,中间件模型非常适合构建简单的RESTful API,用于接收CI/CD流水线的指令,并操作后端的特征注册表。
- 特征注册与离线存储: PostgreSQL。一个稳定可靠的关系型数据库,足以存储特征元数据和离线特征数据。
- 版本控制核心: DVC。它与Git无缝集成,通过
dvc.yaml
定义数据处理阶段,完美契合我们“SQL脚本 -> 特征数据表”的流水线。 - 服务代理与流量路由: Envoy Proxy。其强大的动态配置和路由能力,是实现在线特征服务版本控制的关键。
架构概览
在深入代码之前,我们先看一下整个系统的架构和数据流。
graph TD subgraph Git Repository A[Data Scientist: git push] --> B{CI/CD Pipeline}; C[features/user_avg_spend.sql] D[dvc.yaml] end subgraph CI/CD Runner B -- on merge to main --> E[dvc repro]; E -- executes SQL --> F[(PostgreSQL Offline Store)]; E -- after success --> G[curl -X POST /register]; end subgraph Control & Storage Plane G --> H{Koa Control Plane API}; H -- writes metadata --> I[(PostgreSQL Feature Registry)]; F -- feature data --> I; end subgraph Serving Plane J[Client Request] -- x-feature-version: 1.1 --> K{Envoy Proxy}; K -- route based on header --> L[Feature Service v1.0]; K -- route based on header --> M[Feature Service v1.1]; L --> I; M --> I; end style F fill:#d3d3d3,stroke:#333,stroke-width:2px style I fill:#d3d3d3,stroke:#333,stroke-width:2px
工作流程如下:
- 数据科学家在Git仓库的
features/
目录下添加或修改一个SQL文件,定义新的特征。 - 同时,他们更新
dvc.yaml
来定义这个SQL文件如何生成一个数据表。 - 提交Pull Request,经过评审后合并到
main
分支。 - CI/CD流水线被触发,执行
dvc repro
命令。DVC会运行对应的SQL,将结果物化到PostgreSQL离线存储中。 -
dvc repro
成功后,流水线中的一个脚本会调用Koa控制平面的/register
接口,将新版本的特征元数据(名称、版本、数据表位置等)写入特征注册表。 - 在线服务中,Envoy Proxy拦截所有特征获取请求。它可以解析请求头中的特定版本标识,将请求精确路由到部署了对应版本特征逻辑的服务实例上。
数据库与项目结构
首先,我们需要一个PostgreSQL数据库来存储特征的元数据。这个表的结构很简单,但至关重要。
SQL Schema (registry.sql
)
CREATE TABLE IF NOT EXISTS feature_registry (
id SERIAL PRIMARY KEY,
feature_name VARCHAR(255) NOT NULL,
version VARCHAR(50) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'PENDING', -- PENDING, ACTIVE, DEPRECATED
description TEXT,
value_type VARCHAR(50), -- e.g., NUMERIC, CATEGORICAL
source_sql_path VARCHAR(512), -- Path to the SQL file in Git
offline_table_name VARCHAR(255), -- The materialized table in the DB
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
UNIQUE(feature_name, version)
);
-- Simple function to update the `updated_at` timestamp on row update
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_feature_registry_updated_at
BEFORE UPDATE ON feature_registry
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
我们的项目目录结构如下,将特征定义、DVC配置和控制平面代码清晰地分离开。
feature-store/
├── .dvc/
├── features/
│ └── user_7day_avg_spend.sql
├── services/
│ └── control-plane/
│ ├── src/
│ │ ├── db.js
│ │ ├── routes.js
│ │ └── server.js
│ ├── package.json
│ └── .env
├── dvc.yaml
└── .gitignore
DVC与GitOps流水线
DVC是连接Git和数据的桥梁。dvc.yaml
文件定义了我们的数据处理阶段。
dvc.yaml
# dvc.yaml
stages:
generate_user_7day_avg_spend:
# Command to execute. It uses environment variables for DB connection.
# The script should be idempotent.
cmd: >-
psql -h $DB_HOST -U $DB_USER -d $DB_NAME
-c "CREATE TABLE IF NOT EXISTS user_7day_avg_spend AS
SELECT * FROM dblink('dbname=source_db', 'SELECT user_id, AVG(amount) as avg_spend_7d FROM orders WHERE order_date >= NOW() - interval ''7 day'' GROUP BY user_id')
AS t(user_id BIGINT, avg_spend_7d NUMERIC(10, 2));"
# This stage depends on the SQL file. If the file changes, DVC knows to re-run.
deps:
- features/user_7day_avg_spend.sql
# We don't track the output data directly with DVC,
# as it's in a managed DB. Instead, we use a placeholder "metric" file
# to signify completion. The real output is the DB table.
# A post-hook in the CI pipeline will handle registration.
outs:
# This is a dummy output to make the stage trackable.
- data/user_7day_avg_spend.flag
在真实的CI/CD管道(如GitHub Actions)中,工作流会是这样:
# .github/workflows/feature-ci.yaml (conceptual)
name: Feature Engineering CI
on:
push:
branches:
- main
paths:
- 'features/**.sql'
- 'dvc.yaml'
jobs:
process-and-register:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup DVC
uses: iterative/setup-dvc@v1
- name: Run DVC pipeline
env:
DB_HOST: ${{ secrets.DB_HOST }}
DB_USER: ${{ secrets.DB_USER }}
DB_PASSWORD: ${{ secrets.DB_PASSWORD }}
DB_NAME: ${{ secrets.DB_NAME }}
run: |
dvc repro
- name: Register Feature Version
run: |
# A simple script to parse the feature name from the filename
# and get the version from the Git commit hash.
FEATURE_SQL_PATH=$(git diff-tree --no-commit-id --name-only -r HEAD | grep 'features/.*\.sql')
FEATURE_NAME=$(basename $FEATURE_SQL_PATH .sql)
VERSION=$(git rev-parse --short HEAD)
curl -X POST http://control-plane.internal:3000/api/v1/register \
-H "Content-Type: application/json" \
-d '{
"featureName": "'"$FEATURE_NAME"'",
"version": "'"$VERSION"'",
"description": "Auto-registered from CI pipeline",
"valueType": "NUMERIC",
"sourceSqlPath": "'"$FEATURE_SQL_PATH"'",
"offlineTableName": "'"$FEATURE_NAME"'"
}'
这里的坑在于,DVC原生更适合追踪文件系统中的数据。当输出是数据库表时,我们采用了一种变通策略:dvc repro
负责执行SQL,但我们不让DVC直接追踪数据库状态。而是通过CI脚本在DVC执行成功后,调用我们的控制平面API来记录元数据,从而将Git-DVC的世界与我们的应用状态(特征注册表)连接起来。
Koa控制平面API实现
控制平面是一个简单的Koa应用,它提供了一个内部API端点用于特征注册。
services/control-plane/src/db.js
// src/db.js
const { Pool } = require('pg');
// In a real project, use a robust configuration management system.
// Here we use environment variables for simplicity.
const pool = new Pool({
host: process.env.DB_HOST || 'localhost',
user: process.env.DB_USER || 'user',
password: process.env.DB_PASSWORD || 'password',
database: process.env.DB_NAME || 'feature_store',
port: parseInt(process.env.DB_PORT || '5432', 10),
});
pool.on('error', (err, client) => {
console.error('Unexpected error on idle client', err);
process.exit(-1);
});
module.exports = {
query: (text, params) => pool.query(text, params),
};
services/control-plane/src/routes.js
// src/routes.js
const Router = require('@koa/router');
const db = require('./db');
const router = new Router({
prefix: '/api/v1'
});
router.post('/register', async (ctx) => {
const {
featureName,
version,
description,
valueType,
sourceSqlPath,
offlineTableName
} = ctx.request.body;
// Basic validation
if (!featureName || !version || !sourceSqlPath || !offlineTableName) {
ctx.status = 400;
ctx.body = { error: 'Missing required fields: featureName, version, sourceSqlPath, offlineTableName' };
return;
}
try {
const queryText = `
INSERT INTO feature_registry(feature_name, version, status, description, value_type, source_sql_path, offline_table_name)
VALUES($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (feature_name, version)
DO UPDATE SET
status = EXCLUDED.status,
description = EXCLUDED.description,
value_type = EXCLUDED.value_type,
source_sql_path = EXCLUDED.source_sql_path,
offline_table_name = EXCLUDED.offline_table_name,
updated_at = NOW()
RETURNING *;
`;
const values = [featureName, version, 'ACTIVE', description, valueType, sourceSqlPath, offlineTableName];
const { rows } = await db.query(queryText, values);
// A common mistake is not logging structured context.
console.log({
message: 'Feature version registered/updated successfully',
featureName,
version,
registryId: rows[0].id,
});
ctx.status = 201;
ctx.body = rows[0];
} catch (err) {
console.error({
message: 'Error registering feature',
error: err.message,
stack: err.stack,
requestBody: ctx.request.body
});
ctx.status = 500;
ctx.body = { error: 'Internal Server Error' };
}
});
router.get('/features/:name', async (ctx) => {
// This endpoint could be used by feature services to discover active versions
const { name } = ctx.params;
try {
const { rows } = await db.query(
'SELECT * FROM feature_registry WHERE feature_name = $1 AND status = $2 ORDER BY created_at DESC',
[name, 'ACTIVE']
);
if (rows.length === 0) {
ctx.status = 404;
ctx.body = { error: `No active features found with name: ${name}` };
return;
}
ctx.body = rows;
} catch (err) {
console.error(`Error fetching feature: ${name}`, err);
ctx.status = 500;
ctx.body = { error: 'Internal Server Error' };
}
});
module.exports = router;
services/control-plane/src/server.js
// src/server.js
require('dotenv').config(); // Load .env file
const Koa = require('koa');
const bodyParser = require('koa-bodyparser');
const router = require('./routes');
const app = new Koa();
const PORT = process.env.PORT || 3000;
// Centralized error handling middleware
app.use(async (ctx, next) => {
try {
await next();
} catch (err) {
ctx.status = err.status || 500;
ctx.body = { error: err.message || 'Internal Server Error' };
// In production, you'd want more robust logging here.
console.error(`Unhandled error on ${ctx.method} ${ctx.url}:`, err);
}
});
app.use(bodyParser());
app.use(router.routes());
app.use(router.allowedMethods());
// Graceful shutdown logic
const server = app.listen(PORT, () => {
console.log(`Control Plane API server running on port ${PORT}`);
});
process.on('SIGTERM', () => {
console.log('SIGTERM signal received: closing HTTP server');
server.close(() => {
console.log('HTTP server closed');
// Here you would also close DB connections, etc.
process.exit(0);
});
});
这个Koa应用非常精简,只做了必要的事情:接收JSON,验证,写入数据库,并提供最基本的日志和错误处理。在生产环境中,还需要加入更详细的结构化日志、请求追踪ID、认证授权等。
Envoy Proxy的动态路由配置
这是将版本化理念带到线上的关键一步。假设我们有两个版本的特征服务feature-service-v-abc123
和feature-service-v-def456
,分别对应两个Git提交。我们希望通过HTTP头 x-feature-version
来控制流量。
Envoy的配置比较复杂,但威力巨大。
envoy.yaml
# envoy.yaml
static_resources:
listeners:
- name: listener_0
address:
socket_address:
address: 0.0.0.0
port_value: 8000
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: feature_service_vhost
domains: ["*"]
routes:
- match:
prefix: "/features/user_7day_avg_spend"
route:
# We use a weighted cluster approach, but routing rules override this.
weighted_clusters:
clusters:
- name: feature_service_stable
weight: 100
- name: feature_service_canary
weight: 0
# The core logic for version-based routing
# This allows us to target specific versions for debugging or A/B testing.
# If the header is present, it routes to the specified cluster.
decorator:
operation: "get_feature"
- match:
prefix: "/features/user_7day_avg_spend"
headers:
- name: x-feature-version
exact_match: "abc123" # Corresponds to a specific git hash / version
route:
cluster: feature_service_stable # Maps to the pod with this version
- match:
prefix: "/features/user_7day_avg_spend"
headers:
- name: x-feature-version
exact_match: "def456" # A newer version
route:
cluster: feature_service_canary
http_filters:
- name: envoy.filters.http.router
typed_config: {}
clusters:
- name: feature_service_stable
connect_timeout: 0.25s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: feature_service_stable
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
# In Kubernetes, this would be the service name.
address: feature-service-v-abc123.internal
port_value: 8080
- name: feature_service_canary
connect_timeout: 0.25s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: feature_service_canary
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: feature-service-v-def456.internal
port_value: 8080
这段Envoy配置定义了:
- 一个监听8000端口的HTTP监听器。
- 一个虚拟主机,匹配所有域名。
- 一个复杂的路由规则:
- 首先定义了两个路由匹配规则,它们检查
x-feature-version
头。如果请求头是x-feature-version: abc123
,流量会被发送到feature_service_stable
集群。如果是def456
,则发送到feature_service_canary
集群。 - 一个默认的、优先级较低的路由规则,它将所有不匹配上述头部的请求发送到
feature_service_stable
。通过调整weighted_clusters
,我们可以实现百分比的灰度发布。
- 首先定义了两个路由匹配规则,它们检查
- 两个集群
feature_service_stable
和feature_service_canary
,分别指向不同版本的后端服务地址。
在生产环境中,这套配置会通过Envoy的xDS API动态下发,而不是静态文件。我们的控制平面可以扩展,在注册新特征版本并部署新服务实例后,自动生成并推送新的Envoy配置。
局限性与未来展望
这个系统虽然解决了我最初提到的核心痛点,但它远非一个完备的特征平台。
- 批处理限制: 当前的设计完全基于批处理。SQL脚本的执行是周期性的,无法满足对实时事件流进行特征计算的需求。要支持流式特征,需要引入Flink或Spark Streaming等流计算引擎,架构会变得复杂得多。
- 在线/离线一致性: 系统只保证了离线特征的生成和版本化。在线服务如何获取这些特征(直接查PostgreSQL?还是同步到Redis/DynamoDB等低延迟存储?)并未详细设计。确保在线服务获取的数据与离线训练时使用的数据定义完全一致,是一个巨大的挑战。
- 服务部署自动化: 我们谈到了CI/CD会部署新的服务实例,但具体的实现(例如,使用Kubernetes Operator或Helm模板动态创建Deployment和Service)并未覆盖。这本身就是一个复杂的工程问题。
- Schema管理: 当特征的定义发生变化,导致输出数据表的Schema改变时,如何管理下游的兼容性?这需要引入类似Protobuf或Avro的Schema管理机制,并在CI流程中加入强制的兼容性检查。
未来的迭代方向很明确:首先是构建一个高性能的在线特征服务层,它能从PostgreSQL同步数据到低延迟的KV存储中。其次是探索集成流处理框架,以支持实时特征。最后,将Envoy的配置管理完全自动化,并与Kubernetes的部署生命周期深度绑定,形成一个真正动态、自愈的特征服务网格。