use std::collections::HashMap; use std::sync::atomic::Ordering; use etcd_client::PutOptions; use tokio_stream::StreamExt; use crate::error::{AppError, AppResult}; use super::EtcdRegistry; use super::EtcdRegistryInner; use super::types::ServiceInstance; impl EtcdRegistry { pub async fn register_self(&self, service_name: &str) -> AppResult<()> { let ttl = self.inner.config.etcd_lease_ttl()?; let listen_addr = self.inner.config.rpc_self_listen_addr()?; let instance_id = uuid::Uuid::now_v7().to_string(); let key = format!( "{}services/{service_name}/{instance_id}", self.inner.key_prefix ); let instance = ServiceInstance { addr: listen_addr.clone(), metadata: HashMap::new(), }; let value = serde_json::to_string(&instance)?; let lease_resp = { let mut client = self.inner.client.lock().await; client .lease_grant(ttl as i64, None) .await .map_err(|e| AppError::Config(format!("etcd lease_grant failed: {e}")))? }; let lease_id = lease_resp.id(); self.inner.lease_id.store(lease_id, Ordering::SeqCst); { let mut client = self.inner.client.lock().await; let opts = PutOptions::new().with_lease(lease_id); client .put(key.clone(), value, Some(opts)) .await .map_err(|e| AppError::Config(format!("etcd put failed: {e}")))?; } tracing::info!( service = service_name, addr = listen_addr.as_str(), lease_id = lease_id, "registered self in etcd" ); self.spawn_keep_alive(lease_id, key); Ok(()) } fn spawn_keep_alive(&self, lease_id: i64, key: String) { let inner = self.inner.clone(); let interval = self.inner.config.etcd_keep_alive_interval().unwrap_or(10); tokio::spawn(async move { loop { Self::run_keep_alive_stream(&inner, lease_id).await; tokio::time::sleep(std::time::Duration::from_secs(interval)).await; Self::renew_lease_and_reregister(&inner, lease_id, &key).await; } }); } } impl EtcdRegistry { async fn run_keep_alive_stream( inner: &std::sync::Arc, lease_id: i64, ) { let result = { let mut client = inner.client.lock().await; client.lease_keep_alive(lease_id).await }; match result { Ok((_keeper, mut stream)) => { while let Some(resp) = stream.next().await { if let Err(e) = resp { tracing::warn!(lease_id = lease_id, error = %e, "keep-alive stream error"); break; } } } Err(e) => { tracing::warn!(lease_id = lease_id, error = %e, "keep-alive failed"); } } } async fn renew_lease_and_reregister( inner: &std::sync::Arc, old_lease_id: i64, key: &str, ) { let re_grant = { let mut client = inner.client.lock().await; client .lease_grant(inner.config.etcd_lease_ttl().unwrap_or(15) as i64, None) .await }; let Ok(current) = re_grant else { return; }; let new_lease = current.id(); inner.lease_id.store(new_lease, Ordering::SeqCst); let instance = ServiceInstance { addr: inner.config.rpc_self_listen_addr().unwrap_or_default(), metadata: HashMap::new(), }; if let Ok(value) = serde_json::to_string(&instance) { let mut client = inner.client.lock().await; let opts = PutOptions::new().with_lease(new_lease); let _ = client.put(key, value, Some(opts)).await; } tracing::info!(old = old_lease_id, new = new_lease, "etcd lease renewed"); } }