WhereHows项目集成Confluent Cloud部署指南
2025-07-06 05:56:07作者:伍希望
前言
WhereHows作为LinkedIn开源的数据发现与元数据管理平台,其核心架构依赖于Kafka进行元数据变更事件的传递。本文将详细介绍如何将WhereHows与Confluent Cloud服务集成,替代默认的本地Kafka部署方案。
核心概念理解
在开始配置前,需要理解几个关键概念:
- Metadata Change Request (MCR): 元数据变更请求,表示对元数据的修改请求
- Metadata Change Log (MCL): 元数据变更日志,记录已确认的元数据变更
- Schema Registry: 用于管理Kafka消息的Avro schema
环境准备
1. 创建Confluent Cloud主题
在Confluent控制台中创建以下主题(建议分区数不少于3):
主题名称 | 用途说明 | 重要配置 |
---|---|---|
MetadataChangeRequest_v1 | 接收元数据变更请求 | 默认配置 |
FailedMetadataChangeRequest_v1 | 存储处理失败的请求 | 适当增加保留时间 |
MetadataChangeLog_Versioned_v1 | 版本化元数据变更日志 | 压缩策略 |
MetadataChangeLog_Timeseries_v1 | 时间序列元数据变更 | 适当分区数 |
DataHubUsageEvent_v1 | 用户行为跟踪事件 | 根据流量调整 |
DataHubUpgradeHistory_v1 | 升级历史通知 | 无限保留,1分区 |
注意:DataHubUpgradeHistory_v1主题必须配置为无限保留(infinite retention)
容器化部署配置
Docker Compose方案
1. 配置GMS服务
修改docker/gms/env/docker.env
文件:
# Kafka基础配置
KAFKA_BOOTSTRAP_SERVER=pkc-g4ml2.eu-west-2.aws.confluent.cloud:9092
KAFKA_SCHEMAREGISTRY_URL=https://plrm-qwlpp.us-east-2.aws.confluent.cloud
# Confluent安全配置
SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL=SASL_SSL
SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username='API_KEY' password='API_SECRET';
SPRING_KAFKA_PROPERTIES_SASL_MECHANISM=PLAIN
SPRING_KAFKA_PROPERTIES_CLIENT_DNS_LOOKUP=use_all_dns_ips
SPRING_KAFKA_PROPERTIES_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO
SPRING_KAFKA_PROPERTIES_BASIC_AUTH_USER_INFO=SR_API_KEY:SR_API_SECRET
2. 配置前端服务
修改docker/datahub-frontend/env/docker.env
:
KAFKA_BOOTSTRAP_SERVER=pkc-g4ml2.eu-west-2.aws.confluent.cloud:9092
# 安全配置与GMS相同
KAFKA_PROPERTIES_SECURITY_PROTOCOL=SASL_SSL
KAFKA_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username='API_KEY' password='API_SECRET';
KAFKA_PROPERTIES_SASL_MECHANISM=PLAIN
KAFKA_PROPERTIES_CLIENT_DNS_LOOKUP=use_all_dns_ips
KAFKA_PROPERTIES_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO
KAFKA_PROPERTIES_BASIC_AUTH_USER_INFO=SR_API_KEY:SR_API_SECRET
3. 禁用本地Kafka服务
在docker-compose.yml中注释或删除以下服务:
- zookeeper
- kafka
- schema-registry
Kubernetes Helm部署
1. 修改values.yaml
# 禁用内置服务
cp-schema-registry:
enabled: false
kafkaSetupJob:
enabled: false
# 配置Confluent Cloud连接
kafka:
bootstrap:
server: pkc-g4ml2.eu-west-2.aws.confluent.cloud:9092
schemaregistry:
url: https://plrm-qwlpp.us-east-2.aws.confluent.cloud
2. 创建Kubernetes Secrets
kubectl create secret generic confluent-secrets \
--from-literal=sasl_jaas_config="org.apache.kafka.common.security.plain.PlainLoginModule required username='API_KEY' password='API_SECRET';" \
--from-literal=basic_auth_user_info="SR_API_KEY:SR_API_SECRET"
3. 配置安全参数
在values.yaml中添加:
credentialsAndCertsSecrets:
name: confluent-secrets
secureEnv:
sasl.jaas.config: sasl_jaas_config
basic.auth.user.info: basic_auth_user_info
springKafkaConfigurationOverrides:
security.protocol: SASL_SSL
sasl.mechanism: PLAIN
client.dns.lookup: use_all_dns_ips
basic.auth.credentials.source: USER_INFO
DataHub Actions特殊配置
对于Actions组件,需要额外配置Python风格的Kafka客户端参数:
1. 修改executor.yaml
connection:
bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}
schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081}
consumer_config:
security.protocol: ${KAFKA_PROPERTIES_SECURITY_PROTOCOL:-PLAINTEXT}
sasl.mechanism: ${KAFKA_PROPERTIES_SASL_MECHANISM:-PLAIN}
sasl.username: ${KAFKA_PROPERTIES_SASL_USERNAME}
sasl.password: ${KAFKA_PROPERTIES_SASL_PASSWORD}
schema_registry_config:
basic.auth.user.info: ${KAFKA_PROPERTIES_BASIC_AUTH_USER_INFO}
2. 添加环境变量
对于Docker部署,在datahub-actions的env文件中添加:
KAFKA_PROPERTIES_SASL_USERNAME=API_KEY
KAFKA_PROPERTIES_SASL_PASSWORD=API_SECRET
对于Kubernetes部署,在secrets中添加:
credentialsAndCertsSecrets:
name: confluent-secrets
secureEnv:
sasl.username: sasl_username
sasl.password: sasl_password
验证与测试
部署完成后,建议进行以下验证步骤:
- 检查各Pod/容器日志,确认无连接错误
- 在Confluent Cloud控制台查看主题消息流量
- 执行简单的元数据变更操作,验证端到端流程
常见问题排查
连接失败:
- 检查API密钥和Secret是否正确
- 验证网络连通性(特别是企业防火墙设置)
- 确认Confluent Cloud集群区域匹配
Schema注册失败:
- 检查Schema Registry URL格式(必须为https)
- 验证Schema Registry API密钥
性能问题:
- 根据负载调整主题分区数
- 监控Confluent Cloud资源使用情况
最佳实践建议
- 安全隔离:为不同环境(dev/stage/prod)使用独立的Confluent Cloud集群
- 监控配置:设置适当的告警规则监控消息积压
- 成本优化:根据流量模式选择合适的Confluent Cloud套餐
- 备份策略:定期导出重要主题数据
通过以上配置,WhereHows可以充分利用Confluent Cloud提供的托管Kafka服务,减少运维复杂度,同时获得企业级的消息服务保障。