bcd750b905
Add truncate_from() method to RaftLog for removing entries from a given index onwards, as required by the Raft protocol when a follower detects a term mismatch during AppendEntries.
734 lines
24 KiB
Rust
734 lines
24 KiB
Rust
//! Raft log storage with disk persistence and atomic (fetch-add) index reservation for appends.
|
|
//!
|
|
//! The log is stored in memory for fast access and persisted to disk for crash recovery.
|
|
//! Each entry has a CRC32 checksum for integrity verification.
|
|
//!
|
|
//! Storage format:
|
|
//! - `raft-log.dat`: Append-only log file containing serialized entries
|
|
//! - `raft-index.dat`: Index file mapping entry index to file offset (for random access)
|
|
|
|
use std::io::{BufReader, BufWriter, Read, Write};
|
|
use std::path::{Path, PathBuf};
|
|
use std::sync::atomic::{AtomicU64, Ordering};
|
|
|
|
use ractor_cluster::BytesConvertable;
|
|
|
|
use crate::error::{GitError, GitResult};
|
|
use crate::actor::snapshot::{RaftSnapshot, SnapshotStorage};
|
|
use crate::actor::handler::RepoEntry;
|
|
use std::collections::HashMap;
|
|
|
|
/// Raft protocol version number, kept for forward/backward compatibility.
pub const RAFT_PROTOCOL_VERSION: u32 = 1;

/// Largest command payload accepted when reading entries back from disk (1 MiB).
const MAX_ENTRY_SIZE: usize = 1 << 20;

/// Log file size at which the log is reported as needing compaction (100 MiB).
pub const LOG_COMPACT_THRESHOLD_BYTES: u64 = 100 << 20;
|
|
|
|
// ── Command ──────────────────────────────────────────────────
|
|
|
|
/// A state-machine command carried by a Raft log entry.
///
/// `Command::encode` serializes the variants with tags 0 to 3 in declaration order.
#[derive(Debug, Clone)]
pub enum Command {
    /// A ref change inside one repository (presumably moves `ref_name` from
    /// `old_oid` to `new_oid`; confirm against the state machine).
    RefUpdate {
        /// Repository path the ref belongs to.
        relative_path: String,
        /// Name of the ref.
        ref_name: String,
        /// Object id before the update, as a string.
        old_oid: String,
        /// Object id after the update, as a string.
        new_oid: String,
    },
    /// Associates a repository path with a storage name.
    RegisterRepo {
        /// Repository path.
        relative_path: String,
        /// Storage name paired with the repository.
        storage_name: String,
    },
    /// Refers to a single repository path (presumably the repository to drop).
    RemoveRepo {
        /// Repository path.
        relative_path: String,
    },
    /// Names a storage together with a list of repository paths (presumably the
    /// repositories for which that storage becomes primary).
    SetPrimary {
        /// Storage name.
        storage_name: String,
        /// Repository paths affected.
        relative_paths: Vec<String>,
    },
}
|
|
|
|
impl Command {
|
|
/// Serialize command to bytes.
|
|
pub fn encode(&self) -> Vec<u8> {
|
|
let mut buf = Vec::new();
|
|
match self {
|
|
Command::RefUpdate { relative_path, ref_name, old_oid, new_oid } => {
|
|
buf.push(0); // tag
|
|
encode_strings(&mut buf, &[relative_path, ref_name, old_oid, new_oid]);
|
|
}
|
|
Command::RegisterRepo { relative_path, storage_name } => {
|
|
buf.push(1);
|
|
encode_strings(&mut buf, &[relative_path, storage_name]);
|
|
}
|
|
Command::RemoveRepo { relative_path } => {
|
|
buf.push(2);
|
|
encode_strings(&mut buf, &[relative_path]);
|
|
}
|
|
Command::SetPrimary { storage_name, relative_paths } => {
|
|
buf.push(3);
|
|
encode_string(&mut buf, storage_name);
|
|
buf.extend((relative_paths.len() as u32).to_be_bytes());
|
|
for p in relative_paths {
|
|
encode_string(&mut buf, p);
|
|
}
|
|
}
|
|
}
|
|
buf
|
|
}
|
|
|
|
/// Deserialize command from bytes.
|
|
pub fn decode(data: &[u8]) -> Option<Self> {
|
|
if data.is_empty() {
|
|
return None;
|
|
}
|
|
let tag = data[0];
|
|
let mut offset = 1;
|
|
match tag {
|
|
0 => {
|
|
let (rp, o1) = decode_string(data, offset)?;
|
|
offset = o1;
|
|
let (rn, o2) = decode_string(data, offset)?;
|
|
offset = o2;
|
|
let (oo, o3) = decode_string(data, offset)?;
|
|
offset = o3;
|
|
let (no, _) = decode_string(data, offset)?;
|
|
Some(Command::RefUpdate {
|
|
relative_path: rp,
|
|
ref_name: rn,
|
|
old_oid: oo,
|
|
new_oid: no,
|
|
})
|
|
}
|
|
1 => {
|
|
let (rp, o1) = decode_string(data, offset)?;
|
|
offset = o1;
|
|
let (sn, _) = decode_string(data, offset)?;
|
|
Some(Command::RegisterRepo {
|
|
relative_path: rp,
|
|
storage_name: sn,
|
|
})
|
|
}
|
|
2 => {
|
|
let (rp, _) = decode_string(data, offset)?;
|
|
Some(Command::RemoveRepo { relative_path: rp })
|
|
}
|
|
3 => {
|
|
let (sn, o1) = decode_string(data, offset)?;
|
|
offset = o1;
|
|
if offset + 4 > data.len() {
|
|
return None;
|
|
}
|
|
let count = u32::from_be_bytes(data[offset..offset + 4].try_into().ok()?) as usize;
|
|
offset += 4;
|
|
let mut paths = Vec::with_capacity(count);
|
|
for _ in 0..count {
|
|
let (p, o) = decode_string(data, offset)?;
|
|
offset = o;
|
|
paths.push(p);
|
|
}
|
|
Some(Command::SetPrimary {
|
|
storage_name: sn,
|
|
relative_paths: paths,
|
|
})
|
|
}
|
|
_ => None,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl BytesConvertable for Command {
|
|
fn into_bytes(self) -> Vec<u8> {
|
|
self.encode()
|
|
}
|
|
|
|
fn from_bytes(bytes: Vec<u8>) -> Self {
|
|
Self::decode(&bytes).unwrap_or(Command::RemoveRepo {
|
|
relative_path: String::new(),
|
|
})
|
|
}
|
|
}
|
|
|
|
/// Append `s` to `buf` as a 4-byte big-endian byte length followed by its UTF-8 bytes.
fn encode_string(buf: &mut Vec<u8>, s: &str) {
    let len_prefix = (s.len() as u32).to_be_bytes();
    buf.extend_from_slice(&len_prefix);
    buf.extend_from_slice(s.as_bytes());
}
|
|
|
|
fn encode_strings(buf: &mut Vec<u8>, strings: &[&str]) {
|
|
buf.extend((strings.len() as u32).to_be_bytes());
|
|
for s in strings {
|
|
encode_string(buf, s);
|
|
}
|
|
}
|
|
|
|
/// Read one length-prefixed string from `data` starting at `offset`.
///
/// The string is stored as a 4-byte big-endian byte length followed by that many
/// bytes. Returns the decoded string and the offset just past it, or `None` when
/// the length prefix or the payload does not fit inside `data`.
///
/// All offset arithmetic is checked, so an out-of-range `offset` or a length
/// that would overflow `usize` yields `None` instead of panicking or wrapping.
///
/// Invalid UTF-8 is replaced lossily rather than rejected (unchanged behavior).
fn decode_string(data: &[u8], offset: usize) -> Option<(String, usize)> {
    let start = offset.checked_add(4)?;
    let len_bytes: [u8; 4] = data.get(offset..start)?.try_into().ok()?;
    let len = u32::from_be_bytes(len_bytes) as usize;

    let end = start.checked_add(len)?;
    let payload = data.get(start..end)?;

    Some((String::from_utf8_lossy(payload).into_owned(), end))
}
|
|
|
|
// ── LogEntry ─────────────────────────────────────────────────
|
|
|
|
/// A single Raft log entry.
|
|
#[derive(Debug, Clone)]
|
|
pub struct LogEntry {
|
|
pub term: u64,
|
|
pub index: u64,
|
|
pub command: Command,
|
|
pub checksum: u32,
|
|
}
|
|
|
|
impl LogEntry {
|
|
pub fn new(term: u64, index: u64, command: Command) -> Self {
|
|
let checksum = Self::compute_checksum(term, index, &command);
|
|
Self { term, index, command, checksum }
|
|
}
|
|
|
|
fn compute_checksum(term: u64, index: u64, command: &Command) -> u32 {
|
|
let mut hasher = crc32fast::Hasher::new();
|
|
hasher.update(&term.to_be_bytes());
|
|
hasher.update(&index.to_be_bytes());
|
|
hasher.update(&command.encode());
|
|
hasher.finalize()
|
|
}
|
|
|
|
/// Serialize entry to bytes (for disk storage).
|
|
pub fn encode(&self) -> Vec<u8> {
|
|
let cmd_bytes = self.command.encode();
|
|
let total_len = 8 + 8 + 4 + 4 + cmd_bytes.len(); // term + index + checksum + cmd_len + cmd
|
|
let mut buf = Vec::with_capacity(total_len);
|
|
buf.extend(self.term.to_be_bytes());
|
|
buf.extend(self.index.to_be_bytes());
|
|
buf.extend(self.checksum.to_be_bytes());
|
|
buf.extend((cmd_bytes.len() as u32).to_be_bytes());
|
|
buf.extend(&cmd_bytes);
|
|
buf
|
|
}
|
|
|
|
/// Deserialize entry from bytes.
|
|
pub fn decode(data: &[u8]) -> Option<Self> {
|
|
if data.len() < 24 {
|
|
return None;
|
|
}
|
|
let term = u64::from_be_bytes(data[0..8].try_into().ok()?);
|
|
let index = u64::from_be_bytes(data[8..16].try_into().ok()?);
|
|
let checksum = u32::from_be_bytes(data[16..20].try_into().ok()?);
|
|
let cmd_len = u32::from_be_bytes(data[20..24].try_into().ok()?) as usize;
|
|
if 24 + cmd_len > data.len() {
|
|
return None;
|
|
}
|
|
let command = Command::decode(&data[24..24 + cmd_len])?;
|
|
|
|
// Verify checksum
|
|
let expected = Self::compute_checksum(term, index, &command);
|
|
if checksum != expected {
|
|
tracing::warn!(
|
|
term,
|
|
index,
|
|
expected,
|
|
actual = checksum,
|
|
"log entry checksum mismatch"
|
|
);
|
|
return None;
|
|
}
|
|
|
|
Some(LogEntry { term, index, command, checksum })
|
|
}
|
|
}
|
|
|
|
// ── IndexEntry ───────────────────────────────────────────────
|
|
|
|
/// Index-file record locating one log entry inside the log file.
#[derive(Debug, Clone, Copy)]
struct IndexEntry {
    /// Log index of the entry.
    index: u64,
    /// Byte offset of the entry within the log file.
    file_offset: u64,
    /// Encoded size of the entry in bytes.
    entry_size: u32,
}

impl IndexEntry {
    /// Encoded record size: index (8) + file offset (8) + entry size (4).
    const SIZE: usize = 20;

    /// Serialize the record as three consecutive big-endian integers.
    fn encode(&self) -> [u8; Self::SIZE] {
        let mut out = [0u8; Self::SIZE];
        let (index_part, tail) = out.split_at_mut(8);
        let (offset_part, size_part) = tail.split_at_mut(8);
        index_part.copy_from_slice(&self.index.to_be_bytes());
        offset_part.copy_from_slice(&self.file_offset.to_be_bytes());
        size_part.copy_from_slice(&self.entry_size.to_be_bytes());
        out
    }

    /// Inverse of `encode`.
    #[allow(dead_code)]
    fn decode(data: &[u8; Self::SIZE]) -> Self {
        let mut index = [0u8; 8];
        let mut file_offset = [0u8; 8];
        let mut entry_size = [0u8; 4];
        index.copy_from_slice(&data[..8]);
        file_offset.copy_from_slice(&data[8..16]);
        entry_size.copy_from_slice(&data[16..]);

        Self {
            index: u64::from_be_bytes(index),
            file_offset: u64::from_be_bytes(file_offset),
            entry_size: u32::from_be_bytes(entry_size),
        }
    }
}
|
|
|
|
// ── RaftStorage ──────────────────────────────────────────────
|
|
|
|
/// On-disk side of the Raft log: the paths of the log file and of its index file.
struct RaftStorage {
    /// Path of the log file holding the encoded entries.
    log_path: PathBuf,
    /// Path of the index file holding one `IndexEntry` record per entry.
    index_path: PathBuf,
}
|
|
|
|
impl RaftStorage {
|
|
fn new(data_dir: &Path) -> Self {
|
|
Self {
|
|
log_path: data_dir.join("raft-log.dat"),
|
|
index_path: data_dir.join("raft-index.dat"),
|
|
}
|
|
}
|
|
|
|
/// Append an entry to the log file and update the index.
|
|
fn append(&self, entry: &LogEntry) -> GitResult<u64> {
|
|
let entry_bytes = entry.encode();
|
|
let entry_size = entry_bytes.len() as u32;
|
|
|
|
// Append to log file
|
|
let mut log_file = std::fs::OpenOptions::new()
|
|
.create(true)
|
|
.append(true)
|
|
.open(&self.log_path)
|
|
.map_err(GitError::Io)?;
|
|
let file_offset = log_file.metadata().map_err(GitError::Io)?.len();
|
|
log_file.write_all(&entry_bytes).map_err(GitError::Io)?;
|
|
log_file.flush().map_err(GitError::Io)?;
|
|
|
|
// Append to index file
|
|
let index_entry = IndexEntry {
|
|
index: entry.index,
|
|
file_offset,
|
|
entry_size,
|
|
};
|
|
let mut index_file = std::fs::OpenOptions::new()
|
|
.create(true)
|
|
.append(true)
|
|
.open(&self.index_path)
|
|
.map_err(GitError::Io)?;
|
|
index_file.write_all(&index_entry.encode()).map_err(GitError::Io)?;
|
|
index_file.flush().map_err(GitError::Io)?;
|
|
|
|
Ok(entry.index)
|
|
}
|
|
|
|
/// Load all entries from disk.
|
|
fn load_all(&self) -> GitResult<Vec<LogEntry>> {
|
|
if !self.log_path.exists() {
|
|
return Ok(Vec::new());
|
|
}
|
|
|
|
let log_file = std::fs::File::open(&self.log_path).map_err(GitError::Io)?;
|
|
let mut reader = BufReader::new(log_file);
|
|
let mut entries = Vec::new();
|
|
|
|
loop {
|
|
// Read term (8 bytes)
|
|
let mut term_buf = [0u8; 8];
|
|
match reader.read_exact(&mut term_buf) {
|
|
Ok(()) => {}
|
|
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
|
|
Err(e) => return Err(GitError::Io(e)),
|
|
}
|
|
let term = u64::from_be_bytes(term_buf);
|
|
|
|
// Read index (8 bytes)
|
|
let mut index_buf = [0u8; 8];
|
|
reader.read_exact(&mut index_buf).map_err(GitError::Io)?;
|
|
let index = u64::from_be_bytes(index_buf);
|
|
|
|
// Read checksum (4 bytes)
|
|
let mut checksum_buf = [0u8; 4];
|
|
reader.read_exact(&mut checksum_buf).map_err(GitError::Io)?;
|
|
let checksum = u32::from_be_bytes(checksum_buf);
|
|
|
|
// Read command length (4 bytes)
|
|
let mut cmd_len_buf = [0u8; 4];
|
|
reader.read_exact(&mut cmd_len_buf).map_err(GitError::Io)?;
|
|
let cmd_len = u32::from_be_bytes(cmd_len_buf) as usize;
|
|
|
|
if cmd_len > MAX_ENTRY_SIZE {
|
|
tracing::warn!(index, cmd_len, "entry too large, stopping recovery");
|
|
break;
|
|
}
|
|
|
|
// Read command bytes
|
|
let mut cmd_buf = vec![0u8; cmd_len];
|
|
reader.read_exact(&mut cmd_buf).map_err(GitError::Io)?;
|
|
|
|
// Decode command
|
|
match Command::decode(&cmd_buf) {
|
|
Some(command) => {
|
|
let expected_checksum = LogEntry::compute_checksum(term, index, &command);
|
|
if checksum != expected_checksum {
|
|
tracing::warn!(
|
|
index,
|
|
expected = expected_checksum,
|
|
actual = checksum,
|
|
"checksum mismatch during recovery, stopping"
|
|
);
|
|
break;
|
|
}
|
|
entries.push(LogEntry { term, index, command, checksum });
|
|
}
|
|
None => {
|
|
tracing::warn!(index, "failed to decode command during recovery, stopping");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
tracing::info!(count = entries.len(), "loaded raft log entries from disk");
|
|
Ok(entries)
|
|
}
|
|
|
|
/// Get the total size of the log file.
|
|
fn log_file_size(&self) -> u64 {
|
|
std::fs::metadata(&self.log_path)
|
|
.map(|m| m.len())
|
|
.unwrap_or(0)
|
|
}
|
|
|
|
/// Truncate the log file and rebuild the index.
|
|
fn truncate_and_rebuild(&self, entries: &[LogEntry]) -> GitResult<()> {
|
|
// Write new log file
|
|
let log_file = std::fs::File::create(&self.log_path).map_err(GitError::Io)?;
|
|
let mut writer = BufWriter::new(log_file);
|
|
let mut index_entries = Vec::with_capacity(entries.len());
|
|
let mut offset = 0u64;
|
|
|
|
for entry in entries {
|
|
let entry_bytes = entry.encode();
|
|
let entry_size = entry_bytes.len() as u32;
|
|
index_entries.push(IndexEntry {
|
|
index: entry.index,
|
|
file_offset: offset,
|
|
entry_size,
|
|
});
|
|
writer.write_all(&entry_bytes).map_err(GitError::Io)?;
|
|
offset += entry_size as u64;
|
|
}
|
|
writer.flush().map_err(GitError::Io)?;
|
|
|
|
// Write new index file
|
|
let index_file = std::fs::File::create(&self.index_path).map_err(GitError::Io)?;
|
|
let mut writer = BufWriter::new(index_file);
|
|
for ie in &index_entries {
|
|
writer.write_all(&ie.encode()).map_err(GitError::Io)?;
|
|
}
|
|
writer.flush().map_err(GitError::Io)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
// ── RaftLog ──────────────────────────────────────────────────
|
|
|
|
/// Thread-safe Raft log with in-memory storage and disk persistence.
|
|
pub struct RaftLog {
|
|
entries: Vec<LogEntry>,
|
|
commit_index: u64,
|
|
last_applied: u64,
|
|
/// Atomic next index for CAS-based concurrent append.
|
|
next_index: AtomicU64,
|
|
storage: RaftStorage,
|
|
/// Snapshot storage for log compaction.
|
|
snapshot_storage: SnapshotStorage,
|
|
/// Last snapshot index (entries before this are discarded).
|
|
snapshot_index: u64,
|
|
/// Last snapshot term.
|
|
snapshot_term: u64,
|
|
/// Path for snapshot data.
|
|
data_dir: PathBuf,
|
|
}
|
|
|
|
impl RaftLog {
|
|
/// Create a new RaftLog, loading existing entries from disk.
|
|
pub fn new(data_dir: &Path) -> GitResult<Self> {
|
|
std::fs::create_dir_all(data_dir).map_err(GitError::Io)?;
|
|
let storage = RaftStorage::new(data_dir);
|
|
let snapshot_storage = SnapshotStorage::new(data_dir);
|
|
|
|
// Load snapshot if exists
|
|
let (snapshot_index, snapshot_term) = match snapshot_storage.load()? {
|
|
Some(snapshot) => {
|
|
tracing::info!(
|
|
index = snapshot.last_included_index,
|
|
term = snapshot.last_included_term,
|
|
"loaded existing raft snapshot"
|
|
);
|
|
(snapshot.last_included_index, snapshot.last_included_term)
|
|
}
|
|
None => (0, 0),
|
|
};
|
|
|
|
let entries = storage.load_all()?;
|
|
|
|
let next_index = entries.last().map(|e| e.index + 1).unwrap_or(snapshot_index + 1);
|
|
let last_applied = entries.last().map(|e| e.index).unwrap_or(snapshot_index);
|
|
|
|
Ok(Self {
|
|
entries,
|
|
commit_index: snapshot_index,
|
|
last_applied,
|
|
next_index: AtomicU64::new(next_index),
|
|
storage,
|
|
snapshot_storage,
|
|
snapshot_index,
|
|
snapshot_term,
|
|
data_dir: data_dir.to_path_buf(),
|
|
})
|
|
}
|
|
|
|
/// Get the last log index (0 if empty).
|
|
pub fn last_index(&self) -> u64 {
|
|
self.entries.last().map(|e| e.index).unwrap_or(0)
|
|
}
|
|
|
|
/// Get the last log term (0 if empty).
|
|
pub fn last_term(&self) -> u64 {
|
|
self.entries.last().map(|e| e.term).unwrap_or(0)
|
|
}
|
|
|
|
/// Get the current commit index.
|
|
pub fn commit_index(&self) -> u64 {
|
|
self.commit_index
|
|
}
|
|
|
|
/// Get the last applied index.
|
|
pub fn last_applied(&self) -> u64 {
|
|
self.last_applied
|
|
}
|
|
|
|
/// Get the next index for a new entry.
|
|
pub fn next_index(&self) -> u64 {
|
|
self.next_index.load(Ordering::SeqCst)
|
|
}
|
|
|
|
/// CAS-based index reservation for concurrent append.
|
|
/// Returns the reserved index. The caller must then call `append_reserved`.
|
|
pub fn reserve_index(&self) -> u64 {
|
|
self.next_index.fetch_add(1, Ordering::SeqCst)
|
|
}
|
|
|
|
/// Append a pre-reserved entry to the log.
|
|
/// The entry must have been assigned an index via `reserve_index`.
|
|
pub fn append_reserved(&mut self, entry: LogEntry) -> GitResult<u64> {
|
|
// Persist to disk
|
|
self.storage.append(&entry)?;
|
|
// Add to memory
|
|
self.entries.push(entry.clone());
|
|
Ok(entry.index)
|
|
}
|
|
|
|
/// Convenience: create and append an entry in one call (non-concurrent).
|
|
pub fn append(&mut self, term: u64, command: Command) -> GitResult<u64> {
|
|
let index = self.reserve_index();
|
|
let entry = LogEntry::new(term, index, command);
|
|
self.append_reserved(entry)
|
|
}
|
|
|
|
/// Get an entry by index.
|
|
pub fn get(&self, index: u64) -> Option<&LogEntry> {
|
|
if self.entries.is_empty() {
|
|
return None;
|
|
}
|
|
let first_index = self.entries[0].index;
|
|
if index < first_index {
|
|
return None;
|
|
}
|
|
let offset = (index - first_index) as usize;
|
|
self.entries.get(offset)
|
|
}
|
|
|
|
/// Get entries in range [from_index, to_index).
|
|
pub fn get_range(&self, from_index: u64, to_index: u64) -> Vec<&LogEntry> {
|
|
if self.entries.is_empty() {
|
|
return Vec::new();
|
|
}
|
|
let first_index = self.entries[0].index;
|
|
let start = if from_index < first_index {
|
|
0
|
|
} else {
|
|
(from_index - first_index) as usize
|
|
};
|
|
let end = if to_index < first_index {
|
|
0
|
|
} else {
|
|
((to_index - first_index) as usize).min(self.entries.len())
|
|
};
|
|
self.entries[start..end].iter().collect()
|
|
}
|
|
|
|
/// Get the term of the entry at the given index.
|
|
pub fn term_at(&self, index: u64) -> u64 {
|
|
self.get(index).map(|e| e.term).unwrap_or(0)
|
|
}
|
|
|
|
/// Advance commit_index to the given value.
|
|
pub fn advance_commit_index(&mut self, new_commit_index: u64) {
|
|
if new_commit_index > self.commit_index {
|
|
self.commit_index = new_commit_index;
|
|
tracing::debug!(commit_index = new_commit_index, "commit index advanced");
|
|
}
|
|
}
|
|
|
|
/// Mark entries up to the given index as applied.
|
|
pub fn advance_last_applied(&mut self, new_last_applied: u64) {
|
|
if new_last_applied > self.last_applied {
|
|
self.last_applied = new_last_applied;
|
|
}
|
|
}
|
|
|
|
/// Get committed entries that haven't been applied yet.
|
|
pub fn unapplied_entries(&self) -> Vec<&LogEntry> {
|
|
self.get_range(self.last_applied + 1, self.commit_index + 1)
|
|
}
|
|
|
|
/// Check if the log needs compaction.
|
|
pub fn needs_compaction(&self) -> bool {
|
|
self.storage.log_file_size() >= LOG_COMPACT_THRESHOLD_BYTES
|
|
}
|
|
|
|
/// Compact the log, keeping entries from `from_index` onwards.
|
|
pub fn compact(&mut self, from_index: u64) -> GitResult<()> {
|
|
if from_index <= self.entries[0].index {
|
|
return Ok(()); // Nothing to compact
|
|
}
|
|
|
|
let keep: Vec<LogEntry> = self.entries.iter()
|
|
.filter(|e| e.index >= from_index)
|
|
.cloned()
|
|
.collect();
|
|
|
|
if keep.is_empty() {
|
|
return Ok(());
|
|
}
|
|
|
|
self.storage.truncate_and_rebuild(&keep)?;
|
|
self.entries = keep;
|
|
|
|
tracing::info!(
|
|
from_index,
|
|
kept = self.entries.len(),
|
|
"raft log compacted"
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
/// Truncate the log, removing entries from `from_index` onwards (inclusive).
|
|
/// This is used during AppendEntries to resolve log conflicts per the Raft protocol:
|
|
/// when a follower detects a term mismatch, it must delete the conflicting entry
|
|
/// and all entries that follow.
|
|
pub fn truncate_from(&mut self, from_index: u64) -> GitResult<()> {
|
|
let keep: Vec<LogEntry> = self.entries.iter()
|
|
.filter(|e| e.index < from_index)
|
|
.cloned()
|
|
.collect();
|
|
|
|
let removed = self.entries.len() - keep.len();
|
|
if removed == 0 {
|
|
return Ok(());
|
|
}
|
|
|
|
self.storage.truncate_and_rebuild(&keep)?;
|
|
self.entries = keep;
|
|
|
|
let new_next = self.entries.last().map(|e| e.index + 1).unwrap_or(1);
|
|
self.next_index.store(new_next, Ordering::SeqCst);
|
|
|
|
tracing::info!(
|
|
from_index,
|
|
removed,
|
|
remaining = self.entries.len(),
|
|
"raft log truncated"
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
/// Get the data directory path (for snapshot storage).
|
|
pub fn data_dir(&self) -> &Path {
|
|
&self.data_dir
|
|
}
|
|
|
|
/// Get the total number of entries in the log.
|
|
pub fn len(&self) -> usize {
|
|
self.entries.len()
|
|
}
|
|
|
|
/// Check if the log is empty.
|
|
pub fn is_empty(&self) -> bool {
|
|
self.entries.is_empty()
|
|
}
|
|
|
|
/// Create a snapshot of the current state and compact the log.
|
|
pub fn create_snapshot(&mut self, repos: HashMap<String, RepoEntry>) -> GitResult<()> {
|
|
let snapshot = RaftSnapshot::new(
|
|
self.last_applied,
|
|
self.term_at(self.last_applied),
|
|
repos,
|
|
);
|
|
|
|
self.snapshot_storage.save(&snapshot)?;
|
|
self.snapshot_index = snapshot.last_included_index;
|
|
self.snapshot_term = snapshot.last_included_term;
|
|
|
|
// Compact the log: remove entries before the snapshot
|
|
self.compact(self.snapshot_index + 1)?;
|
|
|
|
tracing::info!(
|
|
snapshot_index = self.snapshot_index,
|
|
snapshot_term = self.snapshot_term,
|
|
remaining_entries = self.entries.len(),
|
|
"raft snapshot created and log compacted"
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Restore state from a snapshot.
|
|
pub fn restore_snapshot(&mut self, snapshot: RaftSnapshot) -> GitResult<HashMap<String, RepoEntry>> {
|
|
self.snapshot_index = snapshot.last_included_index;
|
|
self.snapshot_term = snapshot.last_included_term;
|
|
self.commit_index = snapshot.last_included_index;
|
|
self.last_applied = snapshot.last_included_index;
|
|
self.next_index.store(snapshot.last_included_index + 1, Ordering::SeqCst);
|
|
|
|
// Clear all entries (they're covered by the snapshot)
|
|
self.entries.clear();
|
|
self.storage.truncate_and_rebuild(&[])?;
|
|
|
|
tracing::info!(
|
|
snapshot_index = self.snapshot_index,
|
|
snapshot_term = self.snapshot_term,
|
|
"raft state restored from snapshot"
|
|
);
|
|
|
|
Ok(snapshot.repos)
|
|
}
|
|
|
|
/// Get the snapshot index.
|
|
pub fn snapshot_index(&self) -> u64 {
|
|
self.snapshot_index
|
|
}
|
|
|
|
/// Get the snapshot term.
|
|
pub fn snapshot_term(&self) -> u64 {
|
|
self.snapshot_term
|
|
}
|
|
}
|