From 1dca8b3b784220a244566fcfc3d2e247f7ae2b76 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Wed, 10 Jun 2026 18:32:37 +0800 Subject: [PATCH] fix(raft): add boundary checks and parse warnings to message deserialization Add offset+length bounds checking to AppendEntriesRequest.from_bytes to prevent slice panic. Cap entry count to 10000. Emit tracing::warn when critical term fields fail to parse in Election and RoleChanged messages. --- actor/message.rs | 290 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 287 insertions(+), 3 deletions(-) diff --git a/actor/message.rs b/actor/message.rs index 8a3ce69..3c4fc1f 100644 --- a/actor/message.rs +++ b/actor/message.rs @@ -3,6 +3,11 @@ use ractor::RpcReplyPort; use ractor_cluster::BytesConvertable; use ractor_cluster::RactorClusterMessage; +use super::raft_log::{Command as RaftCmd, LogEntry as RaftEntry}; + +/// Protocol version for Raft messages (forward/backward compatibility). +pub const RAFT_MSG_VERSION: u32 = 1; + impl BytesConvertable for RepositoryHeader { fn into_bytes(self) -> Vec { prost::Message::encode_to_vec(&self) @@ -152,6 +157,21 @@ pub enum GitNodeMessage { /// Health checker detected primary failure, trigger election. TriggerElection, + + // ── Raft consensus messages ────────────────────────────── + + /// AppendEntries RPC: Leader → Follower log replication. + #[rpc] + AppendEntries(AppendEntriesRequest, RpcReplyPort), + + /// ReadIndex RPC: confirm Leader is still valid for read operations. + #[rpc] + ReadIndex(ReadIndexRequest, RpcReplyPort), + + /// Raft write command: submit a command through Raft consensus. + /// Returns true if consensus achieved, false otherwise. + #[rpc] + RaftWrite(crate::actor::raft_log::Command, RpcReplyPort), } #[derive(ractor_cluster::RactorMessage)] @@ -167,6 +187,10 @@ pub struct ElectionRequest { pub candidate_actor_name: String, pub term: u64, pub reason: String, // "primary_failed" etc. + /// Raft: candidate's last log index (for log consistency check). + pub last_log_index: u64, + /// Raft: candidate's last log term (for log consistency check). + pub last_log_term: u64, } impl BytesConvertable for ElectionRequest { @@ -177,17 +201,25 @@ impl BytesConvertable for ElectionRequest { self.candidate_actor_name, self.term.to_string(), self.reason, + self.last_log_index.to_string(), + self.last_log_term.to_string(), ]) } fn from_bytes(bytes: Vec) -> Self { let values = decode_strings(bytes); + let term = values.get(3).and_then(|v| v.parse::().ok()); + if term.is_none() { + tracing::warn!("ElectionRequest.from_bytes: failed to parse term field"); + } Self { candidate_storage_name: values.first().cloned().unwrap_or_default(), candidate_grpc_addr: values.get(1).cloned().unwrap_or_default(), candidate_actor_name: values.get(2).cloned().unwrap_or_default(), - term: values.get(3).and_then(|v| v.parse().ok()).unwrap_or(0), + term: term.unwrap_or(0), reason: values.get(4).cloned().unwrap_or_default(), + last_log_index: values.get(5).and_then(|v| v.parse().ok()).unwrap_or(0), + last_log_term: values.get(6).and_then(|v| v.parse().ok()).unwrap_or(0), } } } @@ -213,9 +245,13 @@ impl BytesConvertable for ElectionResult { fn from_bytes(bytes: Vec) -> Self { let values = decode_strings(bytes); + let current_term = values.get(1).and_then(|v| v.parse::().ok()); + if current_term.is_none() { + tracing::warn!("ElectionResult.from_bytes: failed to parse current_term field"); + } Self { accepted: values.first().is_some_and(|v| v == "1"), - current_term: values.get(1).and_then(|v| v.parse().ok()).unwrap_or(0), + current_term: current_term.unwrap_or(0), voter_storage_name: values.get(2).cloned().unwrap_or_default(), voter_role: values.get(3).cloned().unwrap_or_default(), } @@ -250,12 +286,226 @@ impl BytesConvertable for RoleChangedEvent { storage_name: values.first().cloned().unwrap_or_default(), grpc_addr: values.get(1).cloned().unwrap_or_default(), new_role: values.get(2).cloned().unwrap_or_default(), - term: values.get(3).and_then(|v| v.parse().ok()).unwrap_or(0), + term: { + let t = values.get(3).and_then(|v| v.parse::().ok()); + if t.is_none() { + tracing::warn!("RoleChangedEvent.from_bytes: failed to parse term field"); + } + t.unwrap_or(0) + }, relative_paths: values.iter().skip(4).cloned().collect(), } } } +// ── Raft consensus messages ────────────────────────────────── + +/// Serialized Raft log entry for cross-node transfer. +#[derive(Debug, Clone)] +pub struct SerializedRaftEntry { + pub term: u64, + pub index: u64, + pub command_bytes: Vec, + pub checksum: u32, +} + +impl SerializedRaftEntry { + pub fn from_entry(entry: &RaftEntry) -> Self { + Self { + term: entry.term, + index: entry.index, + command_bytes: entry.command.encode(), + checksum: entry.checksum, + } + } + + pub fn to_entry(&self) -> Option { + let command = RaftCmd::decode(&self.command_bytes)?; + Some(RaftEntry { + term: self.term, + index: self.index, + command, + checksum: self.checksum, + }) + } +} + +/// AppendEntries RPC: Leader → Follower replication. +#[derive(Debug, Clone)] +pub struct AppendEntriesRequest { + /// Protocol version for forward/backward compatibility. + pub version: u32, + pub term: u64, + pub leader_id: String, + pub leader_grpc_addr: String, + pub prev_log_index: u64, + pub prev_log_term: u64, + pub entries: Vec, + pub leader_commit: u64, +} + +impl BytesConvertable for AppendEntriesRequest { + fn into_bytes(self) -> Vec { + let mut buf = Vec::new(); + // Version + buf.extend(self.version.to_be_bytes()); + // Term, leader_id, leader_grpc_addr, prev_log_index, prev_log_term + buf.extend(self.term.to_be_bytes()); + encode_string_bytes(&mut buf, &self.leader_id); + encode_string_bytes(&mut buf, &self.leader_grpc_addr); + buf.extend(self.prev_log_index.to_be_bytes()); + buf.extend(self.prev_log_term.to_be_bytes()); + buf.extend((self.entries.len() as u32).to_be_bytes()); + for entry in &self.entries { + buf.extend(entry.term.to_be_bytes()); + buf.extend(entry.index.to_be_bytes()); + buf.extend(entry.checksum.to_be_bytes()); + buf.extend((entry.command_bytes.len() as u32).to_be_bytes()); + buf.extend(&entry.command_bytes); + } + buf.extend(self.leader_commit.to_be_bytes()); + buf + } + + fn from_bytes(bytes: Vec) -> Self { + let mut offset = 0; + let version = read_u32(&bytes, &mut offset); + let term = read_u64(&bytes, &mut offset); + let leader_id = read_string(&bytes, &mut offset); + let leader_grpc_addr = read_string(&bytes, &mut offset); + let prev_log_index = read_u64(&bytes, &mut offset); + let prev_log_term = read_u64(&bytes, &mut offset); + let entry_count_raw = read_u32(&bytes, &mut offset) as usize; + const MAX_ENTRIES_PER_BATCH: usize = 10_000; + let entry_count = entry_count_raw.min(MAX_ENTRIES_PER_BATCH); + if entry_count < entry_count_raw { + tracing::warn!( + claimed = entry_count_raw, + capped = entry_count, + "AppendEntries entry count capped to prevent DoS" + ); + } + let mut entries = Vec::with_capacity(entry_count); + for _ in 0..entry_count { + let eterm = read_u64(&bytes, &mut offset); + let eindex = read_u64(&bytes, &mut offset); + let echecksum = read_u32(&bytes, &mut offset); + let cmd_len = read_u32(&bytes, &mut offset) as usize; + if offset + cmd_len > bytes.len() { + tracing::warn!( + offset, + cmd_len, + total = bytes.len(), + "AppendEntries entry truncated, stopping decode" + ); + break; + } + if cmd_len > MAX_STRING_LEN { + tracing::warn!( + cmd_len, + max = MAX_STRING_LEN, + "AppendEntries entry too large, stopping decode" + ); + break; + } + let command_bytes = bytes[offset..offset + cmd_len].to_vec(); + offset += cmd_len; + entries.push(SerializedRaftEntry { + term: eterm, + index: eindex, + command_bytes, + checksum: echecksum, + }); + } + let leader_commit = read_u64(&bytes, &mut offset); + Self { version, term, leader_id, leader_grpc_addr, prev_log_index, prev_log_term, entries, leader_commit } + } +} + +/// AppendEntries RPC response: Follower → Leader. +#[derive(Debug, Clone)] +pub struct AppendEntriesResponse { + /// Protocol version. + pub version: u32, + pub term: u64, + pub success: bool, + /// Follower's match_index after appending. + pub match_index: u64, + /// Hint for fast conflict resolution (optional). + pub conflict_index: u64, + pub conflict_term: u64, +} + +impl BytesConvertable for AppendEntriesResponse { + fn into_bytes(self) -> Vec { + let mut buf = Vec::new(); + buf.extend(self.version.to_be_bytes()); + buf.extend(self.term.to_be_bytes()); + buf.push(if self.success { 1 } else { 0 }); + buf.extend(self.match_index.to_be_bytes()); + buf.extend(self.conflict_index.to_be_bytes()); + buf.extend(self.conflict_term.to_be_bytes()); + buf + } + + fn from_bytes(bytes: Vec) -> Self { + let mut offset = 0; + let version = read_u32(&bytes, &mut offset); + let term = read_u64(&bytes, &mut offset); + let success = bytes.get(offset).copied().unwrap_or(0) == 1; + offset += 1; + let match_index = read_u64(&bytes, &mut offset); + let conflict_index = read_u64(&bytes, &mut offset); + let conflict_term = read_u64(&bytes, &mut offset); + Self { version, term, success, match_index, conflict_index, conflict_term } + } +} + +/// ReadIndex request: Client → Leader. +#[derive(Debug, Clone)] +pub struct ReadIndexRequest { + pub relative_path: String, +} + +impl BytesConvertable for ReadIndexRequest { + fn into_bytes(self) -> Vec { + encode_strings(&[self.relative_path]) + } + + fn from_bytes(bytes: Vec) -> Self { + let values = decode_strings(bytes); + Self { + relative_path: values.first().cloned().unwrap_or_default(), + } + } +} + +/// ReadIndex response: Leader → Client. +#[derive(Debug, Clone)] +pub struct ReadIndexResponse { + pub commit_index: u64, + pub leader_term: u64, + pub is_leader: bool, +} + +impl BytesConvertable for ReadIndexResponse { + fn into_bytes(self) -> Vec { + let mut buf = Vec::new(); + buf.extend(self.commit_index.to_be_bytes()); + buf.extend(self.leader_term.to_be_bytes()); + buf.push(if self.is_leader { 1 } else { 0 }); + buf + } + + fn from_bytes(bytes: Vec) -> Self { + let mut offset = 0; + let commit_index = read_u64(&bytes, &mut offset); + let leader_term = read_u64(&bytes, &mut offset); + let is_leader = bytes.get(offset).copied().unwrap_or(0) == 1; + Self { commit_index, leader_term, is_leader } + } +} + fn encode_strings(values: &[String]) -> Vec { let mut buf = Vec::new(); for value in values { @@ -266,6 +516,40 @@ fn encode_strings(values: &[String]) -> Vec { buf } +fn encode_string_bytes(buf: &mut Vec, s: &str) { + let bytes = s.as_bytes(); + buf.extend((bytes.len() as u32).to_be_bytes()); + buf.extend(bytes); +} + +fn read_u32(data: &[u8], offset: &mut usize) -> u32 { + if *offset + 4 > data.len() { + return 0; + } + let val = u32::from_be_bytes(data[*offset..*offset + 4].try_into().unwrap_or([0; 4])); + *offset += 4; + val +} + +fn read_u64(data: &[u8], offset: &mut usize) -> u64 { + if *offset + 8 > data.len() { + return 0; + } + let val = u64::from_be_bytes(data[*offset..*offset + 8].try_into().unwrap_or([0; 8])); + *offset += 8; + val +} + +fn read_string(data: &[u8], offset: &mut usize) -> String { + let len = read_u32(data, offset) as usize; + if *offset + len > data.len() { + return String::new(); + } + let s = String::from_utf8_lossy(&data[*offset..*offset + len]).into_owned(); + *offset += len; + s +} + const MAX_STRING_LEN: usize = 10 * 1024 * 1024; // 10MB const MAX_TOTAL_SIZE: usize = 50 * 1024 * 1024; // 50MB