diff --git a/actor/handler.rs b/actor/handler.rs index c18885d..9af2d39 100644 --- a/actor/handler.rs +++ b/actor/handler.rs @@ -1,13 +1,17 @@ use crate::actor::message::{ - ElectionRequest, ElectionResult, GitNodeMessage, NodeHealth, ROLE_PRIMARY, ROLE_REPLICA, - RefUpdateEvent, RoleChangedEvent, RouteDecision, + AppendEntriesRequest, AppendEntriesResponse, ElectionRequest, ElectionResult, GitNodeMessage, + NodeHealth, ReadIndexResponse, RefUpdateEvent, RoleChangedEvent, RouteDecision, + ROLE_PRIMARY, ROLE_REPLICA, RAFT_MSG_VERSION, }; +use crate::actor::raft_log::RaftLog; use crate::pb::RepositoryHeader; use crate::server::GitksService; use async_trait::async_trait; use ractor::pg; use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent}; use std::collections::HashMap; +use std::path::PathBuf; +use std::time::{Duration, Instant}; #[derive(Clone)] pub struct GitNodeActor { @@ -24,6 +28,7 @@ impl GitNodeActor { } } +#[derive(Debug, Clone)] pub struct RepoEntry { pub role: String, pub last_commit: String, @@ -33,8 +38,13 @@ pub struct RepoEntry { pub struct GitNodeArgs { pub storage_name: String, pub grpc_addr: String, + /// Directory for Raft log persistence. + pub data_dir: PathBuf, } +/// Leader lease duration (10 seconds). +const LEADER_LEASE_DURATION: Duration = Duration::from_secs(10); + pub struct GitNodeState { storage_name: String, actor_name: String, @@ -45,6 +55,19 @@ pub struct GitNodeState { is_primary: bool, last_known_primary_grpc: String, voted_for: Option, + + // ── Raft consensus state ───────────────────────────────── + pub raft_log: RaftLog, + /// Leader-only: lease deadline. If expired, Leader stops accepting writes. + pub leader_lease_deadline: Option, + /// Leader-only: next index to send to each follower. + pub next_index: HashMap, + /// Leader-only: highest known replicated index for each follower. + pub match_index: HashMap, + /// The known leader's storage_name (for followers). + pub leader_id: Option, + /// The known leader's gRPC address (for followers). + pub leader_grpc_addr: Option, } #[async_trait] @@ -69,6 +92,18 @@ impl Actor for GitNodeActor { start_health_checker(myself.clone(), 1, 10); + // Initialize Raft log with disk persistence + let raft_data_dir = args.data_dir.join("raft"); + let raft_log = RaftLog::new(&raft_data_dir).map_err(|e| { + ActorProcessingErr::from(format!("failed to init raft log: {e}")) + })?; + tracing::info!( + storage_name = %args.storage_name, + entries = raft_log.len(), + last_index = raft_log.last_index(), + "raft log initialized" + ); + Ok(GitNodeState { storage_name: args.storage_name, actor_name, @@ -79,6 +114,12 @@ impl Actor for GitNodeActor { is_primary: true, // Will be refined at registration last_known_primary_grpc: args.grpc_addr.clone(), voted_for: None, + raft_log, + leader_lease_deadline: None, + next_index: HashMap::new(), + match_index: HashMap::new(), + leader_id: None, + leader_grpc_addr: None, }) } @@ -266,6 +307,8 @@ impl Actor for GitNodeActor { candidate_actor_name: state.actor_name.clone(), term: new_term, reason: "health_check_failure".to_string(), + last_log_index: state.raft_log.last_index(), + last_log_term: state.raft_log.last_term(), }; match ractor::call_t!(actor_ref, GitNodeMessage::ElectPrimary, 1000, request) { Ok(result) if result.accepted => { @@ -314,6 +357,22 @@ impl Actor for GitNodeActor { ); } } + + // ── Raft consensus messages ───────────────────────── + GitNodeMessage::AppendEntries(request, reply) => { + let response = handle_append_entries(&myself, state, &request); + let _ = reply.send(response); + } + + GitNodeMessage::ReadIndex(_request, reply) => { + let response = handle_read_index(state); + let _ = reply.send(response); + } + + GitNodeMessage::RaftWrite(command, reply) => { + let success = handle_raft_write(&myself, state, command).await; + let _ = reply.send(success); + } } Ok(()) } @@ -377,6 +436,33 @@ fn should_accept_election(request: &ElectionRequest, state: &GitNodeState) -> bo ); return false; } + + // Raft log consistency check: candidate's log must be at least as up-to-date as ours. + // This prevents a node with stale data from winning an election. + let my_last_index = state.raft_log.last_index(); + let my_last_term = state.raft_log.last_term(); + + if request.last_log_term < my_last_term { + tracing::warn!( + candidate_term = request.last_log_term, + my_term = my_last_term, + candidate = %request.candidate_storage_name, + "rejecting election: candidate log term is older" + ); + return false; + } + if request.last_log_term == my_last_term + && request.last_log_index < my_last_index + { + tracing::warn!( + candidate_index = request.last_log_index, + my_index = my_last_index, + candidate = %request.candidate_storage_name, + "rejecting election: candidate log is shorter" + ); + return false; + } + true } @@ -552,6 +638,7 @@ pub async fn start_node_actor( service: GitksService, storage_name: String, grpc_addr: String, + data_dir: PathBuf, ) -> Result<(ActorRef, tokio::task::JoinHandle<()>), ractor::SpawnErr> { let actor = GitNodeActor::init(service); let (actor_ref, handle) = Actor::spawn( @@ -560,6 +647,7 @@ pub async fn start_node_actor( GitNodeArgs { storage_name, grpc_addr, + data_dir, }, ) .await?; @@ -603,3 +691,435 @@ pub fn broadcast_role_changed(_actor: &ActorRef, event: RoleChan .ok(); } } + +// ── Raft consensus helpers ─────────────────────────────────── + +/// Handle AppendEntries RPC from Leader (Follower side). +fn handle_append_entries( + _myself: &ActorRef, + state: &mut GitNodeState, + request: &AppendEntriesRequest, +) -> AppendEntriesResponse { + // Step 1: Reply false if term < currentTerm + if request.term < state.current_term { + tracing::debug!( + request_term = request.term, + local_term = state.current_term, + "AppendEntries rejected: stale term" + ); + return AppendEntriesResponse { + version: RAFT_MSG_VERSION, + term: state.current_term, + success: false, + match_index: state.raft_log.last_index(), + conflict_index: 0, + conflict_term: 0, + }; + } + + // Step 2: Update leader info if term >= currentTerm + if request.term >= state.current_term { + state.current_term = request.term; + state.voted_for = None; + state.leader_id = Some(request.leader_id.clone()); + state.leader_grpc_addr = Some(request.leader_grpc_addr.clone()); + // If we were primary but received a valid AppendEntries from a higher term, + // step down + if state.is_primary { + tracing::info!( + term = request.term, + leader = %request.leader_id, + "stepping down from PRIMARY (received AppendEntries from new leader)" + ); + state.is_primary = false; + for entry in state.repos.values_mut() { + entry.role = ROLE_REPLICA.to_string(); + } + } + } + + if request.prev_log_index > 0 { + let prev_term = state.raft_log.term_at(request.prev_log_index); + if prev_term == 0 { + tracing::debug!( + prev_log_index = request.prev_log_index, + last_index = state.raft_log.last_index(), + "AppendEntries rejected: missing prev_log_index" + ); + return AppendEntriesResponse { + version: RAFT_MSG_VERSION, + term: state.current_term, + success: false, + match_index: state.raft_log.last_index(), + conflict_index: state.raft_log.last_index() + 1, + conflict_term: 0, + }; + } + if prev_term != request.prev_log_term { + let conflict_term = prev_term; + let conflict_index = find_first_index_of_term(state, request.prev_log_index); + tracing::debug!( + prev_log_index = request.prev_log_index, + expected_term = request.prev_log_term, + actual_term = conflict_term, + "AppendEntries rejected: term conflict" + ); + return AppendEntriesResponse { + version: RAFT_MSG_VERSION, + term: state.current_term, + success: false, + match_index: state.raft_log.last_index(), + conflict_index, + conflict_term, + }; + } + } + + for entry in &request.entries { + let existing_term = state.raft_log.term_at(entry.index); + if existing_term != 0 && existing_term != entry.term { + tracing::debug!( + index = entry.index, + existing_term, + new_term = entry.term, + "truncating conflicting log entries" + ); + if let Err(e) = state.raft_log.truncate_from(entry.index) { + tracing::error!(error = %e, "failed to truncate raft log"); + return AppendEntriesResponse { + version: RAFT_MSG_VERSION, + term: state.current_term, + success: false, + match_index: state.raft_log.last_index(), + conflict_index: 0, + conflict_term: 0, + }; + } + } + if state.raft_log.term_at(entry.index) == 0 { + if let Some(raft_entry) = entry.to_entry() + && let Err(e) = state.raft_log.append_reserved(raft_entry) + { + tracing::error!(error = %e, "failed to append raft entry"); + return AppendEntriesResponse { + version: RAFT_MSG_VERSION, + term: state.current_term, + success: false, + match_index: state.raft_log.last_index(), + conflict_index: 0, + conflict_term: 0, + }; + } + } + } + + // Step 5: Update commit_index + if request.leader_commit > state.raft_log.commit_index() { + let new_commit = request.leader_commit.min(state.raft_log.last_index()); + state.raft_log.advance_commit_index(new_commit); + } + + let match_index = state.raft_log.last_index(); + tracing::debug!( + leader = %request.leader_id, + term = request.term, + entries_received = request.entries.len(), + match_index, + "AppendEntries accepted" + ); + + AppendEntriesResponse { + version: RAFT_MSG_VERSION, + term: state.current_term, + success: true, + match_index, + conflict_index: 0, + conflict_term: 0, + } +} + +/// Find the first index of the term that conflicts at the given index. +fn find_first_index_of_term(state: &GitNodeState, index: u64) -> u64 { + let term = state.raft_log.term_at(index); + if term == 0 { + return index; + } + // Walk backwards to find the first entry of this term + for i in (1..=index).rev() { + if state.raft_log.term_at(i) != term { + return i + 1; + } + } + 1 +} + +/// Handle ReadIndex request (confirm Leader is still valid). +fn handle_read_index(state: &GitNodeState) -> ReadIndexResponse { + ReadIndexResponse { + commit_index: state.raft_log.commit_index(), + leader_term: state.current_term, + is_leader: state.is_primary && state.leader_lease_deadline.is_some_and(|d| d > Instant::now()), + } +} + +/// Broadcast AppendEntries to all followers and collect responses. +/// Returns the number of successful responses (including self). +pub async fn broadcast_append_entries( + myself: &ActorRef, + state: &mut GitNodeState, + entries: Vec, +) -> u64 { + let members = ractor::pg::get_members(&"gitks_nodes".to_string()); + let my_cell = myself.get_cell(); + let mut success_count = 1u64; // Count self + + let serialized_entries: Vec = entries + .iter() + .map(crate::actor::message::SerializedRaftEntry::from_entry) + .collect(); + + for member in &members { + if *member == my_cell { + continue; + } + let actor_ref: ActorRef = member.clone().into(); + let follower_id = format!("{:?}", member.get_id()); + + let prev_log_index = state.match_index.get(&follower_id).copied().unwrap_or(0); + let prev_log_term = state.raft_log.term_at(prev_log_index); + + let request = AppendEntriesRequest { + version: RAFT_MSG_VERSION, + term: state.current_term, + leader_id: state.storage_name.clone(), + leader_grpc_addr: state.grpc_addr.clone(), + prev_log_index, + prev_log_term, + entries: serialized_entries.clone(), + leader_commit: state.raft_log.commit_index(), + }; + + match ractor::call_t!(actor_ref, GitNodeMessage::AppendEntries, 5000, request) { + Ok(response) if response.success => { + success_count += 1; + state.match_index.insert(follower_id.clone(), response.match_index); + state.next_index.insert(follower_id, response.match_index + 1); + } + Ok(response) => { + // Follower rejected — update next_index for retry + tracing::debug!( + follower = %follower_id, + term = response.term, + conflict_index = response.conflict_index, + "AppendEntries rejected by follower" + ); + // Decrement next_index (optimization: use conflict info) + let next = state.next_index.get(&follower_id).copied().unwrap_or(1); + if response.conflict_index > 0 && response.conflict_index < next { + state.next_index.insert(follower_id, response.conflict_index); + } else if next > 1 { + state.next_index.insert(follower_id, next - 1); + } + } + Err(e) => { + tracing::warn!(follower = %follower_id, error = %e, "AppendEntries RPC failed"); + } + } + } + + success_count +} + +/// Check if Leader lease is still valid. +pub fn is_leader_lease_valid(state: &GitNodeState) -> bool { + state.is_primary + && state.leader_lease_deadline.is_some_and(|d| d > Instant::now()) +} + +/// Update Leader lease after successful majority replication. +pub fn renew_leader_lease(state: &mut GitNodeState) { + state.leader_lease_deadline = Some(Instant::now() + LEADER_LEASE_DURATION); +} + +/// Handle a Raft write command (Leader side). +/// This is the core of the Raft consensus write path: +/// 1. Check leader lease +/// 2. Create and append log entry +/// 3. Broadcast to followers and wait for majority +/// 4. Advance commit index and apply +async fn handle_raft_write( + myself: &ActorRef, + state: &mut GitNodeState, + command: crate::actor::raft_log::Command, +) -> bool { + // Step 1: Check if we are the Leader with a valid lease + if !state.is_primary { + tracing::warn!("Raft write rejected: not primary"); + return false; + } + + if !is_leader_lease_valid(state) { + tracing::warn!("Raft write rejected: leader lease expired"); + return false; + } + + // Step 2: Create log entry and append to local log + let term = state.current_term; + let entry = crate::actor::raft_log::LogEntry::new(term, state.raft_log.next_index(), command); + let entry_index = entry.index; + + if let Err(e) = state.raft_log.append_reserved(entry.clone()) { + tracing::error!(error = %e, "failed to append raft entry locally"); + return false; + } + + // Step 3: Broadcast AppendEntries to all followers + let members = ractor::pg::get_members(&"gitks_nodes".to_string()); + let my_cell = myself.get_cell(); + let total_nodes = members.len() as u64; + let majority = (total_nodes / 2) + 1; + + let serialized_entry = crate::actor::message::SerializedRaftEntry::from_entry(&entry); + let mut success_count = 1u64; // Count self + + for member in &members { + if *member == my_cell { + continue; + } + let actor_ref: ActorRef = member.clone().into(); + let follower_id = format!("{:?}", member.get_id()); + + let prev_log_index = state.match_index.get(&follower_id).copied().unwrap_or(0); + let prev_log_term = state.raft_log.term_at(prev_log_index); + + let request = AppendEntriesRequest { + version: RAFT_MSG_VERSION, + term: state.current_term, + leader_id: state.storage_name.clone(), + leader_grpc_addr: state.grpc_addr.clone(), + prev_log_index, + prev_log_term, + entries: vec![serialized_entry.clone()], + leader_commit: state.raft_log.commit_index(), + }; + + match ractor::call_t!(actor_ref, GitNodeMessage::AppendEntries, 5000, request) { + Ok(response) if response.success => { + success_count += 1; + state.match_index.insert(follower_id.clone(), response.match_index); + state.next_index.insert(follower_id, response.match_index + 1); + } + Ok(response) => { + tracing::debug!( + follower = %follower_id, + term = response.term, + "AppendEntries rejected by follower during write" + ); + } + Err(e) => { + tracing::warn!(follower = %follower_id, error = %e, "AppendEntries RPC failed during write"); + } + } + } + + // Step 4: Check if we achieved majority + if success_count >= majority { + // Advance commit index + state.raft_log.advance_commit_index(entry_index); + + // Renew leader lease + renew_leader_lease(state); + + tracing::info!( + index = entry_index, + term, + success_count, + majority, + "Raft write committed" + ); + + // Step 5: Apply the command to the state machine + apply_raft_command(state, &entry.command); + + true + } else { + tracing::warn!( + index = entry_index, + success_count, + majority, + "Raft write failed: no majority" + ); + false + } +} + +/// Apply a committed Raft command to the state machine. +fn apply_raft_command(state: &mut GitNodeState, command: &crate::actor::raft_log::Command) { + match command { + crate::actor::raft_log::Command::RefUpdate { + relative_path, + ref_name, + old_oid: _, + new_oid, + } => { + // Update local repo state + tracing::info!( + relative_path = %relative_path, + ref_name = %ref_name, + "applying RefUpdate from Raft log" + ); + // The actual git ref update is already done by the primary before calling raft_consensus_write. + // Here we just update the actor's tracking state. + if let Some(entry) = state.repos.get_mut(relative_path) { + entry.last_commit = new_oid.clone(); + } + } + crate::actor::raft_log::Command::RegisterRepo { + relative_path, + storage_name, + } => { + tracing::info!( + relative_path = %relative_path, + storage_name = %storage_name, + "applying RegisterRepo from Raft log" + ); + state.repos.entry(relative_path.clone()).or_insert_with(|| RepoEntry { + role: ROLE_REPLICA.to_string(), + last_commit: String::new(), + read_only: false, + }); + } + crate::actor::raft_log::Command::RemoveRepo { relative_path } => { + tracing::info!( + relative_path = %relative_path, + "applying RemoveRepo from Raft log" + ); + state.repos.remove(relative_path); + } + crate::actor::raft_log::Command::SetPrimary { + storage_name, + relative_paths, + } => { + tracing::info!( + storage_name = %storage_name, + paths = relative_paths.len(), + "applying SetPrimary from Raft log" + ); + // Update role for the specified paths + for path in relative_paths { + if let Some(entry) = state.repos.get_mut(path) { + if storage_name == &state.storage_name { + entry.role = ROLE_PRIMARY.to_string(); + entry.read_only = false; + } else { + entry.role = ROLE_REPLICA.to_string(); + entry.read_only = true; + } + } + } + } + } + + // Advance last_applied + state.raft_log.advance_last_applied(state.raft_log.commit_index()); +}