use etcd_client::{Client, PutOptions}; use std::sync::Arc; use tokio::sync::Mutex; /// etcd-backed config reader. Priority: etcd > env var > default. pub struct EtcdConfig { client: Arc>, prefix: String, } impl EtcdConfig { pub async fn connect(endpoints: Vec, prefix: &str) -> Result { let client = Client::connect(endpoints, None) .await .map_err(|e| format!("etcd connect: {e}"))?; Ok(Self { client: Arc::new(Mutex::new(client)), prefix: prefix.to_string() }) } /// Get config value. Checks etcd first, then env var, then default. pub async fn get(&self, key: &str, default: &str) -> String { tracing::info!(key, "etcd get config"); // 1. Try etcd let etcd_key = format!("{}config/{}", self.prefix, key); if let Ok(mut client) = self.client.try_lock() { if let Ok(resp) = client.get(etcd_key.as_str(), None).await { if let Some(kv) = resp.kvs().first() { if let Ok(v) = kv.value_str() { if !v.is_empty() { tracing::info!(key, value = v, "config from etcd"); return v.to_string(); } } } } } // 2. Try env var if let Ok(v) = std::env::var(key) { if !v.is_empty() { tracing::info!(key, value = %v, "config from env"); return v; } } // 3. Default tracing::info!(key, value = %default, "config default"); default.to_string() } /// Get and parse config value. pub async fn get_parsed(&self, key: &str, default: T) -> T where T::Err: std::fmt::Display, T: std::fmt::Display, { let default_str = default.to_string(); let s = self.get(key, &default_str).await; s.parse().unwrap_or(default) } /// Set config value in etcd for other services to read. pub async fn set(&self, key: &str, value: &str) -> Result<(), String> { let etcd_key = format!("{}config/{}", self.prefix, key); let mut client = self.client.lock().await; client .put(etcd_key, value, None) .await .map_err(|e| format!("etcd put: {e}"))?; Ok(()) } /// Get the underlying etcd client for use by ServiceRegistry. pub fn client(&self) -> Arc> { self.client.clone() } /// Get the etcd key prefix. pub fn prefix(&self) -> &str { &self.prefix } } /// Register this service instance in etcd with a lease. pub struct ServiceRegistry { client: Arc>, prefix: String, } impl ServiceRegistry { pub fn new(client: Arc>, prefix: &str) -> Self { Self { client, prefix: prefix.to_string() } } /// Register this service under /{prefix}/services/{service_name}/{instance_id} pub async fn register(&self, service_name: &str, addr: &str) -> Result<(), String> { let instance_id = uuid::Uuid::now_v7().to_string(); let addr = addr.to_string(); let key = format!("{}services/{}/{}", self.prefix, service_name, instance_id); let instance = serde_json::json!({ "addr": &addr, "port": 0, "version": env!("CARGO_PKG_VERSION"), }); let value = serde_json::to_string(&instance).map_err(|e| format!("json: {e}"))?; let lease = { let mut client = self.client.lock().await; client .lease_grant(15, None) .await .map_err(|e| format!("lease: {e}"))? }; { let mut client = self.client.lock().await; let opts = PutOptions::new().with_lease(lease.id()); client .put(key.clone(), value, Some(opts)) .await .map_err(|e| format!("put: {e}"))?; } tracing::info!(service = service_name, instance = %instance_id, addr, "registered in etcd"); // Spawn keep-alive let c = self.client.clone(); tokio::spawn(async move { loop { let result = { let mut client = c.lock().await; client.lease_keep_alive(lease.id()).await }; match result { Ok((_keeper, mut stream)) => { use tokio_stream::StreamExt; while stream.next().await.is_some() {} } Err(e) => tracing::warn!(lease_id = lease.id(), error = %e, "keepalive failed"), } tokio::time::sleep(std::time::Duration::from_secs(5)).await; // Re-grant and re-register let new_lease = { let mut client = c.lock().await; client.lease_grant(15, None).await }; if let Ok(lease_resp) = new_lease { let new_id = lease_resp.id(); let instance = serde_json::json!({ "addr": addr, "port": 0, "version": env!("CARGO_PKG_VERSION"), }); if let Ok(v) = serde_json::to_string(&instance) { let mut client = c.lock().await; let opts = PutOptions::new().with_lease(new_id); let _ = client.put(key.clone(), v, Some(opts)).await; } } } }); Ok(()) } }