use tonic::{Request, Response, Status}; use uuid::Uuid; use crate::models::channels::ChannelStats; use crate::models::common::{ChannelKind, ChannelType, Visibility}; use crate::models::workspaces::Workspace; use crate::pb::im::channel_service_server::ChannelService; use crate::pb::im::{ CreateCategoryRequest, CreateCategoryResponse, CreateChannelRequest, CreateChannelResponse, DeleteCategoryRequest, DeleteCategoryResponse, DeleteChannelRequest, DeleteChannelResponse, GetChannelRequest, GetChannelResponse, GetChannelStatsRequest, GetChannelStatsResponse, ListCategoriesRequest, ListCategoriesResponse, ListChannelsRequest, ListChannelsResponse, UpdateCategoryRequest, UpdateCategoryResponse, UpdateChannelRequest, UpdateChannelResponse, }; use crate::service::im::categories::{CreateCategoryParams, UpdateCategoryParams}; use crate::service::im::channels::{ChannelListFilters, CreateChannelParams, UpdateChannelParams}; use crate::service::im::session::ImSession; use crate::service::AppService; pub struct ChannelGrpcService { service: AppService, } impl ChannelGrpcService { pub fn new(service: AppService) -> Self { Self { service } } fn system_session() -> ImSession { ImSession::new(Uuid::nil()) } fn parse_uuid(s: &str, field: &str) -> Result { Uuid::parse_str(s).map_err(|e| Status::invalid_argument(format!("{field}: {e}"))) } async fn resolve_workspace_name(&self, workspace_id: Uuid) -> Result { Workspace::find_by_id(self.service.ctx.db.reader(), workspace_id) .await .map_err(|e| Status::internal(e.to_string()))? .map(|w| w.name) .ok_or_else(|| Status::not_found("workspace not found")) } fn to_proto_timestamp(dt: chrono::DateTime) -> prost_types::Timestamp { prost_types::Timestamp { seconds: dt.timestamp(), nanos: dt.timestamp_subsec_nanos() as i32, } } fn model_channel_to_proto(c: crate::models::channels::Channel) -> crate::pb::im::Channel { let channel_type = match c.channel_type { ChannelType::Public => crate::pb::im::ChannelType::Public, ChannelType::Private => crate::pb::im::ChannelType::Private, ChannelType::Direct => crate::pb::im::ChannelType::Direct, ChannelType::Group => crate::pb::im::ChannelType::Group, ChannelType::Repo => crate::pb::im::ChannelType::Repo, ChannelType::System => crate::pb::im::ChannelType::System, ChannelType::Unknown => crate::pb::im::ChannelType::Unspecified, }; let channel_kind = match c.channel_kind { ChannelKind::Text => crate::pb::im::ChannelKind::Text, ChannelKind::Voice => crate::pb::im::ChannelKind::Voice, ChannelKind::Stage => crate::pb::im::ChannelKind::Stage, ChannelKind::Forum => crate::pb::im::ChannelKind::Forum, ChannelKind::Announcement => crate::pb::im::ChannelKind::Announcement, ChannelKind::Unknown => crate::pb::im::ChannelKind::Unspecified, }; let visibility = match c.visibility { Visibility::Public => crate::pb::im::Visibility::Public, Visibility::Private => crate::pb::im::Visibility::Private, Visibility::Internal => crate::pb::im::Visibility::Internal, Visibility::Workspace => crate::pb::im::Visibility::Workspace, Visibility::Protected => crate::pb::im::Visibility::Protected, Visibility::Hidden => crate::pb::im::Visibility::Hidden, Visibility::Secret => crate::pb::im::Visibility::Secret, Visibility::Unknown => crate::pb::im::Visibility::Unspecified, }; crate::pb::im::Channel { id: c.id.to_string(), workspace_id: c.workspace_id.to_string(), category_id: c.category_id.map(|id| id.to_string()), parent_channel_id: c.parent_channel_id.map(|id| id.to_string()), name: c.name, topic: c.topic, description: c.description, channel_type: channel_type.into(), channel_kind: channel_kind.into(), visibility: visibility.into(), position: c.position.unwrap_or(0), nsfw: c.nsfw, read_only: c.read_only, archived: c.archived, created_by: Some(c.created_by.to_string()), rate_limit_per_user: c.rate_limit_per_user, archived_at: c.archived_at.map(Self::to_proto_timestamp), last_message_id: c.last_message_id.map(|id| id.to_string()), last_message_at: c.last_message_at.map(Self::to_proto_timestamp), created_at: Some(Self::to_proto_timestamp(c.created_at)), updated_at: Some(Self::to_proto_timestamp(c.updated_at)), } } fn model_category_to_proto( c: crate::models::channels::ChannelCategory, ) -> crate::pb::im::ChannelCategory { crate::pb::im::ChannelCategory { id: c.id.to_string(), workspace_id: c.workspace_id.to_string(), name: c.name, position: c.position, collapsed: c.collapsed, created_at: Some(Self::to_proto_timestamp(c.created_at)), updated_at: Some(Self::to_proto_timestamp(c.updated_at)), } } fn model_stats_to_proto(s: ChannelStats) -> crate::pb::im::ChannelStats { crate::pb::im::ChannelStats { channel_id: s.channel_id.to_string(), members_count: s.members_count as i32, messages_count: s.messages_count as i32, threads_count: s.threads_count as i32, reactions_count: s.reactions_count as i32, mentions_count: s.mentions_count as i32, files_count: s.files_count as i32, last_activity_at: s.last_activity_at.map(Self::to_proto_timestamp), updated_at: Some(Self::to_proto_timestamp(s.updated_at)), } } async fn resolve_category_workspace(&self, category_id: Uuid) -> Result { let workspace_id: Uuid = sqlx::query_scalar( "SELECT workspace_id FROM channel_category WHERE id = $1", ) .bind(category_id) .fetch_optional(self.service.ctx.db.reader()) .await .map_err(|e| Status::internal(e.to_string()))? .ok_or_else(|| Status::not_found("category not found"))?; self.resolve_workspace_name(workspace_id).await } } #[tonic::async_trait] impl ChannelService for ChannelGrpcService { async fn get_channel( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let channel_id = Self::parse_uuid(&req.channel_id, "channel_id")?; let session = Self::system_session(); let channel = self .service .im .resolve_channel(channel_id) .await .map_err(|e| Status::internal(e.to_string()))?; let wk_name = self.resolve_workspace_name(channel.workspace_id).await?; let channel = self .service .im .channel_get(&session, &wk_name, channel_id) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(GetChannelResponse { channel: Some(Self::model_channel_to_proto(channel)), })) } async fn list_channels( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let session = Self::system_session(); let channel_type = req.channel_type() .as_str_name() .strip_prefix("CHANNEL_TYPE_") .map(|s| s.to_lowercase()) .filter(|s| s != "unspecified"); let channel_kind = req.channel_kind() .as_str_name() .strip_prefix("CHANNEL_KIND_") .map(|s| s.to_lowercase()) .filter(|s| s != "unspecified"); let filters = ChannelListFilters { channel_type, channel_kind, category_id: req.category_id.as_deref().and_then(|s| Uuid::parse_str(s).ok()), archived: None, }; let channels = self .service .im .channel_list( &session, &req.workspace_name, filters, req.limit as i64, req.offset as i64, ) .await .map_err(|e| Status::internal(e.to_string()))?; let total = channels.len() as i32; let proto_channels: Vec<_> = channels .into_iter() .map(Self::model_channel_to_proto) .collect(); Ok(Response::new(ListChannelsResponse { channels: proto_channels, total, })) } async fn create_channel( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let session = Self::system_session(); let params = CreateChannelParams { name: req.name, topic: req.topic, description: req.description, channel_type: req.channel_type, channel_kind: req.channel_kind, visibility: req.visibility, category_id: req.category_id.as_deref().and_then(|s| Uuid::parse_str(s).ok()), parent_channel_id: req.parent_channel_id.as_deref().and_then(|s| Uuid::parse_str(s).ok()), nsfw: None, rate_limit_per_user: req.rate_limit_per_user, }; let channel = self .service .im .channel_create(&session, &req.workspace_name, params, Uuid::nil()) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(CreateChannelResponse { channel: Some(Self::model_channel_to_proto(channel)), })) } async fn update_channel( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let channel_id = Self::parse_uuid(&req.channel_id, "channel_id")?; let session = Self::system_session(); let existing = self .service .im .resolve_channel(channel_id) .await .map_err(|e| Status::internal(e.to_string()))?; let wk_name = self.resolve_workspace_name(existing.workspace_id).await?; let params = UpdateChannelParams { name: req.name, topic: req.topic, description: req.description, visibility: req.visibility, category_id: req.category_id.as_deref().and_then(|s| Uuid::parse_str(s).ok()), position: req.position, nsfw: req.nsfw, rate_limit_per_user: req.rate_limit_per_user, archived: req.archived, read_only: req.read_only, }; let channel = self .service .im .channel_update(&session, &wk_name, channel_id, params, Uuid::nil()) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(UpdateChannelResponse { channel: Some(Self::model_channel_to_proto(channel)), })) } async fn delete_channel( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let channel_id = Self::parse_uuid(&req.channel_id, "channel_id")?; let session = Self::system_session(); let existing = self .service .im .resolve_channel(channel_id) .await .map_err(|e| Status::internal(e.to_string()))?; let wk_name = self.resolve_workspace_name(existing.workspace_id).await?; self.service .im .channel_delete(&session, &wk_name, channel_id, Uuid::nil()) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(DeleteChannelResponse {})) } async fn get_channel_stats( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let channel_id = Self::parse_uuid(&req.channel_id, "channel_id")?; let stats = sqlx::query_as::<_, ChannelStats>( "SELECT * FROM channel_stats WHERE channel_id = $1", ) .bind(channel_id) .fetch_optional(self.service.ctx.db.reader()) .await .map_err(|e| Status::internal(e.to_string()))?; match stats { Some(s) => Ok(Response::new(GetChannelStatsResponse { stats: Some(Self::model_stats_to_proto(s)), })), None => Err(Status::not_found("Channel stats not found")), } } async fn list_categories( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let session = Self::system_session(); let categories = self .service .im .category_list(&session, &req.workspace_name) .await .map_err(|e| Status::internal(e.to_string()))?; let proto_categories: Vec<_> = categories .into_iter() .map(Self::model_category_to_proto) .collect(); Ok(Response::new(ListCategoriesResponse { categories: proto_categories, })) } async fn create_category( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let session = Self::system_session(); let params = CreateCategoryParams { name: req.name, position: req.position, }; let category = self .service .im .category_create(&session, &req.workspace_name, params) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(CreateCategoryResponse { category: Some(Self::model_category_to_proto(category)), })) } async fn update_category( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let category_id = Self::parse_uuid(&req.category_id, "category_id")?; let session = Self::system_session(); let wk_name = self.resolve_category_workspace(category_id).await?; let params = UpdateCategoryParams { name: req.name, position: req.position, collapsed: req.collapsed, }; let category = self .service .im .category_update(&session, &wk_name, category_id, params) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(UpdateCategoryResponse { category: Some(Self::model_category_to_proto(category)), })) } async fn delete_category( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); let category_id = Self::parse_uuid(&req.category_id, "category_id")?; let session = Self::system_session(); let wk_name = self.resolve_category_workspace(category_id).await?; self.service .im .category_delete(&session, &wk_name, category_id) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(DeleteCategoryResponse {})) } }