//! Raft log storage with disk persistence and CAS-based concurrent append. //! //! The log is stored in memory for fast access and persisted to disk for crash recovery. //! Each entry has a CRC32 checksum for integrity verification. //! //! Storage format: //! - `raft-log.dat`: Append-only log file containing serialized entries //! - `raft-index.dat`: Index file mapping entry index to file offset (for random access) use std::io::{BufReader, BufWriter, Read, Write}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; use ractor_cluster::BytesConvertable; use crate::actor::handler::RepoEntry; use crate::actor::snapshot::{RaftSnapshot, SnapshotStorage}; use crate::error::{GitError, GitResult}; use std::collections::HashMap; /// Protocol version for forward/backward compatibility. pub const RAFT_PROTOCOL_VERSION: u32 = 1; /// Maximum log entry size (1MB). const MAX_ENTRY_SIZE: usize = 1024 * 1024; /// Log compression threshold (100MB). pub const LOG_COMPACT_THRESHOLD_BYTES: u64 = 100 * 1024 * 1024; // ── Command ────────────────────────────────────────────────── /// A Raft command that can be applied to the state machine. #[derive(Debug, Clone)] pub enum Command { RefUpdate { relative_path: String, ref_name: String, old_oid: String, new_oid: String, }, RegisterRepo { relative_path: String, storage_name: String, }, RemoveRepo { relative_path: String, }, SetPrimary { storage_name: String, relative_paths: Vec, }, } impl Command { /// Serialize command to bytes. pub fn encode(&self) -> Vec { let mut buf = Vec::new(); match self { Command::RefUpdate { relative_path, ref_name, old_oid, new_oid, } => { buf.push(0); // tag encode_strings(&mut buf, &[relative_path, ref_name, old_oid, new_oid]); } Command::RegisterRepo { relative_path, storage_name, } => { buf.push(1); encode_strings(&mut buf, &[relative_path, storage_name]); } Command::RemoveRepo { relative_path } => { buf.push(2); encode_strings(&mut buf, &[relative_path]); } Command::SetPrimary { storage_name, relative_paths, } => { buf.push(3); encode_string(&mut buf, storage_name); buf.extend((relative_paths.len() as u32).to_be_bytes()); for p in relative_paths { encode_string(&mut buf, p); } } } buf } /// Deserialize command from bytes. pub fn decode(data: &[u8]) -> Option { if data.is_empty() { return None; } let tag = data[0]; let mut offset = 1; match tag { 0 => { let (rp, o1) = decode_string(data, offset)?; offset = o1; let (rn, o2) = decode_string(data, offset)?; offset = o2; let (oo, o3) = decode_string(data, offset)?; offset = o3; let (no, _) = decode_string(data, offset)?; Some(Command::RefUpdate { relative_path: rp, ref_name: rn, old_oid: oo, new_oid: no, }) } 1 => { let (rp, o1) = decode_string(data, offset)?; offset = o1; let (sn, _) = decode_string(data, offset)?; Some(Command::RegisterRepo { relative_path: rp, storage_name: sn, }) } 2 => { let (rp, _) = decode_string(data, offset)?; Some(Command::RemoveRepo { relative_path: rp }) } 3 => { let (sn, o1) = decode_string(data, offset)?; offset = o1; if offset + 4 > data.len() { return None; } let count = u32::from_be_bytes(data[offset..offset + 4].try_into().ok()?) as usize; offset += 4; let mut paths = Vec::with_capacity(count); for _ in 0..count { let (p, o) = decode_string(data, offset)?; offset = o; paths.push(p); } Some(Command::SetPrimary { storage_name: sn, relative_paths: paths, }) } _ => None, } } } impl BytesConvertable for Command { fn into_bytes(self) -> Vec { self.encode() } fn from_bytes(bytes: Vec) -> Self { Self::decode(&bytes).unwrap_or(Command::RemoveRepo { relative_path: String::new(), }) } } fn encode_string(buf: &mut Vec, s: &str) { let bytes = s.as_bytes(); buf.extend((bytes.len() as u32).to_be_bytes()); buf.extend(bytes); } fn encode_strings(buf: &mut Vec, strings: &[&str]) { buf.extend((strings.len() as u32).to_be_bytes()); for s in strings { encode_string(buf, s); } } fn decode_string(data: &[u8], offset: usize) -> Option<(String, usize)> { if offset + 4 > data.len() { return None; } let len = u32::from_be_bytes(data[offset..offset + 4].try_into().ok()?) as usize; let start = offset + 4; if start + len > data.len() { return None; } let s = String::from_utf8_lossy(&data[start..start + len]).into_owned(); Some((s, start + len)) } // ── LogEntry ───────────────────────────────────────────────── /// A single Raft log entry. #[derive(Debug, Clone)] pub struct LogEntry { pub term: u64, pub index: u64, pub command: Command, pub checksum: u32, } impl LogEntry { pub fn new(term: u64, index: u64, command: Command) -> Self { let checksum = Self::compute_checksum(term, index, &command); Self { term, index, command, checksum, } } fn compute_checksum(term: u64, index: u64, command: &Command) -> u32 { let mut hasher = crc32fast::Hasher::new(); hasher.update(&term.to_be_bytes()); hasher.update(&index.to_be_bytes()); hasher.update(&command.encode()); hasher.finalize() } /// Serialize entry to bytes (for disk storage). pub fn encode(&self) -> Vec { let cmd_bytes = self.command.encode(); let total_len = 8 + 8 + 4 + 4 + cmd_bytes.len(); // term + index + checksum + cmd_len + cmd let mut buf = Vec::with_capacity(total_len); buf.extend(self.term.to_be_bytes()); buf.extend(self.index.to_be_bytes()); buf.extend(self.checksum.to_be_bytes()); buf.extend((cmd_bytes.len() as u32).to_be_bytes()); buf.extend(&cmd_bytes); buf } /// Deserialize entry from bytes. pub fn decode(data: &[u8]) -> Option { if data.len() < 24 { return None; } let term = u64::from_be_bytes(data[0..8].try_into().ok()?); let index = u64::from_be_bytes(data[8..16].try_into().ok()?); let checksum = u32::from_be_bytes(data[16..20].try_into().ok()?); let cmd_len = u32::from_be_bytes(data[20..24].try_into().ok()?) as usize; if 24 + cmd_len > data.len() { return None; } let command = Command::decode(&data[24..24 + cmd_len])?; // Verify checksum let expected = Self::compute_checksum(term, index, &command); if checksum != expected { tracing::warn!( term, index, expected, actual = checksum, "log entry checksum mismatch" ); return None; } Some(LogEntry { term, index, command, checksum, }) } } // ── IndexEntry ─────────────────────────────────────────────── /// Maps a log index to its file offset (for random access). #[derive(Debug, Clone, Copy)] struct IndexEntry { index: u64, file_offset: u64, entry_size: u32, } impl IndexEntry { const SIZE: usize = 20; // 8 + 8 + 4 fn encode(&self) -> [u8; Self::SIZE] { let mut buf = [0u8; Self::SIZE]; buf[0..8].copy_from_slice(&self.index.to_be_bytes()); buf[8..16].copy_from_slice(&self.file_offset.to_be_bytes()); buf[16..20].copy_from_slice(&self.entry_size.to_be_bytes()); buf } #[allow(dead_code)] fn decode(data: &[u8; Self::SIZE]) -> Self { Self { index: u64::from_be_bytes(data[0..8].try_into().unwrap()), file_offset: u64::from_be_bytes(data[8..16].try_into().unwrap()), entry_size: u32::from_be_bytes(data[16..20].try_into().unwrap()), } } } // ── RaftStorage ────────────────────────────────────────────── /// Disk persistence layer for the Raft log. struct RaftStorage { log_path: PathBuf, index_path: PathBuf, } impl RaftStorage { fn new(data_dir: &Path) -> Self { Self { log_path: data_dir.join("raft-log.dat"), index_path: data_dir.join("raft-index.dat"), } } /// Append an entry to the log file and update the index. fn append(&self, entry: &LogEntry) -> GitResult { let entry_bytes = entry.encode(); let entry_size = entry_bytes.len() as u32; // Append to log file let mut log_file = std::fs::OpenOptions::new() .create(true) .append(true) .open(&self.log_path) .map_err(GitError::Io)?; let file_offset = log_file.metadata().map_err(GitError::Io)?.len(); log_file.write_all(&entry_bytes).map_err(GitError::Io)?; log_file.flush().map_err(GitError::Io)?; // Append to index file let index_entry = IndexEntry { index: entry.index, file_offset, entry_size, }; let mut index_file = std::fs::OpenOptions::new() .create(true) .append(true) .open(&self.index_path) .map_err(GitError::Io)?; index_file .write_all(&index_entry.encode()) .map_err(GitError::Io)?; index_file.flush().map_err(GitError::Io)?; Ok(entry.index) } /// Load all entries from disk. fn load_all(&self) -> GitResult> { if !self.log_path.exists() { return Ok(Vec::new()); } let log_file = std::fs::File::open(&self.log_path).map_err(GitError::Io)?; let mut reader = BufReader::new(log_file); let mut entries = Vec::new(); loop { // Read term (8 bytes) let mut term_buf = [0u8; 8]; match reader.read_exact(&mut term_buf) { Ok(()) => {} Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, Err(e) => return Err(GitError::Io(e)), } let term = u64::from_be_bytes(term_buf); // Read index (8 bytes) let mut index_buf = [0u8; 8]; reader.read_exact(&mut index_buf).map_err(GitError::Io)?; let index = u64::from_be_bytes(index_buf); // Read checksum (4 bytes) let mut checksum_buf = [0u8; 4]; reader.read_exact(&mut checksum_buf).map_err(GitError::Io)?; let checksum = u32::from_be_bytes(checksum_buf); // Read command length (4 bytes) let mut cmd_len_buf = [0u8; 4]; reader.read_exact(&mut cmd_len_buf).map_err(GitError::Io)?; let cmd_len = u32::from_be_bytes(cmd_len_buf) as usize; if cmd_len > MAX_ENTRY_SIZE { tracing::warn!(index, cmd_len, "entry too large, stopping recovery"); break; } // Read command bytes let mut cmd_buf = vec![0u8; cmd_len]; reader.read_exact(&mut cmd_buf).map_err(GitError::Io)?; // Decode command match Command::decode(&cmd_buf) { Some(command) => { let expected_checksum = LogEntry::compute_checksum(term, index, &command); if checksum != expected_checksum { tracing::warn!( index, expected = expected_checksum, actual = checksum, "checksum mismatch during recovery, stopping" ); break; } entries.push(LogEntry { term, index, command, checksum, }); } None => { tracing::warn!(index, "failed to decode command during recovery, stopping"); break; } } } tracing::info!(count = entries.len(), "loaded raft log entries from disk"); Ok(entries) } /// Get the total size of the log file. fn log_file_size(&self) -> u64 { std::fs::metadata(&self.log_path) .map(|m| m.len()) .unwrap_or(0) } /// Truncate the log file and rebuild the index. fn truncate_and_rebuild(&self, entries: &[LogEntry]) -> GitResult<()> { // Write new log file let log_file = std::fs::File::create(&self.log_path).map_err(GitError::Io)?; let mut writer = BufWriter::new(log_file); let mut index_entries = Vec::with_capacity(entries.len()); let mut offset = 0u64; for entry in entries { let entry_bytes = entry.encode(); let entry_size = entry_bytes.len() as u32; index_entries.push(IndexEntry { index: entry.index, file_offset: offset, entry_size, }); writer.write_all(&entry_bytes).map_err(GitError::Io)?; offset += entry_size as u64; } writer.flush().map_err(GitError::Io)?; // Write new index file let index_file = std::fs::File::create(&self.index_path).map_err(GitError::Io)?; let mut writer = BufWriter::new(index_file); for ie in &index_entries { writer.write_all(&ie.encode()).map_err(GitError::Io)?; } writer.flush().map_err(GitError::Io)?; Ok(()) } } // ── RaftLog ────────────────────────────────────────────────── /// Thread-safe Raft log with in-memory storage and disk persistence. pub struct RaftLog { entries: Vec, commit_index: u64, last_applied: u64, /// Atomic next index for CAS-based concurrent append. next_index: AtomicU64, storage: RaftStorage, /// Snapshot storage for log compaction. snapshot_storage: SnapshotStorage, /// Last snapshot index (entries before this are discarded). snapshot_index: u64, /// Last snapshot term. snapshot_term: u64, /// Path for snapshot data. data_dir: PathBuf, } impl RaftLog { /// Create a new RaftLog, loading existing entries from disk. pub fn new(data_dir: &Path) -> GitResult { std::fs::create_dir_all(data_dir).map_err(GitError::Io)?; let storage = RaftStorage::new(data_dir); let snapshot_storage = SnapshotStorage::new(data_dir); // Load snapshot if exists let (snapshot_index, snapshot_term) = match snapshot_storage.load()? { Some(snapshot) => { tracing::info!( index = snapshot.last_included_index, term = snapshot.last_included_term, "loaded existing raft snapshot" ); (snapshot.last_included_index, snapshot.last_included_term) } None => (0, 0), }; let entries = storage.load_all()?; let next_index = entries .last() .map(|e| e.index + 1) .unwrap_or(snapshot_index + 1); let last_applied = entries.last().map(|e| e.index).unwrap_or(snapshot_index); Ok(Self { entries, commit_index: snapshot_index, last_applied, next_index: AtomicU64::new(next_index), storage, snapshot_storage, snapshot_index, snapshot_term, data_dir: data_dir.to_path_buf(), }) } /// Get the last log index (0 if empty). pub fn last_index(&self) -> u64 { self.entries.last().map(|e| e.index).unwrap_or(0) } /// Get the last log term (0 if empty). pub fn last_term(&self) -> u64 { self.entries.last().map(|e| e.term).unwrap_or(0) } /// Get the current commit index. pub fn commit_index(&self) -> u64 { self.commit_index } /// Get the last applied index. pub fn last_applied(&self) -> u64 { self.last_applied } /// Get the next index for a new entry. pub fn next_index(&self) -> u64 { self.next_index.load(Ordering::SeqCst) } /// CAS-based index reservation for concurrent append. /// Returns the reserved index. The caller must then call `append_reserved`. pub fn reserve_index(&self) -> u64 { self.next_index.fetch_add(1, Ordering::SeqCst) } /// Append a pre-reserved entry to the log. /// The entry must have been assigned an index via `reserve_index`. pub fn append_reserved(&mut self, entry: LogEntry) -> GitResult { // Persist to disk self.storage.append(&entry)?; // Add to memory self.entries.push(entry.clone()); Ok(entry.index) } /// Convenience: create and append an entry in one call (non-concurrent). pub fn append(&mut self, term: u64, command: Command) -> GitResult { let index = self.reserve_index(); let entry = LogEntry::new(term, index, command); self.append_reserved(entry) } /// Get an entry by index. pub fn get(&self, index: u64) -> Option<&LogEntry> { if self.entries.is_empty() { return None; } let first_index = self.entries[0].index; if index < first_index { return None; } let offset = (index - first_index) as usize; self.entries.get(offset) } /// Get entries in range [from_index, to_index). pub fn get_range(&self, from_index: u64, to_index: u64) -> Vec<&LogEntry> { if self.entries.is_empty() { return Vec::new(); } let first_index = self.entries[0].index; let start = if from_index < first_index { 0 } else { (from_index - first_index) as usize }; let end = if to_index < first_index { 0 } else { ((to_index - first_index) as usize).min(self.entries.len()) }; self.entries[start..end].iter().collect() } /// Get the term of the entry at the given index. pub fn term_at(&self, index: u64) -> u64 { self.get(index).map(|e| e.term).unwrap_or(0) } /// Advance commit_index to the given value. pub fn advance_commit_index(&mut self, new_commit_index: u64) { if new_commit_index > self.commit_index { self.commit_index = new_commit_index; tracing::debug!(commit_index = new_commit_index, "commit index advanced"); } } /// Mark entries up to the given index as applied. pub fn advance_last_applied(&mut self, new_last_applied: u64) { if new_last_applied > self.last_applied { self.last_applied = new_last_applied; } } /// Get committed entries that haven't been applied yet. pub fn unapplied_entries(&self) -> Vec<&LogEntry> { self.get_range(self.last_applied + 1, self.commit_index + 1) } /// Check if the log needs compaction. pub fn needs_compaction(&self) -> bool { self.storage.log_file_size() >= LOG_COMPACT_THRESHOLD_BYTES } /// Compact the log, keeping entries from `from_index` onwards. pub fn compact(&mut self, from_index: u64) -> GitResult<()> { if from_index <= self.entries[0].index { return Ok(()); // Nothing to compact } let keep: Vec = self .entries .iter() .filter(|e| e.index >= from_index) .cloned() .collect(); if keep.is_empty() { return Ok(()); } self.storage.truncate_and_rebuild(&keep)?; self.entries = keep; tracing::info!(from_index, kept = self.entries.len(), "raft log compacted"); Ok(()) } /// Truncate the log, removing entries from `from_index` onwards (inclusive). /// This is used during AppendEntries to resolve log conflicts per the Raft protocol: /// when a follower detects a term mismatch, it must delete the conflicting entry /// and all entries that follow. pub fn truncate_from(&mut self, from_index: u64) -> GitResult<()> { let keep: Vec = self .entries .iter() .filter(|e| e.index < from_index) .cloned() .collect(); let removed = self.entries.len() - keep.len(); if removed == 0 { return Ok(()); } self.storage.truncate_and_rebuild(&keep)?; self.entries = keep; let new_next = self.entries.last().map(|e| e.index + 1).unwrap_or(1); self.next_index.store(new_next, Ordering::SeqCst); tracing::info!( from_index, removed, remaining = self.entries.len(), "raft log truncated" ); Ok(()) } /// Get the data directory path (for snapshot storage). pub fn data_dir(&self) -> &Path { &self.data_dir } /// Get the total number of entries in the log. pub fn len(&self) -> usize { self.entries.len() } /// Check if the log is empty. pub fn is_empty(&self) -> bool { self.entries.is_empty() } /// Create a snapshot of the current state and compact the log. pub fn create_snapshot(&mut self, repos: HashMap) -> GitResult<()> { let snapshot = RaftSnapshot::new(self.last_applied, self.term_at(self.last_applied), repos); self.snapshot_storage.save(&snapshot)?; self.snapshot_index = snapshot.last_included_index; self.snapshot_term = snapshot.last_included_term; // Compact the log: remove entries before the snapshot self.compact(self.snapshot_index + 1)?; tracing::info!( snapshot_index = self.snapshot_index, snapshot_term = self.snapshot_term, remaining_entries = self.entries.len(), "raft snapshot created and log compacted" ); Ok(()) } /// Restore state from a snapshot. pub fn restore_snapshot( &mut self, snapshot: RaftSnapshot, ) -> GitResult> { self.snapshot_index = snapshot.last_included_index; self.snapshot_term = snapshot.last_included_term; self.commit_index = snapshot.last_included_index; self.last_applied = snapshot.last_included_index; self.next_index .store(snapshot.last_included_index + 1, Ordering::SeqCst); // Clear all entries (they're covered by the snapshot) self.entries.clear(); self.storage.truncate_and_rebuild(&[])?; tracing::info!( snapshot_index = self.snapshot_index, snapshot_term = self.snapshot_term, "raft state restored from snapshot" ); Ok(snapshot.repos) } /// Get the snapshot index. pub fn snapshot_index(&self) -> u64 { self.snapshot_index } /// Get the snapshot term. pub fn snapshot_term(&self) -> u64 { self.snapshot_term } }