From b797e360c0b73fd8644b7b91f6eb9aff0b038eb8 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Thu, 11 Jun 2026 22:50:40 +0800 Subject: [PATCH] feat(registry): add service discovery and health check capabilities - Integrate tonic-health for gRPC service health monitoring - Add etcd-based service registration with automatic keep-alive - Implement dynamic configuration loading from etcd with fallback - Remove external dependencies from docker-compose for simplified deployment - Refactor service registration logic with improved lease management - Add health service to gRPC server with serving status reporting --- Cargo.lock | 15 +++ Cargo.toml | 1 + docker-compose.yaml | 255 -------------------------------------------- etcd/discovery.rs | 1 + etcd/mod.rs | 26 +++++ etcd/register.rs | 85 ++++----------- grpc/mod.rs | 8 ++ main.rs | 8 +- 8 files changed, 73 insertions(+), 326 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 454b0da..547e2f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -380,6 +380,7 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tonic-health", "tonic-prost", "tonic-prost-build", "tracing", @@ -4636,6 +4637,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -4714,6 +4716,19 @@ dependencies = [ "syn", ] +[[package]] +name = "tonic-health" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcfab99db777fba2802f0dfa861d1628d1ae916fb199d29819941f139ae85082" +dependencies = [ + "prost", + "tokio", + "tokio-stream", + "tonic", + "tonic-prost", +] + [[package]] name = "tonic-prost" version = "0.14.6" diff --git a/Cargo.toml b/Cargo.toml index a5465d2..310c828 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ tonic = { version = "0.14", features = ["transport", "channel"] } prost = "0.14" prost-types = "0.14" tonic-prost = "0.14" +tonic-health = "0.14.6" url = "2.5" etcd-client = { version = "0.18", features = ["tls"] } tokio-stream = "0.1" diff --git a/docker-compose.yaml b/docker-compose.yaml index 790aa68..87a52d7 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -14,7 +14,6 @@ x-appks-env: &appks-env APP_SESSION_COOKIE_PATH: / APP_SESSION_TTL_SECS: 86400 APP_SESSION_MAX_AGE_SECS: 86400 - # Postgres APP_DATABASE_URL: postgres://appks:appks@postgres:5432/appks DATABASE_URL: postgres://appks:appks@postgres:5432/appks APP_DATABASE_MAX_CONNECTIONS: 10 @@ -22,15 +21,11 @@ x-appks-env: &appks-env APP_DATABASE_IDLE_TIMEOUT: 600 APP_DATABASE_MAX_LIFETIME: 3600 APP_DATABASE_CONNECTION_TIMEOUT: 8 - APP_DATABASE_SCHEMA_SEARCH_PATH: public - APP_DATABASE_READ_WRITE_SPLIT: "false" APP_DATABASE_RETRY_ATTEMPTS: 3 APP_DATABASE_RETRY_DELAY: 5 - # Redis (cluster mode) APP_REDIS_CLUSTER_ENABLED: "true" APP_REDIS_CLUSTER_NODES: redis://redis-node-0:6379,redis://redis-node-1:6379,redis://redis-node-2:6379,redis://redis-node-3:6379,redis://redis-node-4:6379,redis://redis-node-5:6379 APP_REDIS_READ_FROM_REPLICAS: "false" - APP_REDIS_PASSWORD: "" APP_REDIS_MAX_CONNECTIONS: 20 APP_REDIS_MIN_CONNECTIONS: 2 APP_REDIS_IDLE_TIMEOUT: 300 @@ -39,7 +34,6 @@ x-appks-env: &appks-env APP_REDIS_RETRY_DELAY_MS: 100 APP_REDIS_TLS_ENABLED: "false" APP_REDIS_KEY_PREFIX: "appks:" - # etcd APP_ETCD_ENDPOINTS: http://etcd:2379 APP_ETCD_KEY_PREFIX: /appks/ APP_ETCD_CONNECT_TIMEOUT: 5 @@ -48,7 +42,6 @@ x-appks-env: &appks-env APP_ETCD_LEASE_TTL: 15 APP_ETCD_MAX_RETRIES: 3 APP_ETCD_REGISTER_SELF: "true" - # NATS APP_NATS_URL: nats://nats:4222 APP_NATS_CONNECTION_TIMEOUT: 5 APP_NATS_PING_INTERVAL: 20 @@ -57,7 +50,6 @@ x-appks-env: &appks-env APP_NATS_STREAM_PREFIX: APPKS APP_NATS_ACK_WAIT_SECS: 30 APP_NATS_MAX_DELIVER: 5 - # S3 / MinIO APP_S3_ENDPOINT: http://minio:9000 APP_S3_REGION: us-east-1 APP_S3_ACCESS_KEY: admin @@ -73,34 +65,16 @@ x-appks-env: &appks-env APP_S3_UPLOAD_PART_SIZE: 8388608 APP_S3_MAX_UPLOAD_SIZE: 104857600 APP_S3_PRESIGNED_URL_EXPIRY: 3600 - # LRU cache APP_LRU_DEFAULT_CAPACITY: 1000 APP_LRU_DEFAULT_TTL_SECS: 300 APP_LRU_CLEANUP_INTERVAL_SECS: 60 - # gRPC APP_RPC_SELF_HOST: 0.0.0.0 APP_RPC_SELF_PORT: 50049 APP_RPC_SELF_REFLECTION: "false" APP_RPC_SELF_SERVICE_NAME: appks APP_RPC_DEFAULT_TIMEOUT_SECS: 10 - # AI (disabled by default in compose — set externally) - APP_AI_PROVIDER_API_KEY: "" - APP_AI_PROVIDER_URL: "" - # Qdrant - APP_QDRANT_URL: http://qdrant:6334 - APP_QDRANT_COLLECTION: appks_embeddings - APP_QDRANT_VECTOR_SIZE: 1536 - APP_QDRANT_DISTANCE: Cosine - APP_QDRANT_MAX_CONNECTIONS: 10 - APP_QDRANT_IDLE_TIMEOUT: 300 - APP_QDRANT_CONNECTION_TIMEOUT: 10 - APP_QDRANT_MAX_RETRIES: 3 - APP_QDRANT_TLS_ENABLED: "false" - APP_QDRANT_SEARCH_LIMIT: 10 - APP_QDRANT_SCORE_THRESHOLD: 0.7 services: - # AppKS appks: image: appks restart: unless-stopped @@ -109,232 +83,3 @@ services: - "50049:50049" environment: <<: *appks-env - depends_on: - postgres: - condition: service_healthy - etcd: - condition: service_started - nats: - condition: service_started - minio: - condition: service_healthy - qdrant: - condition: service_started - networks: - - appks-net - - # PostgreSQL - postgres: - image: postgres:17-alpine - restart: unless-stopped - environment: - POSTGRES_USER: appks - POSTGRES_PASSWORD: appks - POSTGRES_DB: appks - ports: - - "5432:5432" - volumes: - - pg_data:/var/lib/postgresql/data - healthcheck: - test: ["CMD-SHELL", "pg_isready -U appks -d appks"] - interval: 5s - timeout: 5s - retries: 5 - networks: - - appks-net - - # Redis Cluster (6 nodes) - redis-node-0: - image: redis:7-alpine - command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes - restart: unless-stopped - ports: - - "6379:6379" - volumes: - - redis_data_0:/data - networks: - - appks-net - - redis-node-1: - image: redis:7-alpine - command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes - restart: unless-stopped - ports: - - "6380:6379" - volumes: - - redis_data_1:/data - networks: - - appks-net - - redis-node-2: - image: redis:7-alpine - command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes - restart: unless-stopped - ports: - - "6381:6379" - volumes: - - redis_data_2:/data - networks: - - appks-net - - redis-node-3: - image: redis:7-alpine - command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes - restart: unless-stopped - ports: - - "6382:6379" - volumes: - - redis_data_3:/data - networks: - - appks-net - - redis-node-4: - image: redis:7-alpine - command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes - restart: unless-stopped - ports: - - "6383:6379" - volumes: - - redis_data_4:/data - networks: - - appks-net - - redis-node-5: - image: redis:7-alpine - command: redis-server --port 6379 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes - restart: unless-stopped - ports: - - "6384:6379" - volumes: - - redis_data_5:/data - networks: - - appks-net - - redis-cluster-init: - image: redis:7-alpine - depends_on: - - redis-node-0 - - redis-node-1 - - redis-node-2 - - redis-node-3 - - redis-node-4 - - redis-node-5 - entrypoint: > - sh -c ' - echo "Waiting for all Redis nodes to be ready..."; - sleep 5; - echo "Creating Redis cluster..."; - redis-cli --cluster create - redis-node-0:6379 - redis-node-1:6379 - redis-node-2:6379 - redis-node-3:6379 - redis-node-4:6379 - redis-node-5:6379 - --cluster-replicas 1 - --cluster-yes - ' - networks: - - appks-net - - # etcd - etcd: - image: quay.io/coreos/etcd:v3.5 - restart: unless-stopped - command: > - etcd - --name etcd - --data-dir /etcd-data - --listen-client-urls http://0.0.0.0:2379 - --advertise-client-urls http://etcd:2379 - --listen-peer-urls http://0.0.0.0:2380 - --initial-advertise-peer-urls http://etcd:2380 - --initial-cluster etcd=http://etcd:2380 - --initial-cluster-token appks-etcd - --initial-cluster-state new - ports: - - "2379:2379" - - "2380:2380" - volumes: - - etcd_data:/etcd-data - networks: - - appks-net - - # NATS - nats: - image: nats:2-alpine - restart: unless-stopped - command: --js --store_dir /data/nats - ports: - - "4222:4222" - - "8222:8222" - volumes: - - nats_data:/data/nats - networks: - - appks-net - - # MinIO (S3-compatible storage) - minio: - image: minio/minio:latest - restart: unless-stopped - command: server /data --console-address ":9001" - environment: - MINIO_ROOT_USER: admin - MINIO_ROOT_PASSWORD: mysecret123 - ports: - - "9000:9000" - - "9001:9001" - volumes: - - minio_data:/data - healthcheck: - test: ["CMD-SHELL", "mc ready local || exit 1"] - interval: 5s - timeout: 5s - retries: 5 - networks: - - appks-net - - # MinIO bucket auto-creation - minio-init: - image: minio/mc:latest - depends_on: - minio: - condition: service_healthy - entrypoint: > - sh -c ' - mc alias set local http://minio:9000 admin mysecret123; - mc mb --ignore-existing local/appks; - echo "MinIO bucket appks ready"; - exit 0 - ' - networks: - - appks-net - - # Qdrant (vector database) - qdrant: - image: qdrant/qdrant:latest - restart: unless-stopped - ports: - - "6333:6333" - - "6334:6334" - volumes: - - qdrant_data:/qdrant/storage - networks: - - appks-net - -volumes: - pg_data: - redis_data_0: - redis_data_1: - redis_data_2: - redis_data_3: - redis_data_4: - redis_data_5: - etcd_data: - nats_data: - minio_data: - qdrant_data: - -networks: - appks-net: - driver: bridge diff --git a/etcd/discovery.rs b/etcd/discovery.rs index b7a3a87..98c3703 100644 --- a/etcd/discovery.rs +++ b/etcd/discovery.rs @@ -22,6 +22,7 @@ impl EtcdRegistry { self.load_initial("mail").await?; self.spawn_watch("mail"); } + Ok(()) } diff --git a/etcd/mod.rs b/etcd/mod.rs index b8ef8a8..2deca0a 100644 --- a/etcd/mod.rs +++ b/etcd/mod.rs @@ -116,6 +116,32 @@ impl EtcdRegistry { ids.sort(); ids } + + /// Read config from etcd. Priority: etcd > env > default. + /// This is async but can be called from sync context via block_on. + pub async fn get_config(&self, key: &str, default: &str) -> String { + let etcd_key = format!("{}config/{}", self.inner.key_prefix, key); + let mut client = self.inner.client.lock().await; + if let Ok(resp) = client.get(etcd_key.as_str(), None).await { + if let Some(kv) = resp.kvs().first() { + if let Ok(v) = kv.value_str() { + if !v.is_empty() { + tracing::info!(key, value = v, "config from etcd"); + return v.to_string(); + } + } + } + } + drop(client); + // Fall back to env + if let Ok(v) = std::env::var(key) { + if !v.is_empty() { + return v; + } + } + default.to_string() + } + } /// Derive a deterministic UUID from a gitks storage_name. diff --git a/etcd/register.rs b/etcd/register.rs index 4f39692..a1d3bc8 100644 --- a/etcd/register.rs +++ b/etcd/register.rs @@ -2,12 +2,9 @@ use std::collections::HashMap; use std::sync::atomic::Ordering; use etcd_client::PutOptions; -use tokio_stream::StreamExt; - use crate::error::{AppError, AppResult}; use super::EtcdRegistry; -use super::EtcdRegistryInner; use super::types::ServiceInstance; impl EtcdRegistry { @@ -58,74 +55,30 @@ impl EtcdRegistry { Ok(()) } - fn spawn_keep_alive(&self, lease_id: i64, key: String) { + fn spawn_keep_alive(&self, lease_id: i64, _key: String) { let inner = self.inner.clone(); - let interval = self.inner.config.etcd_keep_alive_interval().unwrap_or(10); tokio::spawn(async move { + let (mut keeper, mut stream) = { + let mut client = inner.client.lock().await; + match client.lease_keep_alive(lease_id).await { + Ok(pair) => pair, + Err(e) => { + tracing::error!(lease_id, error = %e, "failed to start lease keepalive"); + return; + } + } + }; + + let interval_secs = inner.config.etcd_keep_alive_interval().unwrap_or(10); + let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); loop { - Self::run_keep_alive_stream(&inner, lease_id).await; - tokio::time::sleep(std::time::Duration::from_secs(interval)).await; - Self::renew_lease_and_reregister(&inner, lease_id, &key).await; + interval.tick().await; + if let Err(e) = keeper.keep_alive().await { + tracing::warn!(lease_id, error = %e, "lease keepalive failed"); + } + let _ = stream.message().await; } }); } } - -impl EtcdRegistry { - async fn run_keep_alive_stream( - inner: &std::sync::Arc, - lease_id: i64, - ) { - let result = { - let mut client = inner.client.lock().await; - client.lease_keep_alive(lease_id).await - }; - - match result { - Ok((_keeper, mut stream)) => { - while let Some(resp) = stream.next().await { - if let Err(e) = resp { - tracing::warn!(lease_id = lease_id, error = %e, "keep-alive stream error"); - break; - } - } - } - Err(e) => { - tracing::warn!(lease_id = lease_id, error = %e, "keep-alive failed"); - } - } - } - - async fn renew_lease_and_reregister( - inner: &std::sync::Arc, - old_lease_id: i64, - key: &str, - ) { - let re_grant = { - let mut client = inner.client.lock().await; - client - .lease_grant(inner.config.etcd_lease_ttl().unwrap_or(15) as i64, None) - .await - }; - - let Ok(current) = re_grant else { - return; - }; - - let new_lease = current.id(); - inner.lease_id.store(new_lease, Ordering::SeqCst); - - let instance = ServiceInstance { - addr: inner.config.rpc_self_listen_addr().unwrap_or_default(), - metadata: HashMap::new(), - }; - - if let Ok(value) = serde_json::to_string(&instance) { - let mut client = inner.client.lock().await; - let opts = PutOptions::new().with_lease(new_lease); - let _ = client.put(key, value, Some(opts)).await; - } - tracing::info!(old = old_lease_id, new = new_lease, "etcd lease renewed"); - } -} diff --git a/grpc/mod.rs b/grpc/mod.rs index 3ac1e0c..61e7131 100644 --- a/grpc/mod.rs +++ b/grpc/mod.rs @@ -21,6 +21,8 @@ use crate::pb::im::member_service_server::MemberServiceServer; use crate::pb::im::permission_service_server::PermissionServiceServer; use crate::pb::im::stage_service_server::StageServiceServer; use crate::pb::im::voice_service_server::VoiceServiceServer; +use tonic_health::ServingStatus; + use crate::service::AppService; pub async fn start_grpc_server( @@ -34,9 +36,15 @@ pub async fn start_grpc_server( let cs = channel_settings::ChannelSettingsServices::new(service); + let (health_reporter, health_service) = tonic_health::server::health_reporter(); + health_reporter + .set_service_status("", ServingStatus::Serving) + .await; + tracing::info!(%addr, "gRPC server listening"); tonic::transport::Server::builder() + .add_service(health_service) .add_service(TokenServiceServer::new(token_svc)) .add_service(ChannelServiceServer::new(channel_svc)) .add_service(MemberServiceServer::new(member_svc)) diff --git a/main.rs b/main.rs index 55ab0b2..bef7268 100644 --- a/main.rs +++ b/main.rs @@ -39,11 +39,9 @@ async fn main() -> AppResult<()> { let registry = Arc::new(EtcdRegistry::connect(&config).await?); registry.start_discovery().await?; - if config.get_env_or("APP_ETCD_REGISTER_SELF", false)? { - registry - .register_self(&config.rpc_self_service_name()?) - .await?; - } + registry + .register_self(&config.rpc_self_service_name()?) + .await?; let nats = Arc::new(NatsQueue::connect(&config).await?);