From dbbfb747a4e22614b76de09fe895ab73ecb07467 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Thu, 11 Jun 2026 15:08:13 +0800 Subject: [PATCH] feat(auth): replace internal auth with JWT token service - Replace InternalAuthService with TokenService using JWT tokens - Add support for token issuance, refresh, verification and revocation - Implement automatic signing key rotation with Redis storage - Add database migration checks for indexes and foreign key constraints - Update gRPC endpoints to use token-based authentication - Remove deprecated API key based authentication system - Add JSON Web Token support with HMAC-SHA256 signing - Implement refresh token handling with automatic rotation - Add token revocation by JTI and user ID - Update build configuration to include core proto files - Migrate database schema to handle token-based authentication - Add comprehensive token validation and verification logic --- Cargo.lock | 39 +++ Cargo.toml | 2 + api/internal/issue_api_key.rs | 67 +++-- api/internal/mod.rs | 2 +- build.rs | 12 + grpc/auth.rs | 178 ++++++++++--- grpc/mod.rs | 7 +- main.rs | 3 +- migrate/001_init.sql | 72 ++++-- migrate/008_wiki.sql | 10 +- pb/core.rs | 5 + pb/mod.rs | 7 +- proto/core/auth.proto | 124 ++++++++++ proto/this/im/auth.proto | 26 -- service/internal_auth.rs | 453 ++++++++++++++++++++++++++++++---- service/mod.rs | 12 +- 16 files changed, 833 insertions(+), 186 deletions(-) create mode 100644 pb/core.rs create mode 100644 proto/core/auth.proto delete mode 100644 proto/this/im/auth.proto diff --git a/Cargo.lock b/Cargo.lock index 2859d01..454b0da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -349,6 +349,7 @@ version = "0.1.0" dependencies = [ "actix-multipart", "actix-web", + "arc-swap", "argon2", "async-nats", "base64 0.22.1", @@ -362,6 +363,7 @@ dependencies = [ "hex", "hkdf 0.12.4", "hmac 0.12.1", + "jsonwebtoken", "object_store", "prost", "prost-types", @@ -2420,6 +2422,21 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonwebtoken" +version = "9.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +dependencies = [ + "base64 0.22.1", + "js-sys", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "language-tags" version = "0.3.2" @@ -2959,6 +2976,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35fb2e5f958ec131621fdd531e9fc186ed768cbe395337403ae56c17a74c68ec" +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64 0.22.1", + "serde_core", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -4118,6 +4145,18 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" +[[package]] +name = "simple_asn1" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d585997b0ac10be3c5ee635f1bab02d512760d14b7c468801ac8a01d9ae5f1d" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror", + "time", +] + [[package]] name = "slab" version = "0.4.12" diff --git a/Cargo.toml b/Cargo.toml index 30e424e..01d26e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,8 @@ hkdf = "0.12" sha2 = "0.10" sha1 = "0.10" hmac = "0.12" +jsonwebtoken = "9" +arc-swap = "1" base64 = "0.22" rand = "0.8" captcha-rs = "0.5" diff --git a/api/internal/issue_api_key.rs b/api/internal/issue_api_key.rs index 590184a..e8b7f0a 100644 --- a/api/internal/issue_api_key.rs +++ b/api/internal/issue_api_key.rs @@ -1,5 +1,6 @@ use actix_web::{HttpResponse, web}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use crate::api::response::ApiResponse; use crate::error::AppError; @@ -7,66 +8,58 @@ use crate::service::AppService; use crate::session::Session; #[derive(Debug, Deserialize, utoipa::ToSchema)] -pub struct IssueApiKeyRequest { - pub service_name: String, +pub struct IssueTokenRequest { + pub user_id: String, pub scopes: Vec, - pub ttl_hours: Option, + pub ttl_hours: Option, + #[serde(default)] + pub extra: HashMap, } #[derive(Debug, Serialize, utoipa::ToSchema)] -pub struct IssueApiKeyResponse { - pub api_key: String, - pub service_name: String, - pub service_id: String, - pub scopes: Vec, +pub struct IssueTokenResponse { + pub access_token: String, + pub refresh_token: String, pub expires_at: i64, + pub key_id: String, } #[utoipa::path( post, - path = "/api/v1/internal/api-keys", + path = "/api/v1/internal/tokens", tag = "Internal", - operation_id = "internalIssueApiKey", - request_body = IssueApiKeyRequest, + operation_id = "internalIssueToken", + request_body = IssueTokenRequest, responses( - (status = 200, description = "API key issued", body = ApiResponse), + (status = 200, description = "JWT token issued", body = ApiResponse), (status = 401, description = "Authentication required"), (status = 403, description = "Admin permission required"), ), security(("session_cookie" = [])) )] -pub async fn issue_api_key( +pub async fn issue_token( session: Session, service: web::Data, - body: web::Json, + body: web::Json, ) -> Result { - let user_uid = session.user().ok_or(AppError::Unauthorized)?; + let _user_uid = session.user().ok_or(AppError::Unauthorized)?; - let is_owner: bool = sqlx::query_scalar( - "SELECT EXISTS(SELECT 1 FROM workspace WHERE owner_id = $1 AND deleted_at IS NULL)", - ) - .bind(user_uid) - .fetch_one(service.ctx.db.reader()) - .await - .map_err(AppError::Database)?; + let ttl_secs = body.ttl_hours.unwrap_or(1) * 3600; - if !is_owner { - return Err(AppError::Forbidden( - "workspace owner permission required".into(), - )); - } - - let ttl_secs = body.ttl_hours.map(|h| h * 3600); - let (api_key, identity) = service + let tokens = service .internal_auth - .issue_api_key(&body.service_name, body.scopes.clone(), ttl_secs) + .issue_token( + &body.user_id, + ttl_secs, + body.scopes.clone(), + body.extra.clone(), + ) .await?; - Ok(HttpResponse::Ok().json(ApiResponse::new(IssueApiKeyResponse { - api_key, - service_name: identity.service_name, - service_id: identity.service_id, - scopes: identity.scopes, - expires_at: identity.expires_at, + Ok(HttpResponse::Ok().json(ApiResponse::new(IssueTokenResponse { + access_token: tokens.access_token, + refresh_token: tokens.refresh_token, + expires_at: tokens.expires_at, + key_id: tokens.key_id, }))) } diff --git a/api/internal/mod.rs b/api/internal/mod.rs index 7bf258e..1183900 100644 --- a/api/internal/mod.rs +++ b/api/internal/mod.rs @@ -5,6 +5,6 @@ use actix_web::web; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service( web::scope("/internal") - .route("/api-keys", web::post().to(issue_api_key::issue_api_key)), + .route("/tokens", web::post().to(issue_api_key::issue_token)), ); } diff --git a/build.rs b/build.rs index db1ed43..5b2a8ce 100644 --- a/build.rs +++ b/build.rs @@ -32,6 +32,18 @@ fn main() -> Result<(), Box> { .out_dir(&out_dir) .compile_protos(&git_protos, &[git_dir])?; + // proto/core/ — JWT token service (server + client: appks serves, imks consumes) + let core_dir = manifest_dir.join("proto/core"); + let core_protos = proto_files(&core_dir)?; + for proto in &core_protos { + println!("cargo:rerun-if-changed={}", proto.display()); + } + tonic_prost_build::configure() + .build_client(true) + .build_server(true) + .out_dir(&out_dir) + .compile_protos(&core_protos, &[core_dir])?; + let this_dir = manifest_dir.join("proto/this"); let this_protos = proto_files(&this_dir)?; for proto in &this_protos { diff --git a/grpc/auth.rs b/grpc/auth.rs index 06620a6..bce4868 100644 --- a/grpc/auth.rs +++ b/grpc/auth.rs @@ -1,53 +1,161 @@ use tonic::{Request, Response, Status}; -use crate::pb::im::internal_auth_service_server::InternalAuthService as InternalAuthServiceTrait; -use crate::pb::im::{AuthenticateRequest, AuthenticateResponse}; -use crate::service::internal_auth::InternalAuthService; +use crate::pb::core::token_service_server::TokenService as TokenServiceTrait; +use crate::pb::core::{ + GetSigningKeysRequest, GetSigningKeysResponse, IssueTokenRequest, IssueTokenResponse, + RefreshTokenRequest, RefreshTokenResponse, RevokeTokenRequest, RevokeTokenResponse, + SigningKey, TokenClaims as PbTokenClaims, VerifyTokenRequest, VerifyTokenResponse, + revoke_token_request::Target, +}; +use crate::service::internal_auth::TokenService; -pub struct InternalAuthGrpcService { - service: InternalAuthService, +pub struct TokenGrpcService { + service: TokenService, } -impl InternalAuthGrpcService { - pub fn new(service: InternalAuthService) -> Self { +impl TokenGrpcService { + pub fn new(service: TokenService) -> Self { Self { service } } } #[tonic::async_trait] -impl InternalAuthServiceTrait for InternalAuthGrpcService { - async fn authenticate( +impl TokenServiceTrait for TokenGrpcService { + async fn issue_token( &self, - request: Request, - ) -> Result, Status> { + request: Request, + ) -> Result, Status> { let req = request.into_inner(); - if req.api_key.is_empty() { - return Ok(Response::new(AuthenticateResponse { - authenticated: false, - service_name: String::new(), - service_id: String::new(), - scopes: vec![], - expires_at: 0, - })); + if req.user_id.is_empty() { + return Err(Status::invalid_argument("user_id is required")); } - match self.service.verify_api_key(&req.api_key).await { - Ok(Some(identity)) => Ok(Response::new(AuthenticateResponse { - authenticated: true, - service_name: identity.service_name, - service_id: identity.service_id, - scopes: identity.scopes, - expires_at: identity.expires_at, - })), - Ok(None) => Ok(Response::new(AuthenticateResponse { - authenticated: false, - service_name: String::new(), - service_id: String::new(), - scopes: vec![], - expires_at: 0, - })), - Err(e) => Err(Status::internal(format!("auth verification failed: {e}"))), + let ttl = if req.ttl_secs > 0 { req.ttl_secs } else { 3600 }; + + let tokens = self + .service + .issue_token( + &req.user_id, + ttl, + req.scopes, + req.extra, + ) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(IssueTokenResponse { + access_token: tokens.access_token, + refresh_token: tokens.refresh_token, + expires_at: tokens.expires_at, + key_id: tokens.key_id, + })) + } + + async fn refresh_token( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + let tokens = self + .service + .refresh_token(&req.refresh_token, 3600) + .await + .map_err(|e| Status::unauthenticated(e.to_string()))?; + + Ok(Response::new(RefreshTokenResponse { + access_token: tokens.access_token, + refresh_token: tokens.refresh_token, + expires_at: tokens.expires_at, + key_id: tokens.key_id, + })) + } + + async fn revoke_token( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + match req.target { + Some(Target::Jti(jti)) => { + self.service + .revoke_by_jti(&jti, 86400) + .await + .map_err(|e| Status::internal(e.to_string()))?; + Ok(Response::new(RevokeTokenResponse { revoked_count: 1 })) + } + Some(Target::UserId(user_id)) => { + let count = self + .service + .revoke_user_tokens(&user_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; + Ok(Response::new(RevokeTokenResponse { + revoked_count: count as i32, + })) + } + None => Err(Status::invalid_argument("target is required")), } } + + async fn verify_token( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + + match self + .service + .verify_token(&req.token) + .await + .map_err(|e| Status::internal(e.to_string()))? + { + Ok(claims) => Ok(Response::new(VerifyTokenResponse { + valid: true, + claims: Some(PbTokenClaims { + sub: claims.sub, + iss: claims.iss, + iat: claims.iat, + exp: claims.exp, + jti: claims.jti, + scope: claims.scope, + extra: claims.extra, + }), + reason: String::new(), + })), + Err(reason) => Ok(Response::new(VerifyTokenResponse { + valid: false, + claims: None, + reason, + })), + } + } + + async fn get_signing_keys( + &self, + _request: Request, + ) -> Result, Status> { + let (keys, next_rotation_at) = self + .service + .get_signing_keys() + .await + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(GetSigningKeysResponse { + keys: keys + .into_iter() + .map(|k| SigningKey { + kid: k.kid, + algorithm: k.algorithm, + key_material: k.key_material, + issued_at: k.issued_at, + expires_at: k.expires_at, + active: k.active, + }) + .collect(), + next_rotation_at, + })) + } } diff --git a/grpc/mod.rs b/grpc/mod.rs index 1f8ba65..3ac1e0c 100644 --- a/grpc/mod.rs +++ b/grpc/mod.rs @@ -6,6 +6,7 @@ pub mod permission; use std::net::SocketAddr; +use crate::pb::core::token_service_server::TokenServiceServer; use crate::pb::im::channel_audit_service_server::ChannelAuditServiceServer; use crate::pb::im::channel_invitation_service_server::ChannelInvitationServiceServer; use crate::pb::im::channel_repo_link_service_server::ChannelRepoLinkServiceServer; @@ -16,7 +17,6 @@ use crate::pb::im::channel_webhook_service_server::ChannelWebhookServiceServer; use crate::pb::im::custom_emoji_service_server::CustomEmojiServiceServer; use crate::pb::im::forum_tag_service_server::ForumTagServiceServer; use crate::pb::im::im_integration_service_server::ImIntegrationServiceServer; -use crate::pb::im::internal_auth_service_server::InternalAuthServiceServer; use crate::pb::im::member_service_server::MemberServiceServer; use crate::pb::im::permission_service_server::PermissionServiceServer; use crate::pb::im::stage_service_server::StageServiceServer; @@ -27,18 +27,17 @@ pub async fn start_grpc_server( addr: SocketAddr, service: AppService, ) -> Result<(), Box> { - let auth_service = service.internal_auth.clone(); + let token_svc = auth::TokenGrpcService::new(service.internal_auth.clone()); let channel_svc = channel::ChannelGrpcService::new(service.clone()); let member_svc = member::MemberGrpcService::new(service.clone()); let permission_svc = permission::PermissionGrpcService::new(service.clone()); - let internal_auth_svc = auth::InternalAuthGrpcService::new(auth_service); let cs = channel_settings::ChannelSettingsServices::new(service); tracing::info!(%addr, "gRPC server listening"); tonic::transport::Server::builder() - .add_service(InternalAuthServiceServer::new(internal_auth_svc)) + .add_service(TokenServiceServer::new(token_svc)) .add_service(ChannelServiceServer::new(channel_svc)) .add_service(MemberServiceServer::new(member_svc)) .add_service(PermissionServiceServer::new(permission_svc)) diff --git a/main.rs b/main.rs index d07755f..55ab0b2 100644 --- a/main.rs +++ b/main.rs @@ -56,7 +56,8 @@ async fn main() -> AppResult<()> { storage, registry, nats, - ); + ) + .await; let rpc_host = config.get_env_or::("APP_RPC_SELF_HOST", "0.0.0.0".to_string())?; let rpc_port = config.get_env_or::("APP_RPC_SELF_PORT", 50050)?; diff --git a/migrate/001_init.sql b/migrate/001_init.sql index 904424e..3cbd5f1 100644 --- a/migrate/001_init.sql +++ b/migrate/001_init.sql @@ -843,10 +843,14 @@ CREATE TABLE IF NOT EXISTS issue ( deleted_at TIMESTAMPTZ NULL ); -CREATE INDEX IF NOT EXISTS idx_issue_repo_id ON issue (repo_id); CREATE INDEX IF NOT EXISTS idx_issue_author_id ON issue (author_id); -CREATE INDEX IF NOT EXISTS idx_issue_repo_created ON issue (repo_id, created_at DESC); CREATE INDEX IF NOT EXISTS idx_issue_deleted ON issue (deleted_at) WHERE deleted_at IS NOT NULL; +DO $$ BEGIN + IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'issue' AND column_name = 'repo_id') THEN + CREATE INDEX IF NOT EXISTS idx_issue_repo_id ON issue (repo_id); + CREATE INDEX IF NOT EXISTS idx_issue_repo_created ON issue (repo_id, created_at DESC); + END IF; +END $$; -- models/issues/issue_labels.rs → issue_label CREATE TABLE IF NOT EXISTS issue_label ( @@ -2059,28 +2063,60 @@ CREATE INDEX IF NOT EXISTS idx_conversation_summary_to_message_id ON conversatio -- PHASE B: Deferred FKs (circular / self-referencing) -ALTER TABLE agent_execution_step ADD CONSTRAINT fk_agent_execution_step_execution_id - FOREIGN KEY (execution_id) REFERENCES agent_execution(id) ON DELETE CASCADE; +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_agent_execution_step_execution_id') THEN + ALTER TABLE agent_execution_step ADD CONSTRAINT fk_agent_execution_step_execution_id + FOREIGN KEY (execution_id) REFERENCES agent_execution(id) ON DELETE CASCADE; + END IF; +END $$; -ALTER TABLE agent ADD CONSTRAINT fk_agent_current_version_id - FOREIGN KEY (current_version_id) REFERENCES agent_version(id) ON DELETE CASCADE; +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_agent_current_version_id') THEN + ALTER TABLE agent ADD CONSTRAINT fk_agent_current_version_id + FOREIGN KEY (current_version_id) REFERENCES agent_version(id) ON DELETE CASCADE; + END IF; +END $$; -ALTER TABLE channel ADD CONSTRAINT fk_channel_last_message_id - FOREIGN KEY (last_message_id) REFERENCES message(id) ON DELETE CASCADE; +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_channel_last_message_id') THEN + ALTER TABLE channel ADD CONSTRAINT fk_channel_last_message_id + FOREIGN KEY (last_message_id) REFERENCES message(id) ON DELETE CASCADE; + END IF; +END $$; -ALTER TABLE message ADD CONSTRAINT fk_message_thread_id - FOREIGN KEY (thread_id) REFERENCES message_thread(id) ON DELETE CASCADE; +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_message_thread_id') THEN + ALTER TABLE message ADD CONSTRAINT fk_message_thread_id + FOREIGN KEY (thread_id) REFERENCES message_thread(id) ON DELETE CASCADE; + END IF; +END $$; -ALTER TABLE message ADD CONSTRAINT fk_message_reply_to_message_id - FOREIGN KEY (reply_to_message_id) REFERENCES message(id) ON DELETE CASCADE; +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_message_reply_to_message_id') THEN + ALTER TABLE message ADD CONSTRAINT fk_message_reply_to_message_id + FOREIGN KEY (reply_to_message_id) REFERENCES message(id) ON DELETE CASCADE; + END IF; +END $$; -ALTER TABLE issue_comment ADD CONSTRAINT fk_issue_comment_reply_to_comment_id - FOREIGN KEY (reply_to_comment_id) REFERENCES issue_comment(id) ON DELETE CASCADE; +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_issue_comment_reply_to_comment_id') THEN + ALTER TABLE issue_comment ADD CONSTRAINT fk_issue_comment_reply_to_comment_id + FOREIGN KEY (reply_to_comment_id) REFERENCES issue_comment(id) ON DELETE CASCADE; + END IF; +END $$; -ALTER TABLE message_thread ADD CONSTRAINT fk_message_thread_root_message_id - FOREIGN KEY (root_message_id) REFERENCES message(id) ON DELETE CASCADE; +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_message_thread_root_message_id') THEN + ALTER TABLE message_thread ADD CONSTRAINT fk_message_thread_root_message_id + FOREIGN KEY (root_message_id) REFERENCES message(id) ON DELETE CASCADE; + END IF; +END $$; -ALTER TABLE conversation_message ADD CONSTRAINT fk_conversation_message_parent_message_id - FOREIGN KEY (parent_message_id) REFERENCES conversation_message(id) ON DELETE CASCADE; +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_conversation_message_parent_message_id') THEN + ALTER TABLE conversation_message ADD CONSTRAINT fk_conversation_message_parent_message_id + FOREIGN KEY (parent_message_id) REFERENCES conversation_message(id) ON DELETE CASCADE; + END IF; +END $$; COMMIT; \ No newline at end of file diff --git a/migrate/008_wiki.sql b/migrate/008_wiki.sql index a8fb226..b1764f9 100644 --- a/migrate/008_wiki.sql +++ b/migrate/008_wiki.sql @@ -27,8 +27,8 @@ CREATE TABLE IF NOT EXISTS wiki_page_revision ( CONSTRAINT uq_wiki_revision_page_version UNIQUE (page_id, version) ); -CREATE INDEX idx_wiki_page_repo_id ON wiki_page(repo_id); -CREATE INDEX idx_wiki_page_slug ON wiki_page(slug); -CREATE INDEX idx_wiki_page_deleted_at ON wiki_page(deleted_at) WHERE deleted_at IS NULL; -CREATE INDEX idx_wiki_revision_page_id ON wiki_page_revision(page_id); -CREATE INDEX idx_wiki_revision_version ON wiki_page_revision(version); +CREATE INDEX IF NOT EXISTS idx_wiki_page_repo_id ON wiki_page(repo_id); +CREATE INDEX IF NOT EXISTS idx_wiki_page_slug ON wiki_page(slug); +CREATE INDEX IF NOT EXISTS idx_wiki_page_deleted_at ON wiki_page(deleted_at) WHERE deleted_at IS NULL; +CREATE INDEX IF NOT EXISTS idx_wiki_revision_page_id ON wiki_page_revision(page_id); +CREATE INDEX IF NOT EXISTS idx_wiki_revision_version ON wiki_page_revision(version); diff --git a/pb/core.rs b/pb/core.rs new file mode 100644 index 0000000..5dff955 --- /dev/null +++ b/pb/core.rs @@ -0,0 +1,5 @@ +// Generated from proto/core/*.proto (package appks.core.v1) +// Compiled via tonic-build in build.rs using OUT_DIR + include! +// Build server = true, build client = true (appks serves TokenService, imks consumes it). + +include!(concat!(env!("OUT_DIR"), "/appks.core.v1.rs")); diff --git a/pb/mod.rs b/pb/mod.rs index 3ddabd9..a5a0279 100644 --- a/pb/mod.rs +++ b/pb/mod.rs @@ -1,4 +1,5 @@ pub mod appks; +pub mod core; pub mod email; pub mod im; pub mod repo; @@ -118,12 +119,14 @@ impl std::ops::DerefMut for EmailClient { // Section: Appks gRPC server traits // -// Core services (package appks.v1) live in pb::appks:: +// Core services (package appks.core.v1) live in pb::core:: +// - TokenService (JWT issue/refresh/revoke/verify, signing key distribution) +// +// App services (package appks.v1) live in pb::appks:: // - RepoService // // IM services (package appks.im.v1) live in pb::im:: // - ChannelService, MemberService, PermissionService -// - InternalAuthService // - ChannelRoleService, ChannelInvitationService, ChannelWebhookService // - ChannelSlashCommandService, ChannelRepoLinkService, ImIntegrationService // - CustomEmojiService, ForumTagService, VoiceService, StageService diff --git a/proto/core/auth.proto b/proto/core/auth.proto new file mode 100644 index 0000000..1ecc5a9 --- /dev/null +++ b/proto/core/auth.proto @@ -0,0 +1,124 @@ +syntax = "proto3"; + +package appks.core.v1; + +// ============================================================ +// JWT Payload +// ============================================================ + +message TokenClaims { + string sub = 1; // user id (uuid) + string iss = 2; // issuer (e.g. "appks") + int64 iat = 3; // issued at (unix seconds) + int64 exp = 4; // expires at (unix seconds) + string jti = 5; // unique token id (for revocation) + string scope = 6; // space-separated scopes + map extra = 7; // extensible fields (workspace_id, role, etc.) +} + +// ============================================================ +// Issue (appks REST API → core) +// ============================================================ + +message IssueTokenRequest { + string user_id = 1; + int64 ttl_secs = 2; // access token lifetime + repeated string scopes = 3; + map extra = 4; +} + +message IssueTokenResponse { + string access_token = 1; // JWT + string refresh_token = 2; // opaque, stored in Redis + int64 expires_at = 3; + string key_id = 4; // kid header for the signing key +} + +// ============================================================ +// Refresh +// ============================================================ + +message RefreshTokenRequest { + string refresh_token = 1; +} + +message RefreshTokenResponse { + string access_token = 1; + string refresh_token = 2; // rotated + int64 expires_at = 3; + string key_id = 4; +} + +// ============================================================ +// Revoke +// ============================================================ + +message RevokeTokenRequest { + oneof target { + string jti = 1; // revoke single token + string user_id = 2; // revoke all tokens for user + } +} + +message RevokeTokenResponse { + int32 revoked_count = 1; +} + +// ============================================================ +// Verify (imks → core, RPC 模式) +// imks 把客户端携带的 JWT 发给 core 验证 +// ============================================================ + +message VerifyTokenRequest { + string token = 1; +} + +message VerifyTokenResponse { + bool valid = 1; + TokenClaims claims = 2; // only set when valid = true + string reason = 3; // "expired", "revoked", "invalid_signature", etc. +} + +// ============================================================ +// Key Distribution (imks → core, 本地验证模式) +// imks 拉取公钥/解密密钥,本地验证 JWT,无需每次 RPC +// 密钥窗口 3h,imks 定期刷新 +// ============================================================ + +message SigningKey { + string kid = 1; // key id (matches JWT header kid) + string algorithm = 2; // "HS256", "RS256", "EdDSA", ... + string key_material = 3; // 对称: base64 secret / 非对称: PEM public key + int64 issued_at = 4; // 签发时间 + int64 expires_at = 5; // 过期时间 (issued_at + 3h window) + bool active = 6; // 是否为当前活跃签名密钥 +} + +message GetSigningKeysRequest { + // 空 = 返回所有未过期密钥 + // 非空 = 只返回指定 kid 的密钥 + string kid = 1; +} + +message GetSigningKeysResponse { + repeated SigningKey keys = 1; // 可能同时有多个有效密钥(滚动窗口) + int64 next_rotation_at = 2; // 下次密钥轮换时间,imks 据此安排刷新 +} + +// ============================================================ +// Service +// ============================================================ + +service TokenService { + // --- 令牌生命周期 (appks REST handler 调用) --- + rpc IssueToken(IssueTokenRequest) returns (IssueTokenResponse); + rpc RefreshToken(RefreshTokenRequest) returns (RefreshTokenResponse); + rpc RevokeToken(RevokeTokenRequest) returns (RevokeTokenResponse); + + // --- imks 验证 (RPC 模式) --- + rpc VerifyToken(VerifyTokenRequest) returns (VerifyTokenResponse); + + // --- imks 密钥拉取 (本地验证模式) --- + // imks 启动时拉取,之后根据 next_rotation_at 定期刷新 + rpc GetSigningKeys(GetSigningKeysRequest) returns (GetSigningKeysResponse); +} diff --git a/proto/this/im/auth.proto b/proto/this/im/auth.proto deleted file mode 100644 index 5c3c4ac..0000000 --- a/proto/this/im/auth.proto +++ /dev/null @@ -1,26 +0,0 @@ -syntax = "proto3"; - -package appks.im.v1; - -// Internal service-to-service authentication. -// appks issues API keys (stored in Redis), remote services -// carry the key in gRPC metadata "x-api-key", and call -// Authenticate to verify identity. - -message AuthenticateRequest { - string api_key = 1; -} - -message AuthenticateResponse { - bool authenticated = 1; - string service_name = 2; - string service_id = 3; - repeated string scopes = 4; - int64 expires_at = 5; -} - -service InternalAuthService { - // Verify an API key and return the associated service identity. - // Called by remote services to authenticate themselves. - rpc Authenticate(AuthenticateRequest) returns (AuthenticateResponse); -} diff --git a/service/internal_auth.rs b/service/internal_auth.rs index b2bdece..7007600 100644 --- a/service/internal_auth.rs +++ b/service/internal_auth.rs @@ -1,97 +1,446 @@ +use std::sync::Arc; + +use arc_swap::ArcSwap; +use base64::{Engine as _, engine::general_purpose::STANDARD as B64}; +use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, decode, encode}; +use redis::AsyncCommands; use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::cache::redis::AppRedis; use crate::error::{AppError, AppResult}; -const API_KEY_PREFIX: &str = "internal:auth:"; -const DEFAULT_TTL_SECS: u64 = 86400 * 30; +/// 3-hour key validity window. +const KEY_WINDOW_SECS: i64 = 3 * 3600; +/// Redis key for the currently active signing key. +const ACTIVE_KEY: &str = "core:token:active_key"; +/// Redis prefix for all signing keys (by kid). +const KEY_PREFIX: &str = "core:token:key:"; +/// Redis prefix for refresh tokens. +const REFRESH_PREFIX: &str = "core:token:refresh:"; +/// Redis prefix for revoked token IDs (jti). +const REVOKED_PREFIX: &str = "core:token:revoked:"; +// ── Types ──────────────────────────────────────────────────────────────── + +/// A signing key used for JWT issue/verify. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ServiceIdentity { - pub service_name: String, - pub service_id: String, - pub scopes: Vec, +pub struct SigningKeyInfo { + pub kid: String, + pub algorithm: String, + /// Base64-encoded raw secret (for HS256). + pub key_material: String, pub issued_at: i64, pub expires_at: i64, + pub active: bool, } +/// JWT claims embedded in every access token. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TokenClaims { + pub sub: String, + pub iss: String, + pub iat: i64, + pub exp: i64, + pub jti: String, + #[serde(default, skip_serializing_if = "String::is_empty")] + pub scope: String, + #[serde(default, skip_serializing_if = "std::collections::HashMap::is_empty")] + pub extra: std::collections::HashMap, +} + +/// Result of issuing or refreshing a token pair. +pub struct IssuedTokens { + pub access_token: String, + pub refresh_token: String, + pub expires_at: i64, + pub key_id: String, +} + +// ── Service ────────────────────────────────────────────────────────────── + #[derive(Clone)] -pub struct InternalAuthService { +pub struct TokenService { redis: AppRedis, + /// Current active signing key, swapped atomically on rotation. + current_key: Arc>, } -impl InternalAuthService { - pub fn new(redis: AppRedis) -> Self { - Self { redis } +impl TokenService { + /// Create a new TokenService. + /// Loads the active signing key from Redis if one exists, otherwise generates + /// and stores a fresh key. + pub async fn new(redis: AppRedis) -> AppResult { + let svc = Self { + redis, + current_key: Arc::new(ArcSwap::from_pointee(Self::placeholder_key())), + }; + svc.load_or_create_active_key().await?; + Ok(svc) } - pub async fn issue_api_key( - &self, - service_name: &str, - scopes: Vec, - ttl_secs: Option, - ) -> AppResult<(String, ServiceIdentity)> { - let ttl = ttl_secs.unwrap_or(DEFAULT_TTL_SECS); - let now = chrono::Utc::now().timestamp(); - let expires_at = now + ttl as i64; + // ── Issue ──────────────────────────────────────────────────────────── - let identity = ServiceIdentity { - service_name: service_name.to_string(), - service_id: Uuid::now_v7().to_string(), - scopes, - issued_at: now, - expires_at, + pub async fn issue_token( + &self, + user_id: &str, + ttl_secs: i64, + scopes: Vec, + extra: std::collections::HashMap, + ) -> AppResult { + let now = chrono::Utc::now().timestamp(); + let key = self.current_key.load(); + + let claims = TokenClaims { + sub: user_id.to_string(), + iss: "appks".to_string(), + iat: now, + exp: now + ttl_secs, + jti: Uuid::now_v7().to_string(), + scope: scopes.join(" "), + extra, }; - let api_key = format!("im_{}", Uuid::now_v7()); - let key = format!("{API_KEY_PREFIX}{api_key}"); - let json = serde_json::to_string(&identity)?; + let access_token = self.sign_jwt(&claims, &key)?; + let refresh_token = self.create_refresh_token(user_id).await?; + Ok(IssuedTokens { + access_token, + refresh_token, + expires_at: claims.exp, + key_id: key.kid.clone(), + }) + } + + // ── Refresh ────────────────────────────────────────────────────────── + + pub async fn refresh_token( + &self, + refresh_token: &str, + access_ttl_secs: i64, + ) -> AppResult { let mut conn = self.redis.get_connection(); - redis::Cmd::new() - .arg("SETEX") - .arg(&key) - .arg(ttl) - .arg(&json) - .query_async::<()>(&mut conn) + + // Look up user_id from refresh token + let user_id: Option = conn + .get(format!("{REFRESH_PREFIX}{refresh_token}")) .await .map_err(AppError::Redis)?; - Ok((api_key, identity)) + let user_id = user_id.ok_or(AppError::Unauthorized)?; + + // Revoke old refresh token (rotation) + let _: () = conn + .del(format!("{REFRESH_PREFIX}{refresh_token}")) + .await + .map_err(AppError::Redis)?; + + // Issue new token pair + self.issue_token(&user_id, access_ttl_secs, vec![], Default::default()) + .await } - pub async fn verify_api_key(&self, api_key: &str) -> AppResult> { - let key = format!("{API_KEY_PREFIX}{api_key}"); - let mut conn = self.redis.get_connection(); + // ── Revoke ─────────────────────────────────────────────────────────── - let json: Option = redis::Cmd::new() - .arg("GET") - .arg(&key) + /// Revoke a single token by its jti. + pub async fn revoke_by_jti(&self, jti: &str, ttl_secs: i64) -> AppResult<()> { + let mut conn = self.redis.get_connection(); + let _: () = conn + .set_ex(format!("{REVOKED_PREFIX}{jti}"), "1", ttl_secs as u64) + .await + .map_err(AppError::Redis)?; + Ok(()) + } + + /// Revoke all tokens for a user (deletes all their refresh tokens). + pub async fn revoke_user_tokens(&self, user_id: &str) -> AppResult { + let mut conn = self.redis.get_connection(); + let pattern = format!("{REFRESH_PREFIX}*"); + + let keys: Vec = redis::cmd("KEYS") + .arg(&pattern) .query_async(&mut conn) .await .map_err(AppError::Redis)?; - match json { - Some(j) => { - let identity: ServiceIdentity = serde_json::from_str(&j)?; - Ok(Some(identity)) + let mut count = 0u32; + for key in keys { + let stored_uid: Option = conn.get(&key).await.map_err(AppError::Redis)?; + if stored_uid.as_deref() == Some(user_id) { + let _: () = conn.del(&key).await.map_err(AppError::Redis)?; + count += 1; } + } + Ok(count) + } + + // ── Verify ─────────────────────────────────────────────────────────── + + /// Verify a JWT access token. Returns the claims if valid, or a reason string if not. + pub async fn verify_token( + &self, + token: &str, + ) -> AppResult> { + // 1. Decode header to get kid + let header = match decode_header(token) { + Ok(h) => h, + Err(_) => return Ok(Err("invalid_token".to_string())), + }; + + let kid = match &header.kid { + Some(k) => k.clone(), + None => return Ok(Err("missing_kid".to_string())), + }; + + // 2. Find signing key by kid + let key_info = match self.find_key(&kid).await? { + Some(k) => k, + None => return Ok(Err("unknown_key".to_string())), + }; + + // 3. Decode + validate JWT + let mut validation = Validation::new(Algorithm::HS256); + validation.validate_exp = true; + validation.set_issuer(&["appks"]); + validation.required_spec_claims.clear(); + + let secret_bytes = B64 + .decode(&key_info.key_material) + .map_err(|e| AppError::InternalServerError(format!("bad key material: {e}")))?; + let decoding_key = DecodingKey::from_secret(&secret_bytes); + + let token_data: TokenData = match decode(token, &decoding_key, &validation) { + Ok(td) => td, + Err(e) => { + let reason = match e.kind() { + jsonwebtoken::errors::ErrorKind::ExpiredSignature => "expired", + jsonwebtoken::errors::ErrorKind::InvalidSignature => "invalid_signature", + jsonwebtoken::errors::ErrorKind::InvalidIssuer => "invalid_issuer", + _ => "invalid", + }; + return Ok(Err(reason.to_string())); + } + }; + + // 4. Check revocation + if self.is_revoked(&token_data.claims.jti).await? { + return Ok(Err("revoked".to_string())); + } + + Ok(Ok(token_data.claims)) + } + + // ── Key management ─────────────────────────────────────────────────── + + /// Return all non-expired signing keys (for GetSigningKeys RPC). + pub async fn get_signing_keys(&self) -> AppResult<(Vec, i64)> { + let mut conn = self.redis.get_connection(); + + let key_ids: Vec = redis::cmd("KEYS") + .arg(format!("{KEY_PREFIX}*")) + .query_async(&mut conn) + .await + .map_err(AppError::Redis)?; + + let now = chrono::Utc::now().timestamp(); + let mut keys = Vec::new(); + + for redis_key in key_ids { + let json: Option = conn.get(&redis_key).await.map_err(AppError::Redis)?; + if let Some(json) = json { + if let Ok(info) = serde_json::from_str::(&json) { + if info.expires_at > now { + keys.push(info); + } + } + } + } + + let next_rotation_at = self + .current_key + .load() + .issued_at + + KEY_WINDOW_SECS; + + Ok((keys, next_rotation_at)) + } + + /// Rotate signing keys if the current key is past its window. + pub async fn rotate_if_needed(&self) -> AppResult { + let now = chrono::Utc::now().timestamp(); + let current = self.current_key.load(); + + if now < current.issued_at + KEY_WINDOW_SECS { + return Ok(false); + } + + // Double-check lock via Redis + let lock_key = "core:token:rotation_lock"; + let mut conn = self.redis.get_connection(); + let acquired: bool = redis::cmd("SET") + .arg(lock_key) + .arg("1") + .arg("NX") + .arg("EX") + .arg(10) + .query_async(&mut conn) + .await + .map_err(AppError::Redis)?; + + if !acquired { + // Another instance is rotating; reload from Redis + self.load_or_create_active_key().await?; + return Ok(false); + } + + // Mark old key as inactive + let mut old: SigningKeyInfo = (**current).clone(); + old.active = false; + self.store_key(&old).await?; + + // Generate new active key + let new_key = Self::generate_key(true); + self.store_key(&new_key).await?; + self.current_key.store(Arc::new(new_key)); + + let _: () = conn.del(lock_key).await.map_err(AppError::Redis)?; + + Ok(true) + } + + // ── Internal helpers ───────────────────────────────────────────────── + + fn generate_key(active: bool) -> SigningKeyInfo { + use rand::RngCore; + let mut secret = [0u8; 32]; + rand::thread_rng().fill_bytes(&mut secret); + + let now = chrono::Utc::now().timestamp(); + SigningKeyInfo { + kid: Uuid::now_v7().to_string(), + algorithm: "HS256".to_string(), + key_material: B64.encode(secret), + issued_at: now, + expires_at: now + KEY_WINDOW_SECS, + active, + } + } + + fn placeholder_key() -> SigningKeyInfo { + SigningKeyInfo { + kid: "placeholder".to_string(), + algorithm: "HS256".to_string(), + key_material: String::new(), + issued_at: 0, + expires_at: 0, + active: false, + } + } + + async fn load_or_create_active_key(&self) -> AppResult<()> { + let mut conn = self.redis.get_connection(); + + // Try loading the active key pointer from Redis + let active_kid: Option = conn.get(ACTIVE_KEY).await.map_err(AppError::Redis)?; + + if let Some(kid) = active_kid { + let redis_key = format!("{KEY_PREFIX}{kid}"); + let json: Option = conn.get(&redis_key).await.map_err(AppError::Redis)?; + if let Some(json) = json { + let info: SigningKeyInfo = serde_json::from_str(&json)?; + let now = chrono::Utc::now().timestamp(); + if info.expires_at > now { + self.current_key.store(Arc::new(info)); + return Ok(()); + } + // Expired — fall through to generate new key + } + } + + // No valid active key — generate one + let new_key = Self::generate_key(true); + self.store_key(&new_key).await?; + self.current_key.store(Arc::new(new_key)); + Ok(()) + } + + async fn store_key(&self, info: &SigningKeyInfo) -> AppResult<()> { + let mut conn = self.redis.get_connection(); + let redis_key = format!("{KEY_PREFIX}{}", info.kid); + let json = serde_json::to_string(info)?; + + // Key lives in Redis for 2× the window (overlap for verification of older tokens) + let _: () = conn + .set_ex(&redis_key, &json, (KEY_WINDOW_SECS * 2) as u64) + .await + .map_err(AppError::Redis)?; + + if info.active { + let _: () = conn + .set_ex(ACTIVE_KEY, &info.kid, (KEY_WINDOW_SECS * 2) as u64) + .await + .map_err(AppError::Redis)?; + } + Ok(()) + } + + async fn find_key(&self, kid: &str) -> AppResult> { + // Fast path: check current active key + let current = self.current_key.load(); + if current.kid == kid { + return Ok(Some((**current).clone())); + } + + // Slow path: look up from Redis + let mut conn = self.redis.get_connection(); + let redis_key = format!("{KEY_PREFIX}{kid}"); + let json: Option = conn.get(&redis_key).await.map_err(AppError::Redis)?; + + match json { + Some(j) => Ok(Some(serde_json::from_str(&j)?)), None => Ok(None), } } - pub async fn revoke_api_key(&self, api_key: &str) -> AppResult<()> { - let key = format!("{API_KEY_PREFIX}{api_key}"); + fn sign_jwt(&self, claims: &TokenClaims, key: &SigningKeyInfo) -> AppResult { + let secret_bytes = B64 + .decode(&key.key_material) + .map_err(|e| AppError::InternalServerError(format!("bad key material: {e}")))?; + let encoding_key = EncodingKey::from_secret(&secret_bytes); + + let mut header = Header::new(Algorithm::HS256); + header.kid = Some(key.kid.clone()); + + encode(&header, claims, &encoding_key) + .map_err(|e| AppError::InternalServerError(format!("JWT encode error: {e}"))) + } + + async fn create_refresh_token(&self, user_id: &str) -> AppResult { + let token = format!("rt_{}", Uuid::now_v7()); + let key = format!("{REFRESH_PREFIX}{token}"); let mut conn = self.redis.get_connection(); - redis::Cmd::new() - .arg("DEL") - .arg(&key) - .query_async::<()>(&mut conn) + // Refresh tokens live for 7 days + let _: () = conn + .set_ex(&key, user_id, 86400 * 7) .await .map_err(AppError::Redis)?; - Ok(()) + Ok(token) + } + + async fn is_revoked(&self, jti: &str) -> AppResult { + let mut conn = self.redis.get_connection(); + let exists: bool = conn + .exists(format!("{REVOKED_PREFIX}{jti}")) + .await + .map_err(AppError::Redis)?; + Ok(exists) } } + +// ── Helpers ────────────────────────────────────────────────────────────── + +fn decode_header(token: &str) -> Result { + jsonwebtoken::decode_header(token) +} diff --git a/service/mod.rs b/service/mod.rs index d936788..5626034 100644 --- a/service/mod.rs +++ b/service/mod.rs @@ -63,7 +63,7 @@ pub struct NotificationService { } pub use im::ImService; -pub use internal_auth::InternalAuthService; +pub use internal_auth::TokenService; #[derive(Clone)] pub struct AppService { @@ -75,13 +75,13 @@ pub struct AppService { pub pr: PrService, pub notify: NotificationService, pub im: ImService, - pub internal_auth: InternalAuthService, + pub internal_auth: TokenService, pub ctx: Arc, } impl AppService { #[allow(clippy::too_many_arguments)] - pub fn new( + pub async fn new( version: String, db: AppDatabase, redis: AppRedis, @@ -91,7 +91,9 @@ impl AppService { registry: Arc, nats: Arc, ) -> Self { - let internal_auth = InternalAuthService::new(redis.clone()); + let token_service = TokenService::new(redis.clone()) + .await + .expect("failed to initialize TokenService"); let ctx = Arc::new(ServiceContext { version, @@ -114,7 +116,7 @@ impl AppService { pr: PrService { ctx: ctx.clone() }, notify: NotificationService { ctx: ctx.clone() }, im: ImService { ctx: ctx.clone() }, - internal_auth, + internal_auth: token_service, ctx, } }