66afd932ed
- Add FindCommit, ListCommitsByOid, CommitIsAncestor RPCs to CommitService - Add CheckObjectsExist, CommitsByMessage, GetCommitStats RPCs to CommitService - Add LastCommitForPath, CountCommits, CountDivergingCommits RPCs to CommitService - Add RawDiff, RawPatch, FindChangedPaths RPCs to DiffService - Add FindMergeBase, WriteRef, SearchFilesByContent RPCs to RepositoryService - Add SearchFilesByName, ObjectsSize, RepositorySize RPCs to RepositoryService - Add FindLicense, OptimizeRepository, GetRawChanges RPCs to RepositoryService - Add FetchRemote, CreateRepositoryFromURL RPCs to RepositoryService - Implement server handlers for all new RPC methods - Add new modules for commit counting, finding, and querying features - Add new modules for diff changed paths and raw operations - Add new modules for refs and remote operations - Remove unnecessary comments from various source files - Update proto definitions with new message types and service methods
211 lines
7.2 KiB
Rust
211 lines
7.2 KiB
Rust
//! Cluster discovery: etcd-driven ractor_cluster node discovery.
|
|
//!
|
|
//! Architecture:
|
|
//! 1. Start a `ractor_cluster::NodeServer` (TCP listener for actor remoting)
|
|
//! 2. Connect to etcd and register this node
|
|
//! 3. Discover existing peers → `client_connect()` to each peer whose `storage_name` sorts after ours (the other side of each pair dials us instead)
|
|
//! 4. Watch etcd for future peer joins/leaves → connect to newly joined peers; sessions of departed peers are torn down by ractor_cluster itself
|
|
//!
|
|
//! Once ractor_cluster TCP connections are established, the existing
|
|
//! `pg::get_members()` / `ractor::call_t!()` APIs automatically work
|
|
//! cross-network — no changes needed in actor/handler.rs or server/mod.rs.
|
|
|
|
pub mod discovery;
|
|
pub mod types;
|
|
|
|
pub use discovery::EtcdRegistry;
|
|
pub use types::PeerInfo;
|
|
|
|
use std::sync::Arc;
|
|
|
|
use ractor::ActorRef;
|
|
use ractor_cluster::node::NodeConnectionMode;
|
|
use ractor_cluster::{NodeServer, NodeServerMessage, client_connect};
|
|
|
|
use crate::error::{GitError, GitResult};
|
|
|
|
/// Configuration for the cluster subsystem.
///
/// `Debug` is implemented by hand (instead of derived) so that the shared
/// authentication `cookie` is never written to logs or error reports.
#[derive(Clone)]
pub struct ClusterConfig {
    /// etcd endpoints (e.g. `["http://etcd1:2379", "http://etcd2:2379"]`).
    pub etcd_endpoints: Vec<String>,
    /// Logical name for this storage node.
    ///
    /// Also passed to ractor_cluster as the node name, and compared against peer
    /// names to decide which side of a pair initiates the TCP connection.
    pub storage_name: String,
    /// gRPC address advertised to clients.
    pub grpc_addr: String,
    /// TCP port for the ractor_cluster `NodeServer`.
    pub cluster_port: u16,
    /// Shared authentication cookie for ractor_cluster.
    ///
    /// This is a secret: it is shown as `<redacted>` in `Debug` output.
    pub cookie: String,
    /// etcd lease TTL in seconds.
    pub lease_ttl_secs: i64,
    /// etcd connection timeout in milliseconds.
    pub connect_timeout_ms: u64,
    /// Hostname used in the ractor_cluster node name (`name@hostname`).
    ///
    /// Also used by remote nodes to connect back via `{cluster_hostname}:{cluster_port}`.
    /// In K8s/Docker, this should be a resolvable address (Pod IP, service DNS, etc.).
    pub cluster_hostname: String,
}

impl std::fmt::Debug for ClusterConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Field order mirrors the struct definition; only the cookie is masked.
        f.debug_struct("ClusterConfig")
            .field("etcd_endpoints", &self.etcd_endpoints)
            .field("storage_name", &self.storage_name)
            .field("grpc_addr", &self.grpc_addr)
            .field("cluster_port", &self.cluster_port)
            .field("cookie", &"<redacted>")
            .field("lease_ttl_secs", &self.lease_ttl_secs)
            .field("connect_timeout_ms", &self.connect_timeout_ms)
            .field("cluster_hostname", &self.cluster_hostname)
            .finish()
    }
}
|
|
|
|
/// The running cluster manager. Holds references to the NodeServer and etcd registry.
|
|
/// Dropping this will stop the background tasks.
|
|
pub struct ClusterManager {
|
|
/// The ractor_cluster NodeServer actor
|
|
pub node_server: ActorRef<NodeServerMessage>,
|
|
/// The etcd registry (for health checks, etc.)
|
|
pub registry: Arc<EtcdRegistry>,
|
|
/// Handles for background tasks (keepalive + watch)
|
|
_keepalive_handle: tokio::task::JoinHandle<()>,
|
|
_watch_handle: tokio::task::JoinHandle<()>,
|
|
}
|
|
|
|
impl ClusterManager {
|
|
/// Start the full cluster subsystem:
|
|
/// 1. Spawn NodeServer (TCP listener)
|
|
/// 2. Connect to etcd + register
|
|
/// 3. Discover peers → client_connect
|
|
/// 4. Start keepalive + watch loops
|
|
///
|
|
/// Returns `Err` if etcd is unreachable (caller should fall back to standalone).
|
|
pub async fn start(config: ClusterConfig) -> GitResult<Self> {
|
|
let node_server = spawn_node_server(&config).await?;
|
|
tracing::info!(
|
|
port = config.cluster_port,
|
|
hostname = %config.cluster_hostname,
|
|
"NodeServer started"
|
|
);
|
|
|
|
let cluster_addr = format!("{}:{}", config.cluster_hostname, config.cluster_port);
|
|
let peer_info = PeerInfo {
|
|
storage_name: config.storage_name.clone(),
|
|
cluster_addr: cluster_addr.clone(),
|
|
grpc_addr: config.grpc_addr.clone(),
|
|
version: env!("CARGO_PKG_VERSION").to_string(),
|
|
};
|
|
|
|
let registry = Arc::new(
|
|
EtcdRegistry::register(
|
|
config.etcd_endpoints.clone(),
|
|
&peer_info,
|
|
config.lease_ttl_secs,
|
|
config.connect_timeout_ms,
|
|
)
|
|
.await
|
|
.map_err(|e| GitError::Internal(format!("etcd registration failed: {e}")))?,
|
|
);
|
|
|
|
let peers = registry
|
|
.discover_peers()
|
|
.await
|
|
.map_err(|e| GitError::Internal(format!("peer discovery failed: {e}")))?;
|
|
|
|
for peer in &peers {
|
|
connect_to_peer(&node_server, peer, &config.storage_name).await;
|
|
}
|
|
|
|
let keepalive_handle = registry.start_keepalive();
|
|
|
|
let ns_for_watch = node_server.clone();
|
|
let my_name_for_watch = config.storage_name.clone();
|
|
let watch_handle = registry.start_watch(
|
|
move |peer| {
|
|
let ns = ns_for_watch.clone();
|
|
let my_name = my_name_for_watch.clone();
|
|
tokio::spawn(async move {
|
|
connect_to_peer(&ns, &peer, &my_name).await;
|
|
});
|
|
},
|
|
move |name| {
|
|
tracing::info!(
|
|
peer = %name,
|
|
"peer left etcd registry (ractor_cluster will cleanup TCP session)"
|
|
);
|
|
// ractor_cluster automatically:
|
|
// 1. Detects TCP disconnection
|
|
// 2. Stops the NodeSession actor
|
|
// 3. Stops all RemoteActors for that session
|
|
// 4. Removes them from Process Groups
|
|
// No manual cleanup needed.
|
|
},
|
|
);
|
|
|
|
tracing::info!(
|
|
storage_name = %config.storage_name,
|
|
peers_found = peers.len(),
|
|
"cluster manager started"
|
|
);
|
|
|
|
Ok(Self {
|
|
node_server,
|
|
registry,
|
|
_keepalive_handle: keepalive_handle,
|
|
_watch_handle: watch_handle,
|
|
})
|
|
}
|
|
}
|
|
|
|
/// Spawn the ractor_cluster NodeServer actor (TCP listener for inter-node communication).
|
|
async fn spawn_node_server(config: &ClusterConfig) -> GitResult<ActorRef<NodeServerMessage>> {
|
|
let server = NodeServer::new(
|
|
config.cluster_port,
|
|
config.cookie.clone(),
|
|
config.storage_name.clone(),
|
|
config.cluster_hostname.clone(),
|
|
None, // no encryption (internal network)
|
|
Some(NodeConnectionMode::Transitive),
|
|
);
|
|
|
|
let (actor_ref, _handle) = ractor::Actor::spawn(
|
|
Some(format!("node_server_{}", config.storage_name)),
|
|
server,
|
|
(),
|
|
)
|
|
.await
|
|
.map_err(|e| GitError::Internal(format!("failed to spawn NodeServer: {e}")))?;
|
|
|
|
Ok(actor_ref)
|
|
}
|
|
|
|
/// Establish a ractor_cluster TCP connection to a remote peer.
|
|
///
|
|
/// Uses ordering optimization: only the node with the lexicographically
|
|
/// smaller `storage_name` initiates the connection. The other side will
|
|
/// accept the incoming connection. This prevents duplicate connections.
|
|
async fn connect_to_peer(
|
|
node_server: &ActorRef<NodeServerMessage>,
|
|
peer: &PeerInfo,
|
|
my_name: &str,
|
|
) {
|
|
// Ordering optimization: only smaller-named node connects
|
|
if my_name >= peer.storage_name.as_str() {
|
|
tracing::debug!(
|
|
peer = %peer.storage_name,
|
|
"skipping connect (peer has lower/equal name, they connect to us)"
|
|
);
|
|
return;
|
|
}
|
|
|
|
tracing::info!(
|
|
peer = %peer.storage_name,
|
|
cluster_addr = %peer.cluster_addr,
|
|
"connecting to peer via ractor_cluster"
|
|
);
|
|
|
|
match client_connect(node_server, peer.cluster_addr.as_str()).await {
|
|
Ok(()) => {
|
|
tracing::info!(
|
|
peer = %peer.storage_name,
|
|
"ractor_cluster connection initiated"
|
|
);
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!(
|
|
peer = %peer.storage_name,
|
|
cluster_addr = %peer.cluster_addr,
|
|
error = %e,
|
|
"failed to connect to peer (will retry on next watch event)"
|
|
);
|
|
}
|
|
}
|
|
}
|