refactor(grpc): bind TCP listener before etcd registration to prevent connection issues

- Change tokio-stream dependency to include net feature for TcpListenerStream
- Move TCP listener binding before etcd registry initialization in main function
- Pass pre-bound TcpListener to gRPC server instead of just SocketAddr
- Update gRPC server to use serve_with_incoming with TcpListenerStream
- Prevent peers from attempting connections before gRPC server is ready
- Ensure proper error handling for TCP binding failures during startup
This commit is contained in:
zhenyi
2026-06-11 23:07:36 +08:00
parent b797e360c0
commit 5f4e9bdfa7
3 changed files with 15 additions and 7 deletions
+1 -1
View File
@@ -49,7 +49,7 @@ tonic-prost = "0.14"
tonic-health = "0.14.6" tonic-health = "0.14.6"
url = "2.5" url = "2.5"
etcd-client = { version = "0.18", features = ["tls"] } etcd-client = { version = "0.18", features = ["tls"] }
tokio-stream = "0.1" tokio-stream = { version = "0.1", features = ["net"] }
async-nats = "0.49" async-nats = "0.49"
futures-util = "0.3" futures-util = "0.3"
utoipa = { version = "5", features = ["uuid","chrono","actix_extras","decimal","macros"]} utoipa = { version = "5", features = ["uuid","chrono","actix_extras","decimal","macros"]}
+2 -1
View File
@@ -27,6 +27,7 @@ use crate::service::AppService;
pub async fn start_grpc_server( pub async fn start_grpc_server(
addr: SocketAddr, addr: SocketAddr,
listener: tokio::net::TcpListener,
service: AppService, service: AppService,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
let token_svc = auth::TokenGrpcService::new(service.internal_auth.clone()); let token_svc = auth::TokenGrpcService::new(service.internal_auth.clone());
@@ -60,7 +61,7 @@ pub async fn start_grpc_server(
.add_service(VoiceServiceServer::new(cs.voice)) .add_service(VoiceServiceServer::new(cs.voice))
.add_service(StageServiceServer::new(cs.stage)) .add_service(StageServiceServer::new(cs.stage))
.add_service(ChannelAuditServiceServer::new(cs.channel_audit)) .add_service(ChannelAuditServiceServer::new(cs.channel_audit))
.serve(addr) .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await?; .await?;
Ok(()) Ok(())
+12 -5
View File
@@ -37,6 +37,17 @@ async fn main() -> AppResult<()> {
let cache = Arc::new(AppCache::from_config(&config).await?); let cache = Arc::new(AppCache::from_config(&config).await?);
let storage = AppS3Storage::from_config(&config).await?; let storage = AppS3Storage::from_config(&config).await?;
let rpc_host = config.get_env_or::<String>("APP_RPC_SELF_HOST", "0.0.0.0".to_string())?;
let rpc_port = config.get_env_or::<u16>("APP_RPC_SELF_PORT", 50050)?;
let rpc_addr: std::net::SocketAddr = format!("{rpc_host}:{rpc_port}").parse()
.map_err(|e| appks::error::AppError::Config(format!("invalid gRPC address: {e}")))?;
// Bind the TCP listener FIRST so the port is reserved before etcd registration.
// This prevents peers from trying to connect before the gRPC server is ready.
let rpc_listener = tokio::net::TcpListener::bind(rpc_addr)
.await
.map_err(|e| appks::error::AppError::Config(format!("gRPC bind failed on {rpc_addr}: {e}")))?;
let registry = Arc::new(EtcdRegistry::connect(&config).await?); let registry = Arc::new(EtcdRegistry::connect(&config).await?);
registry.start_discovery().await?; registry.start_discovery().await?;
registry registry
@@ -57,13 +68,9 @@ async fn main() -> AppResult<()> {
) )
.await; .await;
let rpc_host = config.get_env_or::<String>("APP_RPC_SELF_HOST", "0.0.0.0".to_string())?;
let rpc_port = config.get_env_or::<u16>("APP_RPC_SELF_PORT", 50050)?;
let rpc_addr: std::net::SocketAddr = format!("{rpc_host}:{rpc_port}").parse()
.map_err(|e| appks::error::AppError::Config(format!("invalid gRPC address: {e}")))?;
let grpc_service = service.clone(); let grpc_service = service.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = appks::grpc::start_grpc_server(rpc_addr, grpc_service).await { if let Err(e) = appks::grpc::start_grpc_server(rpc_addr, rpc_listener, grpc_service).await {
tracing::error!(error = %e, "gRPC server failed"); tracing::error!(error = %e, "gRPC server failed");
} }
}); });