//! etcd-based peer discovery for ractor_cluster. //! //! Responsibilities: //! - Connect to etcd and create a Lease (TTL-based health check) //! - Register this node under `/gitks/nodes/{storage_name}` //! - Discover existing peers via prefix GET //! - Watch for peer join/leave events and invoke callbacks use std::sync::Arc; use tokio::sync::Mutex; use etcd_client::{Client, ConnectOptions, EventType, GetOptions, PutOptions, WatchOptions}; use super::types::PeerInfo; /// Key prefix used for all gitks entries in etcd. const KEY_PREFIX: &str = "/gitks/nodes/"; /// Wraps an etcd client with lease-based registration and peer discovery. pub struct EtcdRegistry { client: Mutex, lease_id: i64, storage_name: String, } impl EtcdRegistry { /// Connect to etcd, create a lease, and register this node. /// /// Returns `None` if the connection fails (caller should fall back to standalone mode). pub async fn register( endpoints: Vec, info: &PeerInfo, ttl_secs: i64, connect_timeout_ms: u64, ) -> Result> { let connect_opts = ConnectOptions::new() .with_connect_timeout(std::time::Duration::from_millis(connect_timeout_ms)) .with_keep_alive( std::time::Duration::from_secs(5), std::time::Duration::from_secs(3), ) .with_keep_alive_while_idle(true); let mut client = Client::connect(endpoints.clone(), Some(connect_opts)).await?; tracing::info!(endpoints = ?endpoints, "connected to etcd"); // Create lease let lease_resp = client.lease_grant(ttl_secs, None).await?; let lease_id = lease_resp.id(); tracing::info!(lease_id, ttl = ttl_secs, "etcd lease granted"); // Register node info under the lease let key = format!("{KEY_PREFIX}{}", info.storage_name); let value = serde_json::to_string(info)?; client .put(key, value, Some(PutOptions::new().with_lease(lease_id))) .await?; tracing::info!( storage_name = %info.storage_name, cluster_addr = %info.cluster_addr, "registered in etcd" ); Ok(Self { client: Mutex::new(client), lease_id, storage_name: info.storage_name.clone(), }) } /// Start the lease keepalive loop in a background task. /// /// The loop sends periodic heartbeats to etcd to prevent the lease from expiring. /// If keepalive fails, it logs a warning but does not panic — the node will /// eventually be removed from etcd when the lease expires. pub fn start_keepalive(self: &Arc) -> tokio::task::JoinHandle<()> { let this = Arc::clone(self); tokio::spawn(async move { let lease_id = this.lease_id; let (mut keeper, mut stream) = { let mut client = this.client.lock().await; match client.lease_keep_alive(lease_id).await { Ok(pair) => pair, Err(e) => { tracing::error!(lease_id, error = %e, "failed to start lease keepalive"); return; } } }; let mut interval = tokio::time::interval(std::time::Duration::from_secs(5)); loop { interval.tick().await; if let Err(e) = keeper.keep_alive().await { tracing::warn!(lease_id, error = %e, "etcd lease keepalive failed"); // Don't break — let the lease expire naturally if we can't recover } // Drain keepalive responses let _ = stream.message().await; } }) } /// Discover all currently registered peers (excluding this node). pub async fn discover_peers( &self, ) -> Result, Box> { let mut client = self.client.lock().await; let resp = client .get(KEY_PREFIX, Some(GetOptions::new().with_prefix())) .await?; let mut peers = Vec::new(); for kv in resp.kvs() { match serde_json::from_slice::(kv.value()) { Ok(info) if info.storage_name != self.storage_name => { peers.push(info); } Ok(_) => {} // skip self Err(e) => { tracing::warn!( key = %String::from_utf8_lossy(kv.key()), error = %e, "failed to parse peer info from etcd" ); } } } Ok(peers) } /// Start a long-running watch loop that monitors peer join/leave events. /// /// Callbacks are invoked synchronously within the watch task; keep them fast /// (prefer sending messages to actors rather than doing blocking work). pub fn start_watch( self: &Arc, on_peer_joined: impl Fn(PeerInfo) + Send + Sync + 'static, on_peer_left: impl Fn(String) + Send + Sync + 'static, ) -> tokio::task::JoinHandle<()> { let this = Arc::clone(self); let my_name = self.storage_name.clone(); tokio::spawn(async move { let on_joined = Arc::new(on_peer_joined); let on_left = Arc::new(on_peer_left); loop { // Create a fresh watch client each iteration (after reconnect) let mut watch_stream = { let mut client = this.client.lock().await; match client .watch(KEY_PREFIX, Some(WatchOptions::new().with_prefix())) .await { Ok(stream) => stream, Err(e) => { tracing::error!(error = %e, "etcd watch failed, retrying in 3s"); tokio::time::sleep(std::time::Duration::from_secs(3)).await; continue; } } }; tracing::info!("etcd watch loop started"); loop { match watch_stream.message().await { Ok(Some(resp)) => { for event in resp.events() { match event.event_type() { EventType::Put => { if let Some(kv) = event.kv() && let Ok(info) = serde_json::from_slice::(kv.value()) && info.storage_name != my_name { tracing::info!( peer = %info.storage_name, cluster_addr = %info.cluster_addr, "peer joined via etcd watch" ); on_joined(info); } } EventType::Delete => { if let Some(kv) = event.kv() { let key = String::from_utf8_lossy(kv.key()); let name = key .strip_prefix(KEY_PREFIX) .unwrap_or(&key) .to_string(); if name != my_name { tracing::warn!( peer = %name, "peer left (etcd lease expired or key deleted)" ); on_left(name); } } } } } } Ok(None) => { tracing::warn!("etcd watch stream ended"); break; } Err(e) => { tracing::error!(error = %e, "etcd watch stream error"); break; } } } // Reconnect after a delay tracing::info!("etcd watch loop restarting in 3s"); tokio::time::sleep(std::time::Duration::from_secs(3)).await; } }) } /// Check if the lease is still alive (for external health monitoring). pub async fn is_lease_alive(&self) -> bool { let mut client = self.client.lock().await; match client.lease_time_to_live(self.lease_id, None).await { Ok(resp) => resp.ttl() > 0, Err(_) => false, } } }