From bcd750b905974de00ecc6ed6e7108702944294df Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Wed, 10 Jun 2026 18:32:22 +0800 Subject: [PATCH] feat(raft): add log truncation for AppendEntries conflict resolution Add truncate_from() method to RaftLog for removing entries from a given index onwards, as required by the Raft protocol when a follower detects a term mismatch during AppendEntries. --- actor/raft_log.rs | 733 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 733 insertions(+) create mode 100644 actor/raft_log.rs diff --git a/actor/raft_log.rs b/actor/raft_log.rs new file mode 100644 index 0000000..80cbfe3 --- /dev/null +++ b/actor/raft_log.rs @@ -0,0 +1,733 @@ +//! Raft log storage with disk persistence and CAS-based concurrent append. +//! +//! The log is stored in memory for fast access and persisted to disk for crash recovery. +//! Each entry has a CRC32 checksum for integrity verification. +//! +//! Storage format: +//! - `raft-log.dat`: Append-only log file containing serialized entries +//! - `raft-index.dat`: Index file mapping entry index to file offset (for random access) + +use std::io::{BufReader, BufWriter, Read, Write}; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; + +use ractor_cluster::BytesConvertable; + +use crate::error::{GitError, GitResult}; +use crate::actor::snapshot::{RaftSnapshot, SnapshotStorage}; +use crate::actor::handler::RepoEntry; +use std::collections::HashMap; + +/// Protocol version for forward/backward compatibility. +pub const RAFT_PROTOCOL_VERSION: u32 = 1; + +/// Maximum log entry size (1MB). +const MAX_ENTRY_SIZE: usize = 1024 * 1024; + +/// Log compaction threshold (100MB). +pub const LOG_COMPACT_THRESHOLD_BYTES: u64 = 100 * 1024 * 1024; + +// ── Command ────────────────────────────────────────────────── + +/// A Raft command that can be applied to the state machine.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Command {
    /// Encoded with tag `0`; carries four string fields.
    RefUpdate {
        relative_path: String,
        ref_name: String,
        old_oid: String,
        new_oid: String,
    },
    /// Encoded with tag `1`; carries two string fields.
    RegisterRepo {
        relative_path: String,
        storage_name: String,
    },
    /// Encoded with tag `2`; carries one string field.
    RemoveRepo {
        relative_path: String,
    },
    /// Encoded with tag `3`; carries a storage name and a list of paths.
    SetPrimary {
        storage_name: String,
        relative_paths: Vec<String>,
    },
}

impl Command {
    /// Serialize command to bytes.
    ///
    /// Layout (every integer is big-endian):
    /// - byte 0: variant tag (`0` = `RefUpdate`, `1` = `RegisterRepo`,
    ///   `2` = `RemoveRepo`, `3` = `SetPrimary`);
    /// - tags `0`–`2`: a `u32` field count written by `encode_strings`, then each
    ///   field as a `u32` byte length followed by its UTF-8 bytes;
    /// - tag `3`: `storage_name` as a length-prefixed string, a `u32` path count,
    ///   then each path as a length-prefixed string.
    pub fn encode(&self) -> Vec<u8> {
        let mut buf = Vec::new();
        match self {
            Command::RefUpdate { relative_path, ref_name, old_oid, new_oid } => {
                buf.push(0); // tag
                encode_strings(
                    &mut buf,
                    &[
                        relative_path.as_str(),
                        ref_name.as_str(),
                        old_oid.as_str(),
                        new_oid.as_str(),
                    ],
                );
            }
            Command::RegisterRepo { relative_path, storage_name } => {
                buf.push(1);
                encode_strings(&mut buf, &[relative_path.as_str(), storage_name.as_str()]);
            }
            Command::RemoveRepo { relative_path } => {
                buf.push(2);
                encode_strings(&mut buf, &[relative_path.as_str()]);
            }
            Command::SetPrimary { storage_name, relative_paths } => {
                buf.push(3);
                encode_string(&mut buf, storage_name);
                // The count is stored as a u32; a list longer than u32::MAX
                // would be truncated by this cast.
                buf.extend((relative_paths.len() as u32).to_be_bytes());
                for path in relative_paths {
                    encode_string(&mut buf, path);
                }
            }
        }
        buf
    }

    /// Deserialize command from bytes.
+ pub fn decode(data: &[u8]) -> Option { + if data.is_empty() { + return None; + } + let tag = data[0]; + let mut offset = 1; + match tag { + 0 => { + let (rp, o1) = decode_string(data, offset)?; + offset = o1; + let (rn, o2) = decode_string(data, offset)?; + offset = o2; + let (oo, o3) = decode_string(data, offset)?; + offset = o3; + let (no, _) = decode_string(data, offset)?; + Some(Command::RefUpdate { + relative_path: rp, + ref_name: rn, + old_oid: oo, + new_oid: no, + }) + } + 1 => { + let (rp, o1) = decode_string(data, offset)?; + offset = o1; + let (sn, _) = decode_string(data, offset)?; + Some(Command::RegisterRepo { + relative_path: rp, + storage_name: sn, + }) + } + 2 => { + let (rp, _) = decode_string(data, offset)?; + Some(Command::RemoveRepo { relative_path: rp }) + } + 3 => { + let (sn, o1) = decode_string(data, offset)?; + offset = o1; + if offset + 4 > data.len() { + return None; + } + let count = u32::from_be_bytes(data[offset..offset + 4].try_into().ok()?) as usize; + offset += 4; + let mut paths = Vec::with_capacity(count); + for _ in 0..count { + let (p, o) = decode_string(data, offset)?; + offset = o; + paths.push(p); + } + Some(Command::SetPrimary { + storage_name: sn, + relative_paths: paths, + }) + } + _ => None, + } + } +} + +impl BytesConvertable for Command { + fn into_bytes(self) -> Vec { + self.encode() + } + + fn from_bytes(bytes: Vec) -> Self { + Self::decode(&bytes).unwrap_or(Command::RemoveRepo { + relative_path: String::new(), + }) + } +} + +fn encode_string(buf: &mut Vec, s: &str) { + let bytes = s.as_bytes(); + buf.extend((bytes.len() as u32).to_be_bytes()); + buf.extend(bytes); +} + +fn encode_strings(buf: &mut Vec, strings: &[&str]) { + buf.extend((strings.len() as u32).to_be_bytes()); + for s in strings { + encode_string(buf, s); + } +} + +fn decode_string(data: &[u8], offset: usize) -> Option<(String, usize)> { + if offset + 4 > data.len() { + return None; + } + let len = u32::from_be_bytes(data[offset..offset + 
4].try_into().ok()?) as usize; + let start = offset + 4; + if start + len > data.len() { + return None; + } + let s = String::from_utf8_lossy(&data[start..start + len]).into_owned(); + Some((s, start + len)) +} + +// ── LogEntry ───────────────────────────────────────────────── + +/// A single Raft log entry. +#[derive(Debug, Clone)] +pub struct LogEntry { + pub term: u64, + pub index: u64, + pub command: Command, + pub checksum: u32, +} + +impl LogEntry { + pub fn new(term: u64, index: u64, command: Command) -> Self { + let checksum = Self::compute_checksum(term, index, &command); + Self { term, index, command, checksum } + } + + fn compute_checksum(term: u64, index: u64, command: &Command) -> u32 { + let mut hasher = crc32fast::Hasher::new(); + hasher.update(&term.to_be_bytes()); + hasher.update(&index.to_be_bytes()); + hasher.update(&command.encode()); + hasher.finalize() + } + + /// Serialize entry to bytes (for disk storage). + pub fn encode(&self) -> Vec { + let cmd_bytes = self.command.encode(); + let total_len = 8 + 8 + 4 + 4 + cmd_bytes.len(); // term + index + checksum + cmd_len + cmd + let mut buf = Vec::with_capacity(total_len); + buf.extend(self.term.to_be_bytes()); + buf.extend(self.index.to_be_bytes()); + buf.extend(self.checksum.to_be_bytes()); + buf.extend((cmd_bytes.len() as u32).to_be_bytes()); + buf.extend(&cmd_bytes); + buf + } + + /// Deserialize entry from bytes. + pub fn decode(data: &[u8]) -> Option { + if data.len() < 24 { + return None; + } + let term = u64::from_be_bytes(data[0..8].try_into().ok()?); + let index = u64::from_be_bytes(data[8..16].try_into().ok()?); + let checksum = u32::from_be_bytes(data[16..20].try_into().ok()?); + let cmd_len = u32::from_be_bytes(data[20..24].try_into().ok()?) 
as usize; + if 24 + cmd_len > data.len() { + return None; + } + let command = Command::decode(&data[24..24 + cmd_len])?; + + // Verify checksum + let expected = Self::compute_checksum(term, index, &command); + if checksum != expected { + tracing::warn!( + term, + index, + expected, + actual = checksum, + "log entry checksum mismatch" + ); + return None; + } + + Some(LogEntry { term, index, command, checksum }) + } +} + +// ── IndexEntry ─────────────────────────────────────────────── + +/// Maps a log index to its file offset (for random access). +#[derive(Debug, Clone, Copy)] +struct IndexEntry { + index: u64, + file_offset: u64, + entry_size: u32, +} + +impl IndexEntry { + const SIZE: usize = 20; // 8 + 8 + 4 + + fn encode(&self) -> [u8; Self::SIZE] { + let mut buf = [0u8; Self::SIZE]; + buf[0..8].copy_from_slice(&self.index.to_be_bytes()); + buf[8..16].copy_from_slice(&self.file_offset.to_be_bytes()); + buf[16..20].copy_from_slice(&self.entry_size.to_be_bytes()); + buf + } + + #[allow(dead_code)] + fn decode(data: &[u8; Self::SIZE]) -> Self { + Self { + index: u64::from_be_bytes(data[0..8].try_into().unwrap()), + file_offset: u64::from_be_bytes(data[8..16].try_into().unwrap()), + entry_size: u32::from_be_bytes(data[16..20].try_into().unwrap()), + } + } +} + +// ── RaftStorage ────────────────────────────────────────────── + +/// Disk persistence layer for the Raft log. +struct RaftStorage { + log_path: PathBuf, + index_path: PathBuf, +} + +impl RaftStorage { + fn new(data_dir: &Path) -> Self { + Self { + log_path: data_dir.join("raft-log.dat"), + index_path: data_dir.join("raft-index.dat"), + } + } + + /// Append an entry to the log file and update the index. 
+ fn append(&self, entry: &LogEntry) -> GitResult { + let entry_bytes = entry.encode(); + let entry_size = entry_bytes.len() as u32; + + // Append to log file + let mut log_file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&self.log_path) + .map_err(GitError::Io)?; + let file_offset = log_file.metadata().map_err(GitError::Io)?.len(); + log_file.write_all(&entry_bytes).map_err(GitError::Io)?; + log_file.flush().map_err(GitError::Io)?; + + // Append to index file + let index_entry = IndexEntry { + index: entry.index, + file_offset, + entry_size, + }; + let mut index_file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&self.index_path) + .map_err(GitError::Io)?; + index_file.write_all(&index_entry.encode()).map_err(GitError::Io)?; + index_file.flush().map_err(GitError::Io)?; + + Ok(entry.index) + } + + /// Load all entries from disk. + fn load_all(&self) -> GitResult> { + if !self.log_path.exists() { + return Ok(Vec::new()); + } + + let log_file = std::fs::File::open(&self.log_path).map_err(GitError::Io)?; + let mut reader = BufReader::new(log_file); + let mut entries = Vec::new(); + + loop { + // Read term (8 bytes) + let mut term_buf = [0u8; 8]; + match reader.read_exact(&mut term_buf) { + Ok(()) => {} + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break, + Err(e) => return Err(GitError::Io(e)), + } + let term = u64::from_be_bytes(term_buf); + + // Read index (8 bytes) + let mut index_buf = [0u8; 8]; + reader.read_exact(&mut index_buf).map_err(GitError::Io)?; + let index = u64::from_be_bytes(index_buf); + + // Read checksum (4 bytes) + let mut checksum_buf = [0u8; 4]; + reader.read_exact(&mut checksum_buf).map_err(GitError::Io)?; + let checksum = u32::from_be_bytes(checksum_buf); + + // Read command length (4 bytes) + let mut cmd_len_buf = [0u8; 4]; + reader.read_exact(&mut cmd_len_buf).map_err(GitError::Io)?; + let cmd_len = u32::from_be_bytes(cmd_len_buf) as usize; + + if cmd_len > MAX_ENTRY_SIZE { + 
tracing::warn!(index, cmd_len, "entry too large, stopping recovery"); + break; + } + + // Read command bytes + let mut cmd_buf = vec![0u8; cmd_len]; + reader.read_exact(&mut cmd_buf).map_err(GitError::Io)?; + + // Decode command + match Command::decode(&cmd_buf) { + Some(command) => { + let expected_checksum = LogEntry::compute_checksum(term, index, &command); + if checksum != expected_checksum { + tracing::warn!( + index, + expected = expected_checksum, + actual = checksum, + "checksum mismatch during recovery, stopping" + ); + break; + } + entries.push(LogEntry { term, index, command, checksum }); + } + None => { + tracing::warn!(index, "failed to decode command during recovery, stopping"); + break; + } + } + } + + tracing::info!(count = entries.len(), "loaded raft log entries from disk"); + Ok(entries) + } + + /// Get the total size of the log file. + fn log_file_size(&self) -> u64 { + std::fs::metadata(&self.log_path) + .map(|m| m.len()) + .unwrap_or(0) + } + + /// Truncate the log file and rebuild the index. 
+ fn truncate_and_rebuild(&self, entries: &[LogEntry]) -> GitResult<()> { + // Write new log file + let log_file = std::fs::File::create(&self.log_path).map_err(GitError::Io)?; + let mut writer = BufWriter::new(log_file); + let mut index_entries = Vec::with_capacity(entries.len()); + let mut offset = 0u64; + + for entry in entries { + let entry_bytes = entry.encode(); + let entry_size = entry_bytes.len() as u32; + index_entries.push(IndexEntry { + index: entry.index, + file_offset: offset, + entry_size, + }); + writer.write_all(&entry_bytes).map_err(GitError::Io)?; + offset += entry_size as u64; + } + writer.flush().map_err(GitError::Io)?; + + // Write new index file + let index_file = std::fs::File::create(&self.index_path).map_err(GitError::Io)?; + let mut writer = BufWriter::new(index_file); + for ie in &index_entries { + writer.write_all(&ie.encode()).map_err(GitError::Io)?; + } + writer.flush().map_err(GitError::Io)?; + + Ok(()) + } +} + +// ── RaftLog ────────────────────────────────────────────────── + +/// Thread-safe Raft log with in-memory storage and disk persistence. +pub struct RaftLog { + entries: Vec, + commit_index: u64, + last_applied: u64, + /// Atomic next index for CAS-based concurrent append. + next_index: AtomicU64, + storage: RaftStorage, + /// Snapshot storage for log compaction. + snapshot_storage: SnapshotStorage, + /// Last snapshot index (entries before this are discarded). + snapshot_index: u64, + /// Last snapshot term. + snapshot_term: u64, + /// Path for snapshot data. + data_dir: PathBuf, +} + +impl RaftLog { + /// Create a new RaftLog, loading existing entries from disk. + pub fn new(data_dir: &Path) -> GitResult { + std::fs::create_dir_all(data_dir).map_err(GitError::Io)?; + let storage = RaftStorage::new(data_dir); + let snapshot_storage = SnapshotStorage::new(data_dir); + + // Load snapshot if exists + let (snapshot_index, snapshot_term) = match snapshot_storage.load()? 
{ + Some(snapshot) => { + tracing::info!( + index = snapshot.last_included_index, + term = snapshot.last_included_term, + "loaded existing raft snapshot" + ); + (snapshot.last_included_index, snapshot.last_included_term) + } + None => (0, 0), + }; + + let entries = storage.load_all()?; + + let next_index = entries.last().map(|e| e.index + 1).unwrap_or(snapshot_index + 1); + let last_applied = entries.last().map(|e| e.index).unwrap_or(snapshot_index); + + Ok(Self { + entries, + commit_index: snapshot_index, + last_applied, + next_index: AtomicU64::new(next_index), + storage, + snapshot_storage, + snapshot_index, + snapshot_term, + data_dir: data_dir.to_path_buf(), + }) + } + + /// Get the last log index (0 if empty). + pub fn last_index(&self) -> u64 { + self.entries.last().map(|e| e.index).unwrap_or(0) + } + + /// Get the last log term (0 if empty). + pub fn last_term(&self) -> u64 { + self.entries.last().map(|e| e.term).unwrap_or(0) + } + + /// Get the current commit index. + pub fn commit_index(&self) -> u64 { + self.commit_index + } + + /// Get the last applied index. + pub fn last_applied(&self) -> u64 { + self.last_applied + } + + /// Get the next index for a new entry. + pub fn next_index(&self) -> u64 { + self.next_index.load(Ordering::SeqCst) + } + + /// CAS-based index reservation for concurrent append. + /// Returns the reserved index. The caller must then call `append_reserved`. + pub fn reserve_index(&self) -> u64 { + self.next_index.fetch_add(1, Ordering::SeqCst) + } + + /// Append a pre-reserved entry to the log. + /// The entry must have been assigned an index via `reserve_index`. + pub fn append_reserved(&mut self, entry: LogEntry) -> GitResult { + // Persist to disk + self.storage.append(&entry)?; + // Add to memory + self.entries.push(entry.clone()); + Ok(entry.index) + } + + /// Convenience: create and append an entry in one call (non-concurrent). 
+ pub fn append(&mut self, term: u64, command: Command) -> GitResult { + let index = self.reserve_index(); + let entry = LogEntry::new(term, index, command); + self.append_reserved(entry) + } + + /// Get an entry by index. + pub fn get(&self, index: u64) -> Option<&LogEntry> { + if self.entries.is_empty() { + return None; + } + let first_index = self.entries[0].index; + if index < first_index { + return None; + } + let offset = (index - first_index) as usize; + self.entries.get(offset) + } + + /// Get entries in range [from_index, to_index). + pub fn get_range(&self, from_index: u64, to_index: u64) -> Vec<&LogEntry> { + if self.entries.is_empty() { + return Vec::new(); + } + let first_index = self.entries[0].index; + let start = if from_index < first_index { + 0 + } else { + (from_index - first_index) as usize + }; + let end = if to_index < first_index { + 0 + } else { + ((to_index - first_index) as usize).min(self.entries.len()) + }; + self.entries[start..end].iter().collect() + } + + /// Get the term of the entry at the given index. + pub fn term_at(&self, index: u64) -> u64 { + self.get(index).map(|e| e.term).unwrap_or(0) + } + + /// Advance commit_index to the given value. + pub fn advance_commit_index(&mut self, new_commit_index: u64) { + if new_commit_index > self.commit_index { + self.commit_index = new_commit_index; + tracing::debug!(commit_index = new_commit_index, "commit index advanced"); + } + } + + /// Mark entries up to the given index as applied. + pub fn advance_last_applied(&mut self, new_last_applied: u64) { + if new_last_applied > self.last_applied { + self.last_applied = new_last_applied; + } + } + + /// Get committed entries that haven't been applied yet. + pub fn unapplied_entries(&self) -> Vec<&LogEntry> { + self.get_range(self.last_applied + 1, self.commit_index + 1) + } + + /// Check if the log needs compaction. 
+ pub fn needs_compaction(&self) -> bool { + self.storage.log_file_size() >= LOG_COMPACT_THRESHOLD_BYTES + } + + /// Compact the log, keeping entries from `from_index` onwards. + pub fn compact(&mut self, from_index: u64) -> GitResult<()> { + if from_index <= self.entries[0].index { + return Ok(()); // Nothing to compact + } + + let keep: Vec = self.entries.iter() + .filter(|e| e.index >= from_index) + .cloned() + .collect(); + + if keep.is_empty() { + return Ok(()); + } + + self.storage.truncate_and_rebuild(&keep)?; + self.entries = keep; + + tracing::info!( + from_index, + kept = self.entries.len(), + "raft log compacted" + ); + Ok(()) + } + + /// Truncate the log, removing entries from `from_index` onwards (inclusive). + /// This is used during AppendEntries to resolve log conflicts per the Raft protocol: + /// when a follower detects a term mismatch, it must delete the conflicting entry + /// and all entries that follow. + pub fn truncate_from(&mut self, from_index: u64) -> GitResult<()> { + let keep: Vec = self.entries.iter() + .filter(|e| e.index < from_index) + .cloned() + .collect(); + + let removed = self.entries.len() - keep.len(); + if removed == 0 { + return Ok(()); + } + + self.storage.truncate_and_rebuild(&keep)?; + self.entries = keep; + + let new_next = self.entries.last().map(|e| e.index + 1).unwrap_or(1); + self.next_index.store(new_next, Ordering::SeqCst); + + tracing::info!( + from_index, + removed, + remaining = self.entries.len(), + "raft log truncated" + ); + Ok(()) + } + + /// Get the data directory path (for snapshot storage). + pub fn data_dir(&self) -> &Path { + &self.data_dir + } + + /// Get the total number of entries in the log. + pub fn len(&self) -> usize { + self.entries.len() + } + + /// Check if the log is empty. + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + /// Create a snapshot of the current state and compact the log. 
+ pub fn create_snapshot(&mut self, repos: HashMap) -> GitResult<()> { + let snapshot = RaftSnapshot::new( + self.last_applied, + self.term_at(self.last_applied), + repos, + ); + + self.snapshot_storage.save(&snapshot)?; + self.snapshot_index = snapshot.last_included_index; + self.snapshot_term = snapshot.last_included_term; + + // Compact the log: remove entries before the snapshot + self.compact(self.snapshot_index + 1)?; + + tracing::info!( + snapshot_index = self.snapshot_index, + snapshot_term = self.snapshot_term, + remaining_entries = self.entries.len(), + "raft snapshot created and log compacted" + ); + + Ok(()) + } + + /// Restore state from a snapshot. + pub fn restore_snapshot(&mut self, snapshot: RaftSnapshot) -> GitResult> { + self.snapshot_index = snapshot.last_included_index; + self.snapshot_term = snapshot.last_included_term; + self.commit_index = snapshot.last_included_index; + self.last_applied = snapshot.last_included_index; + self.next_index.store(snapshot.last_included_index + 1, Ordering::SeqCst); + + // Clear all entries (they're covered by the snapshot) + self.entries.clear(); + self.storage.truncate_and_rebuild(&[])?; + + tracing::info!( + snapshot_index = self.snapshot_index, + snapshot_term = self.snapshot_term, + "raft state restored from snapshot" + ); + + Ok(snapshot.repos) + } + + /// Get the snapshot index. + pub fn snapshot_index(&self) -> u64 { + self.snapshot_index + } + + /// Get the snapshot term. + pub fn snapshot_term(&self) -> u64 { + self.snapshot_term + } +}