feat(raft): add log truncation for AppendEntries conflict resolution

Add truncate_from() method to RaftLog for removing entries from a given
index onwards, as required by the Raft protocol when a follower detects
a term mismatch during AppendEntries.
This commit is contained in:
zhenyi
2026-06-10 18:32:22 +08:00
parent c8729d38bc
commit bcd750b905
+733
View File
@@ -0,0 +1,733 @@
//! Raft log storage with disk persistence and atomic index reservation for concurrent append.
//!
//! The log is stored in memory for fast access and persisted to disk for crash recovery.
//! Each entry has a CRC32 checksum for integrity verification.
//!
//! Storage format:
//! - `raft-log.dat`: Append-only log file containing serialized entries
//! - `raft-index.dat`: Index file mapping entry index to file offset (for random access)
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use ractor_cluster::BytesConvertable;
use crate::error::{GitError, GitResult};
use crate::actor::snapshot::{RaftSnapshot, SnapshotStorage};
use crate::actor::handler::RepoEntry;
use std::collections::HashMap;
/// Protocol version for forward/backward compatibility.
pub const RAFT_PROTOCOL_VERSION: u32 = 1;
/// Maximum log entry size (1MB).
const MAX_ENTRY_SIZE: usize = 1024 * 1024;
/// Log compaction threshold (100MB).
pub const LOG_COMPACT_THRESHOLD_BYTES: u64 = 100 * 1024 * 1024;
// ── Command ──────────────────────────────────────────────────
/// A Raft command that can be applied to the state machine.
///
/// Each variant is serialized by `Command::encode` with a one-byte tag
/// (noted per variant below) followed by its length-prefixed string fields.
#[derive(Debug, Clone)]
pub enum Command {
/// Wire tag `0`. Fields are encoded in declaration order:
/// `relative_path`, `ref_name`, `old_oid`, `new_oid`.
RefUpdate {
relative_path: String,
ref_name: String,
old_oid: String,
new_oid: String,
},
/// Wire tag `1`. Fields are encoded as `relative_path`, then `storage_name`.
RegisterRepo {
relative_path: String,
storage_name: String,
},
/// Wire tag `2`. Carries a single `relative_path`.
RemoveRepo {
relative_path: String,
},
/// Wire tag `3`. Encoded as `storage_name`, a big-endian `u32` path count,
/// then each entry of `relative_paths`.
SetPrimary {
storage_name: String,
relative_paths: Vec<String>,
},
}
impl Command {
    /// Serializes the command into its binary form.
    ///
    /// The first byte is the variant tag (`0` = `RefUpdate`, `1` = `RegisterRepo`,
    /// `2` = `RemoveRepo`, `3` = `SetPrimary`). Tags `0`-`2` are followed by the
    /// output of `encode_strings`: a big-endian `u32` field count and then each
    /// length-prefixed field. Tag `3` is followed by the length-prefixed storage
    /// name, a big-endian `u32` path count and the length-prefixed paths.
    pub fn encode(&self) -> Vec<u8> {
        let mut buf = Vec::new();
        match self {
            Command::RefUpdate { relative_path, ref_name, old_oid, new_oid } => {
                buf.push(0); // tag
                encode_strings(&mut buf, &[relative_path, ref_name, old_oid, new_oid]);
            }
            Command::RegisterRepo { relative_path, storage_name } => {
                buf.push(1);
                encode_strings(&mut buf, &[relative_path, storage_name]);
            }
            Command::RemoveRepo { relative_path } => {
                buf.push(2);
                encode_strings(&mut buf, &[relative_path]);
            }
            Command::SetPrimary { storage_name, relative_paths } => {
                buf.push(3);
                encode_string(&mut buf, storage_name);
                buf.extend((relative_paths.len() as u32).to_be_bytes());
                for p in relative_paths {
                    encode_string(&mut buf, p);
                }
            }
        }
        buf
    }

    /// Reads a big-endian `u32` at `offset`, returning it with the offset
    /// of the byte that follows, or `None` if fewer than four bytes remain.
    fn decode_count(data: &[u8], offset: usize) -> Option<(usize, usize)> {
        let end = offset.checked_add(4)?;
        let mut raw = [0u8; 4];
        raw.copy_from_slice(data.get(offset..end)?);
        Some((u32::from_be_bytes(raw) as usize, end))
    }

    /// Consumes the field-count prefix written by `encode_strings` and
    /// checks that it matches the number of fields the variant expects.
    fn expect_field_count(data: &[u8], offset: usize, expected: usize) -> Option<usize> {
        let (count, next) = Self::decode_count(data, offset)?;
        if count == expected {
            Some(next)
        } else {
            None
        }
    }

    /// Deserializes a command produced by [`Command::encode`].
    ///
    /// Returns `None` for empty input, an unknown tag, a field count that
    /// does not match the variant, or a truncated payload. Bytes after the
    /// last field are ignored.
    pub fn decode(data: &[u8]) -> Option<Self> {
        let (&tag, _) = data.split_first()?;
        match tag {
            0 => {
                // `encode` goes through `encode_strings`, which writes a field
                // count before the strings; it has to be consumed here or every
                // following length prefix is read from the wrong position.
                let offset = Self::expect_field_count(data, 1, 4)?;
                let (relative_path, offset) = decode_string(data, offset)?;
                let (ref_name, offset) = decode_string(data, offset)?;
                let (old_oid, offset) = decode_string(data, offset)?;
                let (new_oid, _) = decode_string(data, offset)?;
                Some(Command::RefUpdate {
                    relative_path,
                    ref_name,
                    old_oid,
                    new_oid,
                })
            }
            1 => {
                let offset = Self::expect_field_count(data, 1, 2)?;
                let (relative_path, offset) = decode_string(data, offset)?;
                let (storage_name, _) = decode_string(data, offset)?;
                Some(Command::RegisterRepo {
                    relative_path,
                    storage_name,
                })
            }
            2 => {
                let offset = Self::expect_field_count(data, 1, 1)?;
                let (relative_path, _) = decode_string(data, offset)?;
                Some(Command::RemoveRepo { relative_path })
            }
            3 => {
                let (storage_name, offset) = decode_string(data, 1)?;
                let (count, mut offset) = Self::decode_count(data, offset)?;
                // Every path needs at least its 4-byte length prefix, so the
                // remaining input bounds how many can exist. Capping the
                // reservation keeps a corrupt count from forcing a huge allocation.
                let max_possible = data.len().saturating_sub(offset) / 4;
                let mut relative_paths = Vec::with_capacity(count.min(max_possible));
                for _ in 0..count {
                    let (path, next) = decode_string(data, offset)?;
                    offset = next;
                    relative_paths.push(path);
                }
                Some(Command::SetPrimary {
                    storage_name,
                    relative_paths,
                })
            }
            _ => None,
        }
    }
}
impl BytesConvertable for Command {
fn into_bytes(self) -> Vec<u8> {
self.encode()
}
fn from_bytes(bytes: Vec<u8>) -> Self {
Self::decode(&bytes).unwrap_or(Command::RemoveRepo {
relative_path: String::new(),
})
}
}
/// Appends `s` to `buf` as a big-endian `u32` byte length followed by the
/// UTF-8 bytes of the string.
fn encode_string(buf: &mut Vec<u8>, s: &str) {
    let len_prefix = (s.len() as u32).to_be_bytes();
    buf.extend_from_slice(&len_prefix);
    buf.extend_from_slice(s.as_bytes());
}
/// Appends a big-endian `u32` element count to `buf`, then each string of
/// `strings` in order using `encode_string`.
fn encode_strings(buf: &mut Vec<u8>, strings: &[&str]) {
    let count = strings.len() as u32;
    buf.extend_from_slice(&count.to_be_bytes());
    strings.iter().for_each(|s| encode_string(buf, s));
}
/// Reads one length-prefixed string from `data` starting at `offset`.
///
/// The prefix is a big-endian `u32` byte length. Returns the decoded string
/// together with the offset of the first byte after it, or `None` when the
/// prefix or payload does not fit inside `data`. Invalid UTF-8 sequences are
/// replaced rather than rejected (lossy conversion).
fn decode_string(data: &[u8], offset: usize) -> Option<(String, usize)> {
    // Checked arithmetic: an out-of-range offset or a huge length prefix
    // must yield `None`, not an overflow panic or a wrapped slice bound.
    let start = offset.checked_add(4)?;
    let mut len_bytes = [0u8; 4];
    len_bytes.copy_from_slice(data.get(offset..start)?);
    let len = u32::from_be_bytes(len_bytes) as usize;
    let end = start.checked_add(len)?;
    let payload = data.get(start..end)?;
    Some((String::from_utf8_lossy(payload).into_owned(), end))
}
// ── LogEntry ─────────────────────────────────────────────────
/// A single Raft log entry.
///
/// `LogEntry::encode` lays an entry out as term (8 bytes), index (8 bytes),
/// checksum (4 bytes), command length (4 bytes), then the encoded command;
/// all integers are big-endian.
#[derive(Debug, Clone)]
pub struct LogEntry {
/// Term stored with the entry.
pub term: u64,
/// Index of the entry within the log.
pub index: u64,
/// Command carried by the entry.
pub command: Command,
/// CRC32 over the big-endian term, the big-endian index and the encoded command.
pub checksum: u32,
}
impl LogEntry {
    /// Builds an entry and stamps it with the checksum of its contents.
    pub fn new(term: u64, index: u64, command: Command) -> Self {
        Self {
            checksum: Self::compute_checksum(term, index, &command),
            term,
            index,
            command,
        }
    }

    /// CRC32 over the big-endian term, the big-endian index and the encoded
    /// command, fed to the hasher in that order.
    fn compute_checksum(term: u64, index: u64, command: &Command) -> u32 {
        let term_bytes = term.to_be_bytes();
        let index_bytes = index.to_be_bytes();
        let command_bytes = command.encode();
        let mut crc = crc32fast::Hasher::new();
        crc.update(&term_bytes);
        crc.update(&index_bytes);
        crc.update(&command_bytes);
        crc.finalize()
    }

    /// Serializes the entry for disk storage.
    ///
    /// Layout: term (8) + index (8) + checksum (4) + command length (4) +
    /// command bytes, every integer big-endian.
    pub fn encode(&self) -> Vec<u8> {
        let payload = self.command.encode();
        let payload_len = payload.len() as u32;
        let mut out = Vec::with_capacity(8 + 8 + 4 + 4 + payload.len());
        out.extend_from_slice(&self.term.to_be_bytes());
        out.extend_from_slice(&self.index.to_be_bytes());
        out.extend_from_slice(&self.checksum.to_be_bytes());
        out.extend_from_slice(&payload_len.to_be_bytes());
        out.extend_from_slice(&payload);
        out
    }

    /// Parses an entry from the layout produced by [`LogEntry::encode`].
    ///
    /// Returns `None` when the input is shorter than the 24-byte header, the
    /// declared command length exceeds the remaining bytes, the command does
    /// not decode, or the stored checksum disagrees with the recomputed one
    /// (that last case is also logged).
    pub fn decode(data: &[u8]) -> Option<Self> {
        const HEADER_LEN: usize = 24;
        if data.len() < HEADER_LEN {
            return None;
        }
        let (header, body) = data.split_at(HEADER_LEN);
        let term = u64::from_be_bytes(header[0..8].try_into().ok()?);
        let index = u64::from_be_bytes(header[8..16].try_into().ok()?);
        let checksum = u32::from_be_bytes(header[16..20].try_into().ok()?);
        let cmd_len = u32::from_be_bytes(header[20..24].try_into().ok()?) as usize;
        // `get` yields `None` when the declared length runs past the input.
        let command = Command::decode(body.get(..cmd_len)?)?;
        let expected = Self::compute_checksum(term, index, &command);
        if checksum == expected {
            return Some(LogEntry { term, index, command, checksum });
        }
        tracing::warn!(
            term,
            index,
            expected,
            actual = checksum,
            "log entry checksum mismatch"
        );
        None
    }
}
// ── IndexEntry ───────────────────────────────────────────────
/// Index-file record tying a log index to the location of its entry in
/// the log file (for random access).
#[derive(Debug, Clone, Copy)]
struct IndexEntry {
    /// Log index of the entry this record describes.
    index: u64,
    /// Byte offset of the encoded entry inside the log file.
    file_offset: u64,
    /// Length in bytes of the encoded entry.
    entry_size: u32,
}
impl IndexEntry {
    /// Encoded record length: 8-byte index + 8-byte offset + 4-byte size.
    const SIZE: usize = 20;

    /// Packs the record as three consecutive big-endian integers.
    fn encode(&self) -> [u8; Self::SIZE] {
        let mut out = [0u8; Self::SIZE];
        {
            let (index_part, rest) = out.split_at_mut(8);
            let (offset_part, size_part) = rest.split_at_mut(8);
            index_part.copy_from_slice(&self.index.to_be_bytes());
            offset_part.copy_from_slice(&self.file_offset.to_be_bytes());
            size_part.copy_from_slice(&self.entry_size.to_be_bytes());
        }
        out
    }

    /// Unpacks a record written by `encode`.
    #[allow(dead_code)]
    fn decode(data: &[u8; Self::SIZE]) -> Self {
        let mut index_bytes = [0u8; 8];
        let mut offset_bytes = [0u8; 8];
        let mut size_bytes = [0u8; 4];
        index_bytes.copy_from_slice(&data[..8]);
        offset_bytes.copy_from_slice(&data[8..16]);
        size_bytes.copy_from_slice(&data[16..]);
        Self {
            index: u64::from_be_bytes(index_bytes),
            file_offset: u64::from_be_bytes(offset_bytes),
            entry_size: u32::from_be_bytes(size_bytes),
        }
    }
}
// ── RaftStorage ──────────────────────────────────────────────
/// Disk persistence layer for the Raft log.
///
/// Holds only the two file paths; every operation opens the files it needs.
struct RaftStorage {
/// Path of the log file (`raft-log.dat` inside the data directory).
log_path: PathBuf,
/// Path of the index file (`raft-index.dat` inside the data directory).
index_path: PathBuf,
}
impl RaftStorage {
/// Derives the log and index file paths inside `data_dir`. Touches no files.
fn new(data_dir: &Path) -> Self {
    let log_path = data_dir.join("raft-log.dat");
    let index_path = data_dir.join("raft-index.dat");
    Self { log_path, index_path }
}
/// Append an entry to the log file and update the index.
fn append(&self, entry: &LogEntry) -> GitResult<u64> {
let entry_bytes = entry.encode();
let entry_size = entry_bytes.len() as u32;
// Append to log file
let mut log_file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.log_path)
.map_err(GitError::Io)?;
let file_offset = log_file.metadata().map_err(GitError::Io)?.len();
log_file.write_all(&entry_bytes).map_err(GitError::Io)?;
log_file.flush().map_err(GitError::Io)?;
// Append to index file
let index_entry = IndexEntry {
index: entry.index,
file_offset,
entry_size,
};
let mut index_file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.index_path)
.map_err(GitError::Io)?;
index_file.write_all(&index_entry.encode()).map_err(GitError::Io)?;
index_file.flush().map_err(GitError::Io)?;
Ok(entry.index)
}
/// Load all entries from disk.
fn load_all(&self) -> GitResult<Vec<LogEntry>> {
if !self.log_path.exists() {
return Ok(Vec::new());
}
let log_file = std::fs::File::open(&self.log_path).map_err(GitError::Io)?;
let mut reader = BufReader::new(log_file);
let mut entries = Vec::new();
loop {
// Read term (8 bytes)
let mut term_buf = [0u8; 8];
match reader.read_exact(&mut term_buf) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(GitError::Io(e)),
}
let term = u64::from_be_bytes(term_buf);
// Read index (8 bytes)
let mut index_buf = [0u8; 8];
reader.read_exact(&mut index_buf).map_err(GitError::Io)?;
let index = u64::from_be_bytes(index_buf);
// Read checksum (4 bytes)
let mut checksum_buf = [0u8; 4];
reader.read_exact(&mut checksum_buf).map_err(GitError::Io)?;
let checksum = u32::from_be_bytes(checksum_buf);
// Read command length (4 bytes)
let mut cmd_len_buf = [0u8; 4];
reader.read_exact(&mut cmd_len_buf).map_err(GitError::Io)?;
let cmd_len = u32::from_be_bytes(cmd_len_buf) as usize;
if cmd_len > MAX_ENTRY_SIZE {
tracing::warn!(index, cmd_len, "entry too large, stopping recovery");
break;
}
// Read command bytes
let mut cmd_buf = vec![0u8; cmd_len];
reader.read_exact(&mut cmd_buf).map_err(GitError::Io)?;
// Decode command
match Command::decode(&cmd_buf) {
Some(command) => {
let expected_checksum = LogEntry::compute_checksum(term, index, &command);
if checksum != expected_checksum {
tracing::warn!(
index,
expected = expected_checksum,
actual = checksum,
"checksum mismatch during recovery, stopping"
);
break;
}
entries.push(LogEntry { term, index, command, checksum });
}
None => {
tracing::warn!(index, "failed to decode command during recovery, stopping");
break;
}
}
}
tracing::info!(count = entries.len(), "loaded raft log entries from disk");
Ok(entries)
}
/// Size of the log file in bytes; `0` when its metadata cannot be read
/// (for example because the file does not exist yet).
fn log_file_size(&self) -> u64 {
    match std::fs::metadata(&self.log_path) {
        Ok(meta) => meta.len(),
        Err(_) => 0,
    }
}
/// Replace the log file and the index file with exactly `entries`.
///
/// Both files are written to temporary siblings (`*.dat.tmp`), synced and
/// then renamed over the live files. If the rewrite is interrupted, the
/// previous log is still intact rather than truncated or half-written.
///
/// # Errors
///
/// Returns `GitError::Io` if creating, writing, syncing or renaming either
/// file fails. A failure before the first rename leaves both live files
/// unchanged.
fn truncate_and_rebuild(&self, entries: &[LogEntry]) -> GitResult<()> {
    let log_tmp = self.log_path.with_extension("dat.tmp");
    let index_tmp = self.index_path.with_extension("dat.tmp");
    let mut log_writer =
        BufWriter::new(std::fs::File::create(&log_tmp).map_err(GitError::Io)?);
    let mut index_writer =
        BufWriter::new(std::fs::File::create(&index_tmp).map_err(GitError::Io)?);
    let mut offset = 0u64;
    for entry in entries {
        let entry_bytes = entry.encode();
        let entry_size = entry_bytes.len() as u32;
        let index_entry = IndexEntry {
            index: entry.index,
            file_offset: offset,
            entry_size,
        };
        log_writer.write_all(&entry_bytes).map_err(GitError::Io)?;
        index_writer.write_all(&index_entry.encode()).map_err(GitError::Io)?;
        offset += u64::from(entry_size);
    }
    // Drain the buffers and push the data to the device before the files
    // become visible under their final names.
    log_writer.flush().map_err(GitError::Io)?;
    log_writer.get_ref().sync_all().map_err(GitError::Io)?;
    index_writer.flush().map_err(GitError::Io)?;
    index_writer.get_ref().sync_all().map_err(GitError::Io)?;
    drop(log_writer);
    drop(index_writer);
    std::fs::rename(&log_tmp, &self.log_path).map_err(GitError::Io)?;
    std::fs::rename(&index_tmp, &self.index_path).map_err(GitError::Io)?;
    Ok(())
}
}
// ── RaftLog ──────────────────────────────────────────────────
/// Raft log with in-memory storage and disk persistence.
///
/// Only `next_index` is atomic, so indices can be reserved through `&self`;
/// every other mutation goes through `&mut self`.
pub struct RaftLog {
/// Entries currently held in memory; lookups assume consecutive indices
/// starting at `entries[0].index`.
entries: Vec<LogEntry>,
/// Commit index; starts at the snapshot index when the log is opened.
commit_index: u64,
/// Index of the last entry marked as applied.
last_applied: u64,
/// Atomic counter from which `reserve_index` hands out indices (`fetch_add`).
next_index: AtomicU64,
/// Disk persistence for the log and index files.
storage: RaftStorage,
/// Snapshot storage for log compaction.
snapshot_storage: SnapshotStorage,
/// Last snapshot index (entries before this are discarded).
snapshot_index: u64,
/// Last snapshot term.
snapshot_term: u64,
/// Path for snapshot data.
data_dir: PathBuf,
}
impl RaftLog {
/// Opens the Raft log stored in `data_dir`, creating the directory if needed.
///
/// Loads the snapshot metadata (if a snapshot exists) and then every
/// readable entry from the log file. `commit_index` starts at the snapshot
/// index; `next_index` and `last_applied` are derived from the last loaded
/// entry, falling back to the snapshot index when the log is empty.
///
/// NOTE(review): with a non-empty log this leaves `last_applied` ahead of
/// `commit_index` after a restart — confirm that is intended.
///
/// # Errors
///
/// Propagates failures from directory creation, snapshot loading and log loading.
pub fn new(data_dir: &Path) -> GitResult<Self> {
    std::fs::create_dir_all(data_dir).map_err(GitError::Io)?;
    let storage = RaftStorage::new(data_dir);
    let snapshot_storage = SnapshotStorage::new(data_dir);

    // Snapshot metadata defaults to (0, 0) when no snapshot has been taken.
    let mut snapshot_index = 0;
    let mut snapshot_term = 0;
    if let Some(snapshot) = snapshot_storage.load()? {
        tracing::info!(
            index = snapshot.last_included_index,
            term = snapshot.last_included_term,
            "loaded existing raft snapshot"
        );
        snapshot_index = snapshot.last_included_index;
        snapshot_term = snapshot.last_included_term;
    }

    let entries = storage.load_all()?;
    let (next_index, last_applied) = match entries.last() {
        Some(tail) => (tail.index + 1, tail.index),
        None => (snapshot_index + 1, snapshot_index),
    };

    Ok(Self {
        entries,
        commit_index: snapshot_index,
        last_applied,
        next_index: AtomicU64::new(next_index),
        storage,
        snapshot_storage,
        snapshot_index,
        snapshot_term,
        data_dir: data_dir.to_path_buf(),
    })
}
/// Get the last log index.
///
/// When no entries are held in memory this is the snapshot index (which is
/// `0` if no snapshot exists), so the value stays consistent with
/// `next_index` after the log has been replaced by a snapshot.
pub fn last_index(&self) -> u64 {
    self.entries.last().map_or(self.snapshot_index, |e| e.index)
}
/// Get the last log term.
///
/// When no entries are held in memory this is the snapshot term (which is
/// `0` if no snapshot exists), so a log that was replaced by a snapshot
/// does not report itself as being at term 0.
pub fn last_term(&self) -> u64 {
    self.entries.last().map_or(self.snapshot_term, |e| e.term)
}
/// Get the current commit index.
///
/// Starts at the snapshot index and is raised by `advance_commit_index`
/// or reset by `restore_snapshot`.
pub fn commit_index(&self) -> u64 {
self.commit_index
}
/// Get the last applied index.
///
/// Raised by `advance_last_applied` and reset by `restore_snapshot`.
pub fn last_applied(&self) -> u64 {
self.last_applied
}
/// Get the next index for a new entry.
///
/// This is a sequentially consistent load of the atomic counter; it does
/// not reserve the index.
pub fn next_index(&self) -> u64 {
self.next_index.load(Ordering::SeqCst)
}
/// Atomically reserves the next log index for concurrent append.
///
/// Implemented as a `fetch_add(1)` on the counter (not a compare-and-swap
/// loop): returns the counter's value before the increment, so each caller
/// receives a distinct index.
/// Returns the reserved index. The caller must then call `append_reserved`.
pub fn reserve_index(&self) -> u64 {
self.next_index.fetch_add(1, Ordering::SeqCst)
}
/// Append a pre-reserved entry to the log.
/// The entry must have been assigned an index via `reserve_index`.
///
/// The entry is written to disk first and only then added to the in-memory
/// log, so a failed write leaves the in-memory log unchanged.
///
/// # Errors
///
/// Propagates the storage error when persisting the entry fails.
pub fn append_reserved(&mut self, entry: LogEntry) -> GitResult<u64> {
    // Persist to disk
    self.storage.append(&entry)?;
    // Take the index first so the entry can be moved into the log
    // instead of cloned.
    let index = entry.index;
    self.entries.push(entry);
    Ok(index)
}
/// Convenience for the non-concurrent case: reserves the next index, wraps
/// `command` in a new entry for `term` and appends it.
///
/// If persisting fails, the error is returned and the reserved index is
/// not handed back to the counter.
pub fn append(&mut self, term: u64, command: Command) -> GitResult<u64> {
    let entry = LogEntry::new(term, self.reserve_index(), command);
    self.append_reserved(entry)
}
/// Looks up the entry stored at log index `index`.
///
/// The position is computed relative to the first in-memory entry, so it
/// relies on the entries having consecutive indices. Returns `None` when
/// the log is empty or `index` lies outside the held range.
pub fn get(&self, index: u64) -> Option<&LogEntry> {
    let first_index = self.entries.first()?.index;
    // `checked_sub` is `None` exactly when `index` precedes the first entry.
    let offset = index.checked_sub(first_index)?;
    self.entries.get(offset as usize)
}
/// Get entries in range [from_index, to_index).
///
/// Both bounds are clamped to the entries held in memory. An empty log, an
/// empty or inverted range, or a range lying entirely outside the log all
/// yield an empty vector instead of panicking.
pub fn get_range(&self, from_index: u64, to_index: u64) -> Vec<&LogEntry> {
    let first_index = match self.entries.first() {
        Some(entry) => entry.index,
        None => return Vec::new(),
    };
    let len = self.entries.len();
    // Translate a log index into a position within `entries`, clamped to
    // `0..=len` so the slice below can never go out of bounds.
    let clamp = |index: u64| index.saturating_sub(first_index).min(len as u64) as usize;
    let start = clamp(from_index);
    let end = clamp(to_index);
    if start >= end {
        return Vec::new();
    }
    self.entries[start..end].iter().collect()
}
/// Get the term of the entry at the given index.
///
/// If the entry is not held in memory and `index` equals the snapshot
/// index, the snapshot term is returned (the entry was compacted away but
/// its term is still known). Any other missing index yields `0`.
pub fn term_at(&self, index: u64) -> u64 {
    match self.get(index) {
        Some(entry) => entry.term,
        None if index == self.snapshot_index => self.snapshot_term,
        None => 0,
    }
}
/// Raises `commit_index` to `new_commit_index`.
///
/// Values that are not strictly greater than the current commit index are
/// ignored, so the commit index never moves backwards here.
pub fn advance_commit_index(&mut self, new_commit_index: u64) {
    if new_commit_index <= self.commit_index {
        return;
    }
    self.commit_index = new_commit_index;
    tracing::debug!(commit_index = new_commit_index, "commit index advanced");
}
/// Records that entries up to `new_last_applied` have been applied.
///
/// The stored value only ever grows; smaller or equal values are ignored.
pub fn advance_last_applied(&mut self, new_last_applied: u64) {
    self.last_applied = self.last_applied.max(new_last_applied);
}
/// Get committed entries that haven't been applied yet.
///
/// This is the range `last_applied + 1 ..= commit_index`, expressed as the
/// half-open range that `get_range` expects.
pub fn unapplied_entries(&self) -> Vec<&LogEntry> {
self.get_range(self.last_applied + 1, self.commit_index + 1)
}
/// Check if the log needs compaction.
///
/// True once the on-disk log file is at least `LOG_COMPACT_THRESHOLD_BYTES` long.
pub fn needs_compaction(&self) -> bool {
self.storage.log_file_size() >= LOG_COMPACT_THRESHOLD_BYTES
}
/// Compact the log, keeping entries from `from_index` onwards.
///
/// Does nothing when the log is empty, when `from_index` does not lie
/// beyond the first held entry, or when no entry would remain. Otherwise
/// the on-disk files are rewritten first and the in-memory log is replaced
/// only after that succeeds.
///
/// # Errors
///
/// Propagates the storage error when rewriting the files fails.
pub fn compact(&mut self, from_index: u64) -> GitResult<()> {
    // An empty log has nothing to compact; indexing `entries[0]` here
    // would panic.
    let first_index = match self.entries.first() {
        Some(entry) => entry.index,
        None => return Ok(()),
    };
    if from_index <= first_index {
        return Ok(()); // Nothing to compact
    }
    let keep: Vec<LogEntry> = self.entries.iter()
        .filter(|e| e.index >= from_index)
        .cloned()
        .collect();
    if keep.is_empty() {
        return Ok(());
    }
    self.storage.truncate_and_rebuild(&keep)?;
    self.entries = keep;
    tracing::info!(
        from_index,
        kept = self.entries.len(),
        "raft log compacted"
    );
    Ok(())
}
/// Truncate the log, removing entries from `from_index` onwards (inclusive).
/// This is used during AppendEntries to resolve log conflicts per the Raft protocol:
/// when a follower detects a term mismatch, it must delete the conflicting entry
/// and all entries that follow.
///
/// Does nothing when no held entry has an index `>= from_index`. Otherwise
/// the on-disk files are rewritten first, then the in-memory log is
/// replaced and `next_index` is reset to follow the last remaining entry.
/// If no entry remains, `next_index` becomes `snapshot_index + 1`, matching
/// how it is initialized when the log is opened.
///
/// # Errors
///
/// Propagates the storage error when rewriting the files fails; the
/// in-memory log is left unchanged in that case.
pub fn truncate_from(&mut self, from_index: u64) -> GitResult<()> {
    let keep: Vec<LogEntry> = self.entries.iter()
        .filter(|e| e.index < from_index)
        .cloned()
        .collect();
    let removed = self.entries.len() - keep.len();
    if removed == 0 {
        return Ok(());
    }
    self.storage.truncate_and_rebuild(&keep)?;
    self.entries = keep;
    // Indices up to `snapshot_index` are covered by the snapshot, so an
    // emptied log must continue after it rather than restart at 1.
    let new_next = self
        .entries
        .last()
        .map_or(self.snapshot_index + 1, |e| e.index + 1);
    self.next_index.store(new_next, Ordering::SeqCst);
    tracing::info!(
        from_index,
        removed,
        remaining = self.entries.len(),
        "raft log truncated"
    );
    Ok(())
}
/// Get the data directory path (for snapshot storage).
///
/// This is the directory that was passed to `RaftLog::new`.
pub fn data_dir(&self) -> &Path {
&self.data_dir
}
/// Get the number of entries currently held in memory.
///
/// Entries removed by compaction or truncation are not counted.
pub fn len(&self) -> usize {
self.entries.len()
}
/// Check if the log holds no entries in memory.
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
/// Create a snapshot of the current state and compact the log.
///
/// The snapshot covers everything up to `last_applied`. It is saved
/// through the snapshot storage first; the snapshot metadata is then
/// updated and entries up to the snapshot index are compacted away.
///
/// # Errors
///
/// Propagates failures from saving the snapshot or compacting the log.
pub fn create_snapshot(&mut self, repos: HashMap<String, RepoEntry>) -> GitResult<()> {
    // If `last_applied` is already the snapshot point, its entry may have
    // been compacted away and a log lookup would report term 0. The term
    // recorded with the previous snapshot is the correct one in that case.
    let last_included_term = if self.last_applied == self.snapshot_index {
        self.snapshot_term
    } else {
        self.term_at(self.last_applied)
    };
    let snapshot = RaftSnapshot::new(
        self.last_applied,
        last_included_term,
        repos,
    );
    self.snapshot_storage.save(&snapshot)?;
    self.snapshot_index = snapshot.last_included_index;
    self.snapshot_term = snapshot.last_included_term;
    // Compact the log: remove entries before the snapshot
    self.compact(self.snapshot_index + 1)?;
    tracing::info!(
        snapshot_index = self.snapshot_index,
        snapshot_term = self.snapshot_term,
        remaining_entries = self.entries.len(),
        "raft snapshot created and log compacted"
    );
    Ok(())
}
/// Restore state from a snapshot.
///
/// Discards every log entry (on disk and in memory), moves the snapshot,
/// commit and applied indices to the snapshot's last included index, and
/// returns the repositories carried by the snapshot.
///
/// The on-disk log is emptied first. If that fails, the error is returned
/// and the in-memory state is left untouched, so memory and disk cannot
/// diverge.
///
/// NOTE(review): the snapshot itself is not written through
/// `snapshot_storage` here — presumably the caller persists it; confirm.
///
/// # Errors
///
/// Propagates the storage error when emptying the log files fails.
pub fn restore_snapshot(&mut self, snapshot: RaftSnapshot) -> GitResult<HashMap<String, RepoEntry>> {
    // Clear all entries (they're covered by the snapshot). Disk goes first
    // because it is the only step that can fail.
    self.storage.truncate_and_rebuild(&[])?;
    self.entries.clear();
    self.snapshot_index = snapshot.last_included_index;
    self.snapshot_term = snapshot.last_included_term;
    self.commit_index = snapshot.last_included_index;
    self.last_applied = snapshot.last_included_index;
    self.next_index.store(snapshot.last_included_index + 1, Ordering::SeqCst);
    tracing::info!(
        snapshot_index = self.snapshot_index,
        snapshot_term = self.snapshot_term,
        "raft state restored from snapshot"
    );
    Ok(snapshot.repos)
}
/// Get the snapshot index.
///
/// `0` until a snapshot has been loaded, created or restored.
pub fn snapshot_index(&self) -> u64 {
self.snapshot_index
}
/// Get the snapshot term.
///
/// `0` until a snapshot has been loaded, created or restored.
pub fn snapshot_term(&self) -> u64 {
self.snapshot_term
}
}