use etcd_client::{GetOptions, WatchOptions}; use tokio_stream::StreamExt; use uuid::Uuid; use crate::error::{AppError, AppResult}; use crate::pb::{EmailClient, RepoClient}; use super::types::ServiceInstance; use super::{EtcdRegistry, EtcdRegistryInner}; impl EtcdRegistry { pub async fn start_discovery(&self) -> AppResult<()> { self.load_initial("git").await?; self.load_initial("mail").await?; self.spawn_watch("git"); self.spawn_watch("mail"); Ok(()) } async fn load_initial(&self, service: &str) -> AppResult<()> { let prefix = self.service_prefix(service); let resp = { let mut client = self.inner.client.lock().await; client .get(prefix.as_str(), Some(GetOptions::new().with_prefix())) .await .map_err(|e| AppError::Config(format!("etcd get {prefix} failed: {e}")))? }; for kv in resp.kvs() { let key = kv.key_str().unwrap_or_default(); let value = kv.value_str().unwrap_or_default(); if let Ok(instance) = serde_json::from_str::(value) { Self::upsert_instance(&self.inner, service, key, &instance); } } tracing::info!( service = service, prefix = prefix.as_str(), "etcd initial discovery complete" ); Ok(()) } fn spawn_watch(&self, service: &str) { let prefix = self.service_prefix(service); let inner = self.inner.clone(); let service = service.to_string(); tokio::spawn(async move { loop { match Self::watch_loop(&inner, &prefix, &service).await { Ok(()) => break, Err(e) => { tracing::warn!( service = service.as_str(), error = %e, "etcd watch disconnected, retrying in 3s" ); tokio::time::sleep(std::time::Duration::from_secs(3)).await; } } } }); } async fn watch_loop(inner: &EtcdRegistryInner, prefix: &str, service: &str) -> AppResult<()> { let (mut watcher, mut stream) = { let mut client = inner.client.lock().await; client .watch(prefix, Some(WatchOptions::new().with_prefix())) .await .map_err(|e| AppError::Config(format!("etcd watch {prefix} failed: {e}")))? }; let _keep = &mut watcher; while let Some(resp) = stream.next().await { let resp = resp.map_err(|e| AppError::Config(format!("etcd watch stream error: {e}")))?; for event in resp.events() { let Some(kv) = event.kv() else { continue }; let key = kv.key_str().unwrap_or_default(); match event.event_type() { etcd_client::EventType::Put => { let value = kv.value_str().unwrap_or_default(); if let Ok(instance) = serde_json::from_str::(value) { Self::upsert_instance(inner, service, key, &instance); tracing::info!(service = service, key = key, "etcd service upserted"); } } etcd_client::EventType::Delete => { Self::remove_instance(inner, service, key); tracing::info!(service = service, key = key, "etcd service removed"); } } } } Ok(()) } pub(crate) fn service_prefix(&self, service: &str) -> String { format!("{}services/{service}/", self.inner.key_prefix) } fn extract_id_from_key(key: &str) -> Option { key.rsplit('/').next()?.parse().ok() } fn upsert_instance( inner: &EtcdRegistryInner, service: &str, key: &str, instance: &ServiceInstance, ) { let Some(node_id) = Self::extract_id_from_key(key) else { tracing::warn!(key = key, "etcd key has no valid UUID suffix"); return; }; let addr = instance.addr.clone(); match service { "git" => match RepoClient::lazy_connect(&addr) { Ok(client) => { inner.git_nodes.insert(node_id, client); } Err(e) => { tracing::error!(key = key, addr = addr.as_str(), error = %e, "git client connect failed"); } }, "mail" => match EmailClient::lazy_connect(&addr) { Ok(client) => { inner.mail_nodes.insert(node_id, client); } Err(e) => { tracing::error!(key = key, addr = addr.as_str(), error = %e, "mail client connect failed"); } }, _ => {} } } fn remove_instance(inner: &EtcdRegistryInner, service: &str, key: &str) { let Some(node_id) = Self::extract_id_from_key(key) else { return; }; match service { "git" => { inner.git_nodes.remove(&node_id); } "mail" => { inner.mail_nodes.remove(&node_id); } _ => {} } } }