use etcd_client::{GetOptions, WatchOptions}; use tokio_stream::StreamExt; use uuid::Uuid; use crate::error::{AppError, AppResult}; use crate::pb::{EmailClient, RepoClient}; use super::types::{GitksPeerInfo, ServiceInstance}; use super::{EtcdRegistry, EtcdRegistryInner, storage_name_to_uuid}; /// etcd prefix where gitks nodes register themselves (gitks::cluster::ClusterManager). const GITKS_NODES_PREFIX: &str = "/gitks/nodes/"; impl EtcdRegistry { pub async fn start_discovery(&self) -> AppResult<()> { // Discover gitks nodes from gitks's own etcd prefix. self.load_gitks_nodes().await?; self.spawn_gitks_watch(); // Discover mail services from appks's service prefix. if self.email_client.is_none() { self.load_initial("mail").await?; self.spawn_watch("mail"); } Ok(()) } // Section: gitks node discovery (from /gitks/nodes/) async fn load_gitks_nodes(&self) -> AppResult<()> { let resp = { let mut client = self.inner.client.lock().await; client .get(GITKS_NODES_PREFIX, Some(GetOptions::new().with_prefix())) .await .map_err(|e| { AppError::Config(format!("etcd get {GITKS_NODES_PREFIX} failed: {e}")) })? }; for kv in resp.kvs() { let key = kv.key_str().unwrap_or_default(); let value = kv.value_str().unwrap_or_default(); if let Ok(peer) = serde_json::from_str::(value) { Self::upsert_gitks_node(&self.inner, key, &peer); } else { tracing::warn!(key = key, "failed to parse gitks peer info from etcd"); } } tracing::info!( prefix = GITKS_NODES_PREFIX, count = self.inner.git_nodes.len(), "gitks node discovery complete" ); Ok(()) } fn spawn_gitks_watch(&self) { let inner = self.inner.clone(); tokio::spawn(async move { loop { match Self::gitks_watch_loop(&inner).await { Ok(()) => break, Err(e) => { tracing::warn!(error = %e, "gitks etcd watch disconnected, retrying in 3s"); tokio::time::sleep(std::time::Duration::from_secs(3)).await; } } } }); } async fn gitks_watch_loop(inner: &EtcdRegistryInner) -> AppResult<()> { let mut stream = { let mut client = inner.client.lock().await; client .watch(GITKS_NODES_PREFIX, Some(WatchOptions::new().with_prefix())) .await .map_err(|e| { AppError::Config(format!("etcd watch {GITKS_NODES_PREFIX} failed: {e}")) })? }; while let Some(resp) = stream.next().await { let resp = resp.map_err(|e| AppError::Config(format!("gitks watch stream error: {e}")))?; for event in resp.events() { let Some(kv) = event.kv() else { continue }; let key = kv.key_str().unwrap_or_default(); match event.event_type() { etcd_client::EventType::Put => { let value = kv.value_str().unwrap_or_default(); if let Ok(peer) = serde_json::from_str::(value) { Self::upsert_gitks_node(inner, key, &peer); tracing::info!( storage_name = %peer.storage_name, grpc_addr = %peer.grpc_addr, "gitks node upserted" ); } } etcd_client::EventType::Delete => { let storage_name = key.strip_prefix(GITKS_NODES_PREFIX).unwrap_or(&key); let node_id = storage_name_to_uuid(storage_name); inner.git_nodes.remove(&node_id); tracing::info!(storage_name = storage_name, "gitks node removed"); } } } } Ok(()) } fn upsert_gitks_node(inner: &EtcdRegistryInner, key: &str, peer: &GitksPeerInfo) { let node_id = storage_name_to_uuid(&peer.storage_name); if peer.grpc_addr.is_empty() { tracing::warn!( storage_name = %peer.storage_name, key = key, "gitks peer has empty grpc_addr, skipping" ); return; } match RepoClient::lazy_connect(&peer.grpc_addr) { Ok(client) => { inner.git_nodes.insert(node_id, client); tracing::debug!( storage_name = %peer.storage_name, node_id = %node_id, grpc_addr = %peer.grpc_addr, "gitks node connected" ); } Err(e) => { tracing::error!( storage_name = %peer.storage_name, grpc_addr = %peer.grpc_addr, error = %e, "gitks node connect failed" ); } } } // Section: mail service discovery (from appks's own etcd prefix) async fn load_initial(&self, service: &str) -> AppResult<()> { let prefix = self.service_prefix(service); let resp = { let mut client = self.inner.client.lock().await; client .get(prefix.as_str(), Some(GetOptions::new().with_prefix())) .await .map_err(|e| AppError::Config(format!("etcd get {prefix} failed: {e}")))? }; for kv in resp.kvs() { let key = kv.key_str().unwrap_or_default(); let value = kv.value_str().unwrap_or_default(); if let Ok(instance) = serde_json::from_str::(value) { Self::upsert_instance(&self.inner, service, key, &instance); } } tracing::info!( service = service, prefix = prefix.as_str(), "etcd mail discovery complete" ); Ok(()) } fn spawn_watch(&self, service: &str) { let prefix = self.service_prefix(service); let inner = self.inner.clone(); let service = service.to_string(); tokio::spawn(async move { loop { match Self::watch_loop(&inner, &prefix, &service).await { Ok(()) => break, Err(e) => { tracing::warn!( service = service.as_str(), error = %e, "etcd watch disconnected, retrying in 3s" ); tokio::time::sleep(std::time::Duration::from_secs(3)).await; } } } }); } async fn watch_loop(inner: &EtcdRegistryInner, prefix: &str, service: &str) -> AppResult<()> { let mut stream = { let mut client = inner.client.lock().await; client .watch(prefix, Some(WatchOptions::new().with_prefix())) .await .map_err(|e| AppError::Config(format!("etcd watch {prefix} failed: {e}")))? }; while let Some(resp) = stream.next().await { let resp = resp.map_err(|e| AppError::Config(format!("etcd watch stream error: {e}")))?; for event in resp.events() { let Some(kv) = event.kv() else { continue }; let key = kv.key_str().unwrap_or_default(); match event.event_type() { etcd_client::EventType::Put => { let value = kv.value_str().unwrap_or_default(); if let Ok(instance) = serde_json::from_str::(value) { Self::upsert_instance(inner, service, key, &instance); tracing::info!(service = service, key = key, "mail service upserted"); } } etcd_client::EventType::Delete => { Self::remove_instance(inner, service, key); tracing::info!(service = service, key = key, "mail service removed"); } } } } Ok(()) } pub(crate) fn service_prefix(&self, service: &str) -> String { format!("{}services/{service}/", self.inner.key_prefix) } fn extract_id_from_key(key: &str) -> Option { key.rsplit('/').next()?.parse().ok() } fn upsert_instance( inner: &EtcdRegistryInner, service: &str, key: &str, instance: &ServiceInstance, ) { let Some(node_id) = Self::extract_id_from_key(key) else { tracing::warn!(key = key, "etcd key has no valid UUID suffix"); return; }; let addr = instance.addr.clone(); match service { "mail" => match EmailClient::lazy_connect(&addr) { Ok(client) => { inner.mail_nodes.insert(node_id, client); } Err(e) => { tracing::error!( key = key, addr = addr.as_str(), error = %e, "mail client connect failed" ); } }, _ => {} } } fn remove_instance(inner: &EtcdRegistryInner, service: &str, key: &str) { let Some(node_id) = Self::extract_id_from_key(key) else { return; }; match service { "mail" => { inner.mail_nodes.remove(&node_id); } _ => {} } } }