//! Cluster discovery: etcd-driven ractor_cluster node discovery. //! //! Architecture: //! 1. Start a `ractor_cluster::NodeServer` (TCP listener for actor remoting) //! 2. Connect to etcd and register this node //! 3. Discover existing peers → `client_connect()` to each //! 4. Watch etcd for future peer join/leave → connect/disconnect dynamically //! //! Once ractor_cluster TCP connections are established, the existing //! `pg::get_members()` / `ractor::call_t!()` APIs automatically work //! cross-network — no changes needed in actor/handler.rs or server/mod.rs. pub mod discovery; pub mod types; pub use discovery::EtcdRegistry; pub use types::PeerInfo; use std::sync::Arc; use ractor::ActorRef; use ractor_cluster::node::NodeConnectionMode; use ractor_cluster::{NodeServer, NodeServerMessage, client_connect}; use crate::error::{GitError, GitResult}; /// Configuration for the cluster subsystem. #[derive(Debug, Clone)] pub struct ClusterConfig { /// etcd endpoints (e.g. ["http://etcd1:2379", "http://etcd2:2379"]) pub etcd_endpoints: Vec, /// Logical name for this storage node pub storage_name: String, /// gRPC address advertised to clients pub grpc_addr: String, /// TCP port for ractor_cluster NodeServer pub cluster_port: u16, /// Shared authentication cookie for ractor_cluster pub cookie: String, /// etcd lease TTL in seconds pub lease_ttl_secs: i64, /// etcd connection timeout in milliseconds pub connect_timeout_ms: u64, /// Hostname used in the ractor_cluster node name (`name@hostname`). /// Also used by remote nodes to connect back via `{cluster_hostname}:{cluster_port}`. /// In K8s/Docker, this should be a resolvable address (Pod IP, service DNS, etc.) pub cluster_hostname: String, } /// The running cluster manager. Holds references to the NodeServer and etcd registry. /// Dropping this will stop the background tasks. pub struct ClusterManager { /// The ractor_cluster NodeServer actor pub node_server: ActorRef, /// The etcd registry (for health checks, etc.) pub registry: Arc, /// Handles for background tasks (keepalive + watch) _keepalive_handle: tokio::task::JoinHandle<()>, _watch_handle: tokio::task::JoinHandle<()>, } impl ClusterManager { /// Start the full cluster subsystem: /// 1. Spawn NodeServer (TCP listener) /// 2. Connect to etcd + register /// 3. Discover peers → client_connect /// 4. Start keepalive + watch loops /// /// Returns `Err` if etcd is unreachable (caller should fall back to standalone). pub async fn start(config: ClusterConfig) -> GitResult { // ── Step 1: Start NodeServer ── let node_server = spawn_node_server(&config).await?; tracing::info!( port = config.cluster_port, hostname = %config.cluster_hostname, "NodeServer started" ); // ── Step 2: Connect to etcd and register ── let cluster_addr = format!("{}:{}", config.cluster_hostname, config.cluster_port); let peer_info = PeerInfo { storage_name: config.storage_name.clone(), cluster_addr: cluster_addr.clone(), grpc_addr: config.grpc_addr.clone(), version: env!("CARGO_PKG_VERSION").to_string(), }; let registry = Arc::new( EtcdRegistry::register( config.etcd_endpoints.clone(), &peer_info, config.lease_ttl_secs, config.connect_timeout_ms, ) .await .map_err(|e| GitError::Internal(format!("etcd registration failed: {e}")))?, ); // ── Step 3: Discover existing peers and connect ── let peers = registry .discover_peers() .await .map_err(|e| GitError::Internal(format!("peer discovery failed: {e}")))?; for peer in &peers { connect_to_peer(&node_server, peer, &config.storage_name).await; } // ── Step 4: Start background tasks ── let keepalive_handle = registry.start_keepalive(); let ns_for_watch = node_server.clone(); let my_name_for_watch = config.storage_name.clone(); let watch_handle = registry.start_watch( move |peer| { let ns = ns_for_watch.clone(); let my_name = my_name_for_watch.clone(); tokio::spawn(async move { connect_to_peer(&ns, &peer, &my_name).await; }); }, move |name| { tracing::info!( peer = %name, "peer left etcd registry (ractor_cluster will cleanup TCP session)" ); // ractor_cluster automatically: // 1. Detects TCP disconnection // 2. Stops the NodeSession actor // 3. Stops all RemoteActors for that session // 4. Removes them from Process Groups // No manual cleanup needed. }, ); tracing::info!( storage_name = %config.storage_name, peers_found = peers.len(), "cluster manager started" ); Ok(Self { node_server, registry, _keepalive_handle: keepalive_handle, _watch_handle: watch_handle, }) } } /// Spawn the ractor_cluster NodeServer actor (TCP listener for inter-node communication). async fn spawn_node_server(config: &ClusterConfig) -> GitResult> { let server = NodeServer::new( config.cluster_port, config.cookie.clone(), config.storage_name.clone(), config.cluster_hostname.clone(), None, // no encryption (internal network) Some(NodeConnectionMode::Transitive), ); let (actor_ref, _handle) = ractor::Actor::spawn( Some(format!("node_server_{}", config.storage_name)), server, (), ) .await .map_err(|e| GitError::Internal(format!("failed to spawn NodeServer: {e}")))?; Ok(actor_ref) } /// Establish a ractor_cluster TCP connection to a remote peer. /// /// Uses ordering optimization: only the node with the lexicographically /// smaller `storage_name` initiates the connection. The other side will /// accept the incoming connection. This prevents duplicate connections. async fn connect_to_peer( node_server: &ActorRef, peer: &PeerInfo, my_name: &str, ) { // Ordering optimization: only smaller-named node connects if my_name >= peer.storage_name.as_str() { tracing::debug!( peer = %peer.storage_name, "skipping connect (peer has lower/equal name, they connect to us)" ); return; } tracing::info!( peer = %peer.storage_name, cluster_addr = %peer.cluster_addr, "connecting to peer via ractor_cluster" ); match client_connect(node_server, peer.cluster_addr.as_str()).await { Ok(()) => { tracing::info!( peer = %peer.storage_name, "ractor_cluster connection initiated" ); } Err(e) => { tracing::warn!( peer = %peer.storage_name, cluster_addr = %peer.cluster_addr, error = %e, "failed to connect to peer (will retry on next watch event)" ); } } }