diff --git a/etcd/register.rs b/etcd/register.rs index f962604..4f39692 100644 --- a/etcd/register.rs +++ b/etcd/register.rs @@ -7,6 +7,7 @@ use tokio_stream::StreamExt; use crate::error::{AppError, AppResult}; use super::EtcdRegistry; +use super::EtcdRegistryInner; use super::types::ServiceInstance; impl EtcdRegistry { @@ -63,51 +64,68 @@ impl EtcdRegistry { tokio::spawn(async move { loop { - let result = { - let mut client = inner.client.lock().await; - client.lease_keep_alive(lease_id).await - }; - - match result { - Ok((_keeper, mut stream)) => { - while let Some(resp) = stream.next().await { - if let Err(e) = resp { - tracing::warn!(lease_id = lease_id, error = %e, "keep-alive stream error"); - break; - } - } - } - Err(e) => { - tracing::warn!(lease_id = lease_id, error = %e, "keep-alive failed"); - } - } - + Self::run_keep_alive_stream(&inner, lease_id).await; tokio::time::sleep(std::time::Duration::from_secs(interval)).await; - - let re_grant = { - let mut client = inner.client.lock().await; - client - .lease_grant(inner.config.etcd_lease_ttl().unwrap_or(15) as i64, None) - .await - }; - - if let Ok(current) = re_grant { - let new_lease = current.id(); - inner.lease_id.store(new_lease, Ordering::SeqCst); - - let instance = ServiceInstance { - addr: inner.config.rpc_self_listen_addr().unwrap_or_default(), - metadata: HashMap::new(), - }; - - if let Ok(value) = serde_json::to_string(&instance) { - let mut client = inner.client.lock().await; - let opts = PutOptions::new().with_lease(new_lease); - let _ = client.put(key.clone(), value, Some(opts)).await; - } - tracing::info!(old = lease_id, new = new_lease, "etcd lease renewed"); - } + Self::renew_lease_and_reregister(&inner, lease_id, &key).await; } }); } } + +impl EtcdRegistry { + async fn run_keep_alive_stream( + inner: &std::sync::Arc, + lease_id: i64, + ) { + let result = { + let mut client = inner.client.lock().await; + client.lease_keep_alive(lease_id).await + }; + + match result { + Ok((_keeper, mut stream)) => { + while let Some(resp) = stream.next().await { + if let Err(e) = resp { + tracing::warn!(lease_id = lease_id, error = %e, "keep-alive stream error"); + break; + } + } + } + Err(e) => { + tracing::warn!(lease_id = lease_id, error = %e, "keep-alive failed"); + } + } + } + + async fn renew_lease_and_reregister( + inner: &std::sync::Arc, + old_lease_id: i64, + key: &str, + ) { + let re_grant = { + let mut client = inner.client.lock().await; + client + .lease_grant(inner.config.etcd_lease_ttl().unwrap_or(15) as i64, None) + .await + }; + + let Ok(current) = re_grant else { + return; + }; + + let new_lease = current.id(); + inner.lease_id.store(new_lease, Ordering::SeqCst); + + let instance = ServiceInstance { + addr: inner.config.rpc_self_listen_addr().unwrap_or_default(), + metadata: HashMap::new(), + }; + + if let Ok(value) = serde_json::to_string(&instance) { + let mut client = inner.client.lock().await; + let opts = PutOptions::new().with_lease(new_lease); + let _ = client.put(key, value, Some(opts)).await; + } + tracing::info!(old = old_lease_id, new = new_lease, "etcd lease renewed"); + } +} diff --git a/service/auth/login.rs b/service/auth/login.rs index 9e2e97c..67354eb 100644 --- a/service/auth/login.rs +++ b/service/auth/login.rs @@ -37,19 +37,17 @@ impl AuthService { } let password = self.auth_rsa_decode(&context, params.password).await?; - let user = match self.auth_find_user_by_username(&login).await { + let user = match self.auth_find_user(&login).await { Ok(user) => user, - Err(_) => match self.auth_find_user_by_email(&login).await { - Ok(user) => user, - Err(_) => { - let _ = Argon2::default().hash_password( - password.as_bytes(), - &argon2::password_hash::SaltString::generate(&mut rand::thread_rng()), - ); - tracing::warn!(username = %login, "Login: user not found"); - return Err(AppError::UserNotFound); - } - }, + Err(_) => { + // Timing attack mitigation: hash a dummy password before returning + let _ = Argon2::default().hash_password( + password.as_bytes(), + &argon2::password_hash::SaltString::generate(&mut rand::thread_rng()), + ); + tracing::warn!(username = %login, "Login: user not found"); + return Err(AppError::UserNotFound); + } }; let row = sqlx::query( @@ -58,7 +56,7 @@ impl AuthService { FROM user_password WHERE user_id = $1", ) .bind(user.id) - .fetch_optional(self.ctx.db.reader()) + .fetch_optional(self.ctx.db.writer()) .await .map_err(AppError::Database)?; @@ -81,11 +79,16 @@ impl AuthService { return Err(AppError::InvalidTwoFactorCode); }; let attempts_key = format!("{}{}", Self::TOTP_ATTEMPTS_PREFIX, totp_session_key); - let attempts = self.ctx.cache.get::(&attempts_key).unwrap_or(0); + let attempts = self + .ctx + .cache + .get_l2_only::(&attempts_key) + .await + .unwrap_or(0); if attempts >= Self::TOTP_MAX_ATTEMPTS { context.remove(Self::TOTP_KEY); - let _ = self.ctx.cache.delete(&totp_session_key); - let _ = self.ctx.cache.delete(&attempts_key); + let _ = self.ctx.cache.delete(&totp_session_key).await; + let _ = self.ctx.cache.delete(&attempts_key).await; return Err(AppError::InvalidTwoFactorCode); } @@ -96,21 +99,22 @@ impl AuthService { let next_attempts = attempts + 1; if next_attempts >= Self::TOTP_MAX_ATTEMPTS { context.remove(Self::TOTP_KEY); - let _ = self.ctx.cache.delete(&totp_session_key); - let _ = self.ctx.cache.delete(&attempts_key); + let _ = self.ctx.cache.delete(&totp_session_key).await; + let _ = self.ctx.cache.delete(&attempts_key).await; } else { self.ctx .cache - .set( + .set_l2_only( &attempts_key, &next_attempts, Some(std::time::Duration::from_secs(Self::TOTP_PENDING_TTL_SECS)), ) + .await .map_err(|e| AppError::InternalServerError(e.to_string()))?; } return Err(AppError::InvalidTwoFactorCode); } - let _ = self.ctx.cache.delete(&attempts_key); + let _ = self.ctx.cache.delete(&attempts_key).await; } else { let totp_session_key = uuid::Uuid::new_v4().to_string(); context @@ -123,6 +127,7 @@ impl AuthService { &user.id, Some(std::time::Duration::from_secs(Self::TOTP_PENDING_TTL_SECS)), ) + .await .map_err(|e| AppError::InternalServerError(e.to_string()))?; tracing::info!(username = %login, "Login 2FA triggered"); return Err(AppError::TwoFactorRequired); @@ -131,8 +136,8 @@ impl AuthService { { context.remove(Self::TOTP_KEY); let attempts_key = format!("{}{}", Self::TOTP_ATTEMPTS_PREFIX, totp_session_key); - let _ = self.ctx.cache.delete(&totp_session_key); - let _ = self.ctx.cache.delete(&attempts_key); + let _ = self.ctx.cache.delete(&totp_session_key).await; + let _ = self.ctx.cache.delete(&attempts_key).await; } sqlx::query("UPDATE \"user\" SET last_login_at = $1, updated_at = $1 WHERE id = $2") @@ -193,4 +198,12 @@ impl AuthService { .map_err(AppError::Database)? .ok_or(AppError::UserNotFound) } + + /// Find a user by username or email (login lookup). + async fn auth_find_user(&self, login: &str) -> Result { + match self.auth_find_user_by_username(login).await { + Ok(user) => Ok(user), + Err(_) => self.auth_find_user_by_email(login).await, + } + } }