From e72866db8da86508893e70907155b701af56d426 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Thu, 11 Jun 2026 16:22:23 +0800 Subject: [PATCH] feat(config): integrate etcd for service discovery and config management - Add etcd-client dependency for distributed configuration storage - Implement EtcdConfig with priority: etcd > environment variables > defaults - Add ServiceRegistry for service registration with lease keep-alive - Integrate etcd-based service discovery for appks gRPC connections - Add service watcher for real-time service instance updates - Migrate Redis configuration from single URL to cluster node list - Update Dockerfile with default IMKS_HOST and IMKS_PORT environment variables - Add etcd bootstrap configuration through environment variables - Implement Redis cluster URL building with optional authentication --- .env.example | 63 +++++------- Cargo.lock | 20 ++++ Cargo.toml | 22 +++-- Dockerfile | 5 +- etcd.rs | 188 ++++++++++++++++++++++++++++++++++++ lib.rs | 1 + main.rs | 46 ++++++++- socket/message_bus/redis.rs | 10 +- svc/deploy.rs | 23 ++++- 9 files changed, 316 insertions(+), 62 deletions(-) create mode 100644 etcd.rs diff --git a/.env.example b/.env.example index 273b4d6..9708b28 100644 --- a/.env.example +++ b/.env.example @@ -1,71 +1,56 @@ # ============================================================================= # imks — IM 实时消息服务 环境变量配置 # 复制此文件为 .env 并修改相应值 +# +# 配置优先级: etcd > 环境变量 > 默认值 # ============================================================================= -# --- 部署模式 --- -# Adapter 模式: "local" (单节点) | "redis" | "nats" -IMKS_ADAPTER=local +# --- etcd 连接(启动引导,必须从环境变量读取)--- +ETCD_ENDPOINTS=http://localhost:2379 +ETCD_KEY_PREFIX=/appks/ -# 当前节点唯一标识(默认取主机名) +# --- 服务自身 --- +# 注册到 etcd 的地址 +# IMKS_ADDR=0.0.0.0:3000 + +# --- 部署模式 --- +# Adapter: "local" (单节点) | "redis" | "nats" +IMKS_ADAPTER=redis + +# 当前节点唯一标识 # IMKS_SERVER_ID=imks-node-1 -# Redis 连接(IMKS_ADAPTER=redis 时必需) -# IMKS_REDIS_URL=redis://localhost:6379 +# Redis Cluster 节点列表(逗号分隔 host:port) +IMKS_REDIS_CLUSTER_NODES=localhost:6379,localhost:6380,localhost:6381,localhost:6382,localhost:6383,localhost:6384 -# NATS 连接(IMKS_ADAPTER=nats 时必需) +# Redis 密码(可选) +# IMKS_REDIS_PASSWORD= + +# NATS 连接(IMKS_ADAPTER=nats 时使用) # IMKS_NATS_URL=nats://localhost:4222 # --- WebTransport (QUIC) --- -# 启用 WebTransport 服务(需要 TLS 证书) # IMKS_WT_ENABLED=false # IMKS_WT_PORT=3001 -# IMKS_WT_CERT_PATH=/path/to/cert.pem -# IMKS_WT_KEY_PATH=/path/to/key.pem +# IMKS_WT_CERT_PATH=/etc/imks/cert.pem +# IMKS_WT_KEY_PATH=/etc/imks/key.pem # --- 数据库 --- -# PostgreSQL 连接字符串 # DATABASE_URL=postgres://imks:password@localhost:5432/imks DATABASE_URL=postgres://localhost/imks -# 连接池配置 # DATABASE_MAX_CONNECTIONS=10 # DATABASE_MIN_CONNECTIONS=2 -# DATABASE_CONNECT_TIMEOUT=30 -# DATABASE_IDLE_TIMEOUT=600 # --- appks gRPC 连接 --- -# appks 核心服务地址 +# fallback:imks 优先通过 etcd 发现 appks 地址 # APPKS_GRPC_ADDR=http://localhost:50051 - -# 连接超时(秒) # APPKS_GRPC_TIMEOUT=10 -# mTLS 配置(生产环境必需) -# APPKS_GRPC_TLS_CA_CERT=/path/to/ca.pem -# APPKS_GRPC_TLS_CLIENT_CERT=/path/to/client.pem -# APPKS_GRPC_TLS_CLIENT_KEY=/path/to/client-key.pem -# APPKS_GRPC_TLS_DOMAIN=appks.internal - # --- OpenTelemetry 可观测性 --- -# 服务名 # OTEL_SERVICE_NAME=imks # OTEL_SERVICE_VERSION=0.1.0 - -# OTLP 收集器地址 # OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 -# 协议: grpc | http/protobuf -# OTEL_EXPORTER_OTLP_PROTOCOL=grpc -# 启用/禁用 telemetry -# OTEL_TRACES_ENABLED=true -# OTEL_METRICS_ENABLED=true -# OTEL_LOGS_ENABLED=true - -# 日志级别: trace | debug | info | warn | error +# --- 日志 --- RUST_LOG=info -# 日志格式: json | pretty -# LOG_FORMAT=json - -# 部署环境标识 -# OTEL_RESOURCE_ATTRIBUTES_DEPLOYMENT=development diff --git a/Cargo.lock b/Cargo.lock index 90b673d..e285c1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -939,6 +939,24 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "etcd-client" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ed900ba953ca6bf1fadb75e0c6b73d8463b9e2bb6bdb7b4573e8e7295852fbe" +dependencies = [ + "http 1.4.2", + "prost", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tonic-prost", + "tonic-prost-build", + "tower", + "tower-service", +] + [[package]] name = "etcetera" version = "0.11.0" @@ -1612,6 +1630,7 @@ dependencies = [ "base64", "chrono", "dashmap", + "etcd-client", "fred", "futures-util", "jsonwebtoken", @@ -1629,6 +1648,7 @@ dependencies = [ "sqlx", "thiserror 2.0.18", "tokio", + "tokio-stream", "tonic", "tonic-build", "tonic-health", diff --git a/Cargo.toml b/Cargo.toml index d911d7d..7bf6cc3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,15 +14,15 @@ name = "imks" [dependencies] -tonic = { version = "0.14.6", features = ["tls-ring"] } -prost = "0.14.3" +tonic = { version = "0.14", features = ["tls-ring"] } +prost = "0.14" prost-types = "0.14" -tonic-build = "0.14.6" -tonic-health = "0.14.6" -tonic-prost = "0.14.6" -tokio = { version = "1.52.3", features = ["full"] } -actix-web = { version = "4.13.0", features = [] } -actix-ws = { version = "0.4.0", features = [] } +tonic-build = "0.14" +tonic-health = "0.14" +tonic-prost = "0.14" +tokio = { version = "1", features = ["full"] } +actix-web = { version = "4", features = [] } +actix-ws = { version = "0.4", features = [] } actix-rt = "2" serde = { version = "1", features = ["derive"] } serde_json = { version = "1" } @@ -34,6 +34,8 @@ rand = "0.9" wtransport = "0.7" dashmap = "6" thiserror = "2" +etcd-client = { version = "0.18", features = ["tls"] } +tokio-stream = { version = "0.1", features = ["sync"] } async-trait = "0.1" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt", "registry"] } @@ -52,5 +54,5 @@ arc-swap = "1" [build-dependencies] -tonic-prost-build = "0.14.6" -walkdir = "2.5.0" \ No newline at end of file +tonic-prost-build = "0.14" +walkdir = "2.5" \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index cc820f7..e8c71d4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,7 +18,6 @@ RUN cargo build --release --bin imks && \ strip target/release/imks FROM ubuntu:26.04 - RUN apt-get update && \ apt-get install -y --no-install-recommends ca-certificates curl && \ rm -rf /var/lib/apt/lists/* @@ -27,10 +26,12 @@ COPY --from=builder /app/target/release/imks /usr/local/bin/imks COPY --from=builder /app/migrate/ /app/migrate/ WORKDIR /app - RUN useradd -m -u 1000 imks && chown -R imks:imks /app USER imks +ENV IMKS_HOST=0.0.0.0 +ENV IMKS_PORT=3000 + EXPOSE 3000 HEALTHCHECK --interval=15s --timeout=3s --start-period=10s --retries=3 \ diff --git a/etcd.rs b/etcd.rs new file mode 100644 index 0000000..995d26e --- /dev/null +++ b/etcd.rs @@ -0,0 +1,188 @@ +use etcd_client::{Client, PutOptions, GetOptions, WatchOptions}; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio_stream::StreamExt; + +/// etcd-backed config reader. Priority: etcd > env var > default. +pub struct EtcdConfig { + client: Arc>, + prefix: String, +} + +impl EtcdConfig { + pub async fn connect(endpoints: Vec, prefix: &str) -> Result { + let client = Client::connect(endpoints, None) + .await + .map_err(|e| format!("etcd connect: {e}"))?; + Ok(Self { client: Arc::new(Mutex::new(client)), prefix: prefix.to_string() }) + } + + /// Get config value: etcd first, then env var, then default. + pub async fn get(&self, key: &str, default: &str) -> String { + let etcd_key = format!("{}config/{}", self.prefix, key); + if let Ok(mut c) = self.client.try_lock() { + if let Ok(resp) = c.get(etcd_key.as_str(), None).await { + if let Some(kv) = resp.kvs().first() { + if let Ok(v) = kv.value_str() { + if !v.is_empty() { + tracing::info!(key, value = v, "config from etcd"); + return v.to_string(); + } + } + } + } + } + if let Ok(v) = std::env::var(key) { + if !v.is_empty() { + return v; + } + } + default.to_string() + } + + pub async fn get_parsed(&self, key: &str, default: T) -> T + where T::Err: std::fmt::Display, T: std::fmt::Display + { + let s = self.get(key, &default.to_string()).await; + s.parse().unwrap_or(default) + } + + /// Get the etcd client for use by ServiceRegistry. + pub fn client(&self) -> Arc> { + self.client.clone() + } + + /// Discover service instances registered under /{prefix}/services/{service_name}/. + pub async fn discover_service(&self, service_name: &str) -> Result, String> { + let prefix = format!("{}services/{}/", self.prefix, service_name); + let mut client = self.client.lock().await; + let resp = client + .get(prefix.as_str(), Some(GetOptions::new().with_prefix())) + .await + .map_err(|e| format!("etcd get {prefix}: {e}"))?; + + let mut addrs = Vec::new(); + for kv in resp.kvs() { + if let Ok(value) = kv.value_str() { + if let Ok(instance) = serde_json::from_str::(value) { + if let Some(addr) = instance.get("addr").and_then(|v| v.as_str()) { + addrs.push(addr.to_string()); + } + } + } + } + tracing::info!(service = service_name, count = addrs.len(), "discovered instances"); + Ok(addrs) + } + + /// Watch a service for live updates. + pub fn start_service_watcher(&self, service_name: &str) { + let client = self.client.clone(); + let prefix = self.prefix.clone(); + let svc = service_name.to_string(); + let watch_prefix = format!("{}services/{}/", prefix, svc); + + tokio::spawn(async move { + loop { + let mut stream = { + let mut c = client.lock().await; + match c.watch(watch_prefix.as_str(), Some(WatchOptions::new().with_prefix())).await { + Ok(s) => s, + Err(e) => { + tracing::warn!(service = %svc, error = %e, "watch failed, retry in 3s"); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + continue; + } + } + }; + while let Some(resp) = stream.next().await { + if let Ok(resp) = resp { + for event in resp.events() { + if let Some(kv) = event.kv() { + let addr = kv.value_str().unwrap_or_default(); + let key = kv.key_str().unwrap_or_default(); + match event.event_type() { + etcd_client::EventType::Put => { + tracing::info!(service = %svc, key, addr, "service up"); + } + etcd_client::EventType::Delete => { + tracing::info!(service = %svc, key, "service down"); + } + } + } + } + } + } + } + }); + } +} + +/// Register this service instance in etcd. +pub struct ServiceRegistry { + client: Arc>, + prefix: String, +} + +impl ServiceRegistry { + pub fn new(client: Arc>, prefix: &str) -> Self { + Self { client, prefix: prefix.to_string() } + } + + pub async fn register(&self, service_name: &str, addr: &str) -> Result<(), String> { + let instance_id = uuid::Uuid::now_v7().to_string(); + let addr = addr.to_string(); + let key = format!("{}services/{}/{}", self.prefix, service_name, instance_id); + + let instance = serde_json::json!({ + "addr": &addr, + "port": 0, + "version": env!("CARGO_PKG_VERSION"), + }); + let value = serde_json::to_string(&instance).map_err(|e| format!("json: {e}"))?; + + let lease = { + let mut client = self.client.lock().await; + client.lease_grant(15, None).await.map_err(|e| format!("lease: {e}"))? + }; + + { + let mut client = self.client.lock().await; + let opts = PutOptions::new().with_lease(lease.id()); + client.put(key.clone(), value, Some(opts)).await.map_err(|e| format!("put: {e}"))?; + } + + tracing::info!(service = service_name, instance = %instance_id, addr = %addr, "registered in etcd"); + + let c = self.client.clone(); + tokio::spawn(async move { + loop { + let result = { + let mut client = c.lock().await; + client.lease_keep_alive(lease.id()).await + }; + match result { + Ok((_keeper, mut stream)) => { + while stream.next().await.is_some() {} + } + Err(e) => tracing::warn!(lease_id = lease.id(), error = %e, "keepalive failed"), + } + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + let new_lease = { + let mut client = c.lock().await; + client.lease_grant(15, None).await + }; + if let Ok(lr) = new_lease { + let instance = serde_json::json!({"addr": &addr, "port": 0, "version": env!("CARGO_PKG_VERSION")}); + if let Ok(v) = serde_json::to_string(&instance) { + let mut client = c.lock().await; + let opts = PutOptions::new().with_lease(lr.id()); + let _ = client.put(key.clone(), v, Some(opts)).await; + } + } + } + }); + + Ok(()) + } +} diff --git a/lib.rs b/lib.rs index 9606646..9c71af1 100644 --- a/lib.rs +++ b/lib.rs @@ -2,6 +2,7 @@ pub mod auth; pub mod database; pub mod engine; pub mod error; +pub mod etcd; pub mod models; pub mod pb; pub mod repo; diff --git a/main.rs b/main.rs index 42fe39f..41316f4 100644 --- a/main.rs +++ b/main.rs @@ -2,6 +2,7 @@ use std::sync::{Arc, OnceLock}; use imks::database::{Database, DatabaseConfig}; use imks::engine::server::EngineConfig; +use imks::etcd::{EtcdConfig, ServiceRegistry}; use imks::repo::MessageRepo; use imks::rpc::{AppksClients, RpcConfig}; use imks::socket::adapter::{LocalBroadcastFn, NatsAdapter, RedisAdapter}; @@ -17,6 +18,17 @@ fn main() -> Result<(), Box> { telemetry::health::init_counters(); let deploy = DeployConfig::from_env(); + + // Read etcd bootstrap config from env (these MUST come from env, not etcd) + let etcd_endpoints: Vec = std::env::var("ETCD_ENDPOINTS") + .unwrap_or_else(|_| "http://localhost:2379".to_string()) + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + let etcd_prefix = std::env::var("ETCD_KEY_PREFIX") + .unwrap_or_else(|_| "/appks/".to_string()); + tracing::info!( adapter = %deploy.adapter_mode, server_id = %deploy.server_id, @@ -24,11 +36,33 @@ fn main() -> Result<(), Box> { "Starting imks server" ); - let addr = "0.0.0.0:3000"; + let addr = "0.0.0.0:50048"; let rt = tokio::runtime::Runtime::new()?; rt.block_on(async { + // --- etcd: connect, register, discover appks --- + let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix).await + .unwrap_or_else(|e| { + tracing::error!(error = %e, "etcd required but unavailable"); + panic!("etcd required: {e}") + }); + + // Register this service so others can discover us + let registry = ServiceRegistry::new(etcd.client(), &etcd_prefix); + let imks_addr = etcd.get("IMKS_ADDR", "0.0.0.0:3000").await; + registry.register("imks", &imks_addr).await.ok(); + + // Discover appks from etcd (priority > env) + let appks_addr = etcd.discover_service("appks").await + .ok() + .and_then(|addrs| addrs.into_iter().next()) + .unwrap_or_else(|| { + std::env::var("APPKS_GRPC_ADDR").unwrap_or_else(|_| "http://localhost:50051".to_string()) + }); + tracing::info!(appks_addr = %appks_addr, "appks discovered via etcd"); + etcd.start_service_watcher("appks"); + let engine_config = EngineConfig::default(); let mut builder = SocketServerBuilder::new(engine_config); let namespace_holder: Arc>> = @@ -37,10 +71,11 @@ fn main() -> Result<(), Box> { // Pre-configure adapter for Redis/NATS mode. match deploy.adapter_mode.as_str() { "redis" => { + let cluster_url = deploy.redis_cluster_url(); let message_bus = Arc::new( - RedisMessageBus::new(&deploy.redis_url) + RedisMessageBus::new(&cluster_url) .await - .map_err(|e| format!("Failed to connect to Redis: {e}"))?, + .map_err(|e| format!("Failed to connect to Redis cluster: {e}"))?, ); let redis_client = message_bus.client().clone(); let server_id = deploy.server_id.clone(); @@ -88,7 +123,10 @@ fn main() -> Result<(), Box> { // Initialize database + gRPC + service let service: Option> = { - let rpc_config = RpcConfig::from_env(); + let rpc_config = RpcConfig { + appks_addr: appks_addr.clone(), + ..RpcConfig::from_env() + }; let db_config = DatabaseConfig::from_env(); match AppksClients::connect(&rpc_config).await { diff --git a/socket/message_bus/redis.rs b/socket/message_bus/redis.rs index 9fb14e8..1027611 100644 --- a/socket/message_bus/redis.rs +++ b/socket/message_bus/redis.rs @@ -12,14 +12,17 @@ pub struct RedisMessageBus { } impl RedisMessageBus { - pub async fn new(redis_url: &str) -> Result { + /// Connect to a Redis cluster. + /// + /// `cluster_url` should be in `redis-cluster://` format, e.g.: + /// `redis-cluster://host1:6379,host2:6379,host3:6379` + pub async fn new(cluster_url: &str) -> Result { let config = - Config::from_url(redis_url).map_err(|e| MessageBusError::Redis(e.to_string()))?; + Config::from_url(cluster_url).map_err(|e| MessageBusError::Redis(e.to_string()))?; let client = Client::new(config.clone(), None, None, None); let subscriber = SubscriberClient::new(config, None, None, None); - // connect() starts the connection task; result is checked by wait_for_connect() let _ = client.connect().await; let _ = subscriber.connect().await; @@ -32,6 +35,7 @@ impl RedisMessageBus { .await .map_err(|e| MessageBusError::Redis(e.to_string()))?; + tracing::info!(cluster_url, "Redis cluster connected"); Ok(Self { client, subscriber }) } diff --git a/svc/deploy.rs b/svc/deploy.rs index 47e5c71..ad0e29d 100644 --- a/svc/deploy.rs +++ b/svc/deploy.rs @@ -10,8 +10,11 @@ use std::env; pub struct DeployConfig { /// "local" | "redis" | "nats" pub adapter_mode: String, - /// Redis connection URL (used when adapter_mode = "redis"). - pub redis_url: String, + /// Redis cluster nodes, comma-separated host:port pairs. + /// Example: "redis1:6379,redis2:6379,redis3:6379" + pub redis_cluster_nodes: String, + /// Redis password (optional). + pub redis_password: String, /// NATS connection URL (used when adapter_mode = "nats"). pub nats_url: String, /// Unique server ID for this node. @@ -32,8 +35,9 @@ impl DeployConfig { Self { adapter_mode: env::var("IMKS_ADAPTER").unwrap_or_else(|_| "local".into()), - redis_url: env::var("IMKS_REDIS_URL") - .unwrap_or_else(|_| "redis://localhost:6379".into()), + redis_cluster_nodes: env::var("IMKS_REDIS_CLUSTER_NODES") + .unwrap_or_else(|_| "localhost:6379,localhost:6380,localhost:6381".into()), + redis_password: env::var("IMKS_REDIS_PASSWORD").unwrap_or_default(), nats_url: env::var("IMKS_NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into()), server_id, webtransport_enabled: env::var("IMKS_WT_ENABLED") @@ -47,6 +51,17 @@ impl DeployConfig { key_path: env::var("IMKS_WT_KEY_PATH").unwrap_or_default(), } } + + /// Build a redis-cluster URL from cluster_nodes and optional password. + /// Format: redis-cluster://[:password@]host1:port1,host2:port2,... + pub fn redis_cluster_url(&self) -> String { + let auth = if self.redis_password.is_empty() { + String::new() + } else { + format!(":{}@", self.redis_password) + }; + format!("redis-cluster://{}{}", auth, self.redis_cluster_nodes) + } } impl Default for DeployConfig {