use crate::pb::RepositoryHeader; use ractor::RpcReplyPort; use ractor_cluster::BytesConvertable; use ractor_cluster::RactorClusterMessage; impl BytesConvertable for RepositoryHeader { fn into_bytes(self) -> Vec { prost::Message::encode_to_vec(&self) } fn from_bytes(bytes: Vec) -> Self { prost::Message::decode(bytes.as_slice()).unwrap_or_default() } } pub const ROLE_PRIMARY: &str = "primary"; pub const ROLE_REPLICA: &str = "replica"; #[derive(Debug, Clone)] pub struct RouteDecision { pub found: bool, pub storage_name: String, pub relative_path: String, pub actor_name: String, pub grpc_addr: String, pub role: String, } impl BytesConvertable for RouteDecision { fn into_bytes(self) -> Vec { encode_strings(&[ if self.found { "1" } else { "0" }.to_string(), self.storage_name, self.relative_path, self.actor_name, self.grpc_addr, self.role, ]) } fn from_bytes(bytes: Vec) -> Self { let values = decode_strings(bytes); Self { found: values.first().is_some_and(|v| v == "1"), storage_name: values.get(1).cloned().unwrap_or_default(), relative_path: values.get(2).cloned().unwrap_or_default(), actor_name: values.get(3).cloned().unwrap_or_default(), grpc_addr: values.get(4).cloned().unwrap_or_default(), role: values.get(5).cloned().unwrap_or_default(), } } } #[derive(Debug, Clone)] pub struct NodeHealth { pub storage_name: String, pub repo_count: u64, pub healthy: bool, pub version: String, } impl BytesConvertable for NodeHealth { fn into_bytes(self) -> Vec { encode_strings(&[ self.storage_name, self.repo_count.to_string(), if self.healthy { "1" } else { "0" }.to_string(), self.version, ]) } fn from_bytes(bytes: Vec) -> Self { let values = decode_strings(bytes); Self { storage_name: values.first().cloned().unwrap_or_default(), repo_count: values .get(1) .and_then(|v| v.parse().ok()) .unwrap_or_default(), healthy: values.get(2).is_some_and(|v| v == "1"), version: values.get(3).cloned().unwrap_or_default(), } } } #[derive(Debug, Clone)] pub struct RefUpdateEvent { pub relative_path: String, pub ref_name: String, pub old_oid: String, pub new_oid: String, pub primary_grpc_addr: String, pub primary_storage_name: String, } impl BytesConvertable for RefUpdateEvent { fn into_bytes(self) -> Vec { encode_strings(&[ self.relative_path, self.ref_name, self.old_oid, self.new_oid, self.primary_grpc_addr, self.primary_storage_name, ]) } fn from_bytes(bytes: Vec) -> Self { let values = decode_strings(bytes); Self { relative_path: values.first().cloned().unwrap_or_default(), ref_name: values.get(1).cloned().unwrap_or_default(), old_oid: values.get(2).cloned().unwrap_or_default(), new_oid: values.get(3).cloned().unwrap_or_default(), primary_grpc_addr: values.get(4).cloned().unwrap_or_default(), primary_storage_name: values.get(5).cloned().unwrap_or_default(), } } } #[derive(RactorClusterMessage)] pub enum GitNodeMessage { ScanAndRegister, RegisterRepository(RepositoryHeader), RemoveRepository(RepositoryHeader), RefUpdated(RefUpdateEvent), #[rpc] FindPrimary(RepositoryHeader, RpcReplyPort), #[rpc] FindReplica(RepositoryHeader, RpcReplyPort), #[rpc] ListRepositoryPaths(RpcReplyPort), #[rpc] RepositoryExists(RepositoryHeader, RpcReplyPort), #[rpc] GetNodeHealth(RpcReplyPort), } #[derive(ractor_cluster::RactorMessage)] pub enum RepoActorMessage { UpdateMetadata(RepositoryHeader), } fn encode_strings(values: &[String]) -> Vec { let mut buf = Vec::new(); for value in values { let bytes = value.as_bytes(); buf.extend((bytes.len() as u64).to_be_bytes()); buf.extend(bytes); } buf } // Maximum allowed length for a single string in the message const MAX_STRING_LEN: usize = 10 * 1024 * 1024; // 10MB // Maximum total message size const MAX_TOTAL_SIZE: usize = 50 * 1024 * 1024; // 50MB fn decode_strings(bytes: Vec) -> Vec { let mut values = Vec::new(); let mut offset = 0; // Check total message size if bytes.len() > MAX_TOTAL_SIZE { tracing::warn!( total = bytes.len(), max = MAX_TOTAL_SIZE, "message exceeds maximum size, truncating" ); return values; } while offset + 8 <= bytes.len() { let len_bytes: [u8; 8] = bytes[offset..offset + 8].try_into().unwrap_or([0u8; 8]); let len_u64 = u64::from_be_bytes(len_bytes); // Prevent DoS via extremely large length values if len_u64 > MAX_STRING_LEN as u64 { tracing::warn!( offset, claimed_len = len_u64, max = MAX_STRING_LEN, "string length exceeds maximum, stopping decode" ); break; } let len = len_u64 as usize; offset += 8; // Prevent integer overflow in offset calculation let end_offset = match offset.checked_add(len) { Some(end) => end, None => { tracing::warn!( offset, len, "integer overflow in offset calculation, stopping decode" ); break; } }; if len == 0 || end_offset > bytes.len() { // Invalid length — stop decoding, return what we have so far tracing::warn!( offset, claimed_len = len, total = bytes.len(), "malformed bytes in decode_strings, stopping early" ); break; } values.push(String::from_utf8_lossy(&bytes[offset..end_offset]).into_owned()); offset = end_offset; } values }