From 5f4e9bdfa7d6f70a3b27535df45550f54dfea74c Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Thu, 11 Jun 2026 23:07:36 +0800 Subject: [PATCH] refactor(grpc): bind TCP listener before etcd registration to prevent connection issues - Change tokio-stream dependency to include net feature for TcpListenerStream - Move TCP listener binding before etcd registry initialization in main function - Pass pre-bound TcpListener to gRPC server instead of just SocketAddr - Update gRPC server to use serve_with_incoming with TcpListenerStream - Prevent peers from attempting connections before gRPC server is ready - Ensure proper error handling for TCP binding failures during startup --- Cargo.toml | 2 +- grpc/mod.rs | 3 ++- main.rs | 17 ++++++++++++----- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 310c828..be3e96b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ tonic-prost = "0.14" tonic-health = "0.14.6" url = "2.5" etcd-client = { version = "0.18", features = ["tls"] } -tokio-stream = "0.1" +tokio-stream = { version = "0.1", features = ["net"] } async-nats = "0.49" futures-util = "0.3" utoipa = { version = "5", features = ["uuid","chrono","actix_extras","decimal","macros"]} diff --git a/grpc/mod.rs b/grpc/mod.rs index 61e7131..f2ac16c 100644 --- a/grpc/mod.rs +++ b/grpc/mod.rs @@ -27,6 +27,7 @@ use crate::service::AppService; pub async fn start_grpc_server( addr: SocketAddr, + listener: tokio::net::TcpListener, service: AppService, ) -> Result<(), Box> { let token_svc = auth::TokenGrpcService::new(service.internal_auth.clone()); @@ -60,7 +61,7 @@ pub async fn start_grpc_server( .add_service(VoiceServiceServer::new(cs.voice)) .add_service(StageServiceServer::new(cs.stage)) .add_service(ChannelAuditServiceServer::new(cs.channel_audit)) - .serve(addr) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) .await?; Ok(()) diff --git a/main.rs b/main.rs index bef7268..0bee516 100644 --- a/main.rs +++ b/main.rs @@ -37,6 +37,17 @@ async fn main() -> AppResult<()> { let cache = Arc::new(AppCache::from_config(&config).await?); let storage = AppS3Storage::from_config(&config).await?; + let rpc_host = config.get_env_or::("APP_RPC_SELF_HOST", "0.0.0.0".to_string())?; + let rpc_port = config.get_env_or::("APP_RPC_SELF_PORT", 50050)?; + let rpc_addr: std::net::SocketAddr = format!("{rpc_host}:{rpc_port}").parse() + .map_err(|e| appks::error::AppError::Config(format!("invalid gRPC address: {e}")))?; + + // Bind the TCP listener FIRST so the port is reserved before etcd registration. + // This prevents peers from trying to connect before the gRPC server is ready. + let rpc_listener = tokio::net::TcpListener::bind(rpc_addr) + .await + .map_err(|e| appks::error::AppError::Config(format!("gRPC bind failed on {rpc_addr}: {e}")))?; + let registry = Arc::new(EtcdRegistry::connect(&config).await?); registry.start_discovery().await?; registry @@ -57,13 +68,9 @@ async fn main() -> AppResult<()> { ) .await; - let rpc_host = config.get_env_or::("APP_RPC_SELF_HOST", "0.0.0.0".to_string())?; - let rpc_port = config.get_env_or::("APP_RPC_SELF_PORT", 50050)?; - let rpc_addr: std::net::SocketAddr = format!("{rpc_host}:{rpc_port}").parse() - .map_err(|e| appks::error::AppError::Config(format!("invalid gRPC address: {e}")))?; let grpc_service = service.clone(); tokio::spawn(async move { - if let Err(e) = appks::grpc::start_grpc_server(rpc_addr, grpc_service).await { + if let Err(e) = appks::grpc::start_grpc_server(rpc_addr, rpc_listener, grpc_service).await { tracing::error!(error = %e, "gRPC server failed"); } });