diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..273b4d6 --- /dev/null +++ b/.env.example @@ -0,0 +1,71 @@ +# ============================================================================= +# imks — IM 实时消息服务 环境变量配置 +# 复制此文件为 .env 并修改相应值 +# ============================================================================= + +# --- 部署模式 --- +# Adapter 模式: "local" (单节点) | "redis" | "nats" +IMKS_ADAPTER=local + +# 当前节点唯一标识(默认取主机名) +# IMKS_SERVER_ID=imks-node-1 + +# Redis 连接(IMKS_ADAPTER=redis 时必需) +# IMKS_REDIS_URL=redis://localhost:6379 + +# NATS 连接(IMKS_ADAPTER=nats 时必需) +# IMKS_NATS_URL=nats://localhost:4222 + +# --- WebTransport (QUIC) --- +# 启用 WebTransport 服务(需要 TLS 证书) +# IMKS_WT_ENABLED=false +# IMKS_WT_PORT=3001 +# IMKS_WT_CERT_PATH=/path/to/cert.pem +# IMKS_WT_KEY_PATH=/path/to/key.pem + +# --- 数据库 --- +# PostgreSQL 连接字符串 +# DATABASE_URL=postgres://imks:password@localhost:5432/imks +DATABASE_URL=postgres://localhost/imks + +# 连接池配置 +# DATABASE_MAX_CONNECTIONS=10 +# DATABASE_MIN_CONNECTIONS=2 +# DATABASE_CONNECT_TIMEOUT=30 +# DATABASE_IDLE_TIMEOUT=600 + +# --- appks gRPC 连接 --- +# appks 核心服务地址 +# APPKS_GRPC_ADDR=http://localhost:50051 + +# 连接超时(秒) +# APPKS_GRPC_TIMEOUT=10 + +# mTLS 配置(生产环境必需) +# APPKS_GRPC_TLS_CA_CERT=/path/to/ca.pem +# APPKS_GRPC_TLS_CLIENT_CERT=/path/to/client.pem +# APPKS_GRPC_TLS_CLIENT_KEY=/path/to/client-key.pem +# APPKS_GRPC_TLS_DOMAIN=appks.internal + +# --- OpenTelemetry 可观测性 --- +# 服务名 +# OTEL_SERVICE_NAME=imks +# OTEL_SERVICE_VERSION=0.1.0 + +# OTLP 收集器地址 +# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +# 协议: grpc | http/protobuf +# OTEL_EXPORTER_OTLP_PROTOCOL=grpc + +# 启用/禁用 telemetry +# OTEL_TRACES_ENABLED=true +# OTEL_METRICS_ENABLED=true +# OTEL_LOGS_ENABLED=true + +# 日志级别: trace | debug | info | warn | error +RUST_LOG=info +# 日志格式: json | pretty +# LOG_FORMAT=json + +# 部署环境标识 +# OTEL_RESOURCE_ATTRIBUTES_DEPLOYMENT=development diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..91e94cd --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,804 @@ +# AGENTS.md — 开发规范 / Development Guidelines + +> 本文件为所有 AI 编码助手(Claude Code、pi、Cursor 等)提供统一的开发指导。 +> This file provides unified development guidelines for all AI coding assistants. + +**最后更新 / Last Updated**: 2026-06-11 + +--- + +## 目录 / Table of Contents + +1. [语言 / Language](#1-语言--language) +2. [代码风格 / Code Style](#2-代码风格--code-style) +3. [禁止模式 / Forbidden Patterns](#3-禁止模式--forbidden-patterns) +4. [错误处理 / Error Handling](#4-错误处理--error-handling) +5. [安全规范 / Security](#5-安全规范--security) +6. [数据库规范 / Database](#6-数据库规范--database) +7. [Socket.IO 事件规范 / Socket.IO Event Conventions](#7-socketio-事件规范--socketio-event-conventions) +8. [日志与可观测性 / Logging & Observability](#8-日志与可观测性--logging--observability) +9. [性能规范 / Performance](#9-性能规范--performance) +10. [测试规范 / Testing](#10-测试规范--testing) +11. [Git 规范 / Git Workflow](#11-git-规范--git-workflow) +12. [工作流程 / Workflow](#12-工作流程--workflow) +13. [架构决策记录 / ADR](#13-架构决策记录--adr) +14. [审查清单 / Review Checklist](#14-审查清单--review-checklist) + +--- + +## 1. 语言 / Language + +**Always respond in Chinese (中文).** Use the user's language for all conversations and explanations. Code, commands, and technical terms can remain in English. + +始终使用中文回复。代码、命令和技术术语可以保留英文。 + +--- + +## 2. 代码风格 / Code Style + +### 2.1 基本原则 / Basic Principles + +| 规则 | 说明 | +|---|---| +| 遵循现有风格 | Follow existing project conventions | +| 有意义命名 | Use meaningful variable names; avoid single-letter names except loop counters | +| 函数长度 | Keep functions under **50 lines**; split complex logic into smaller functions | +| 嵌套深度 | Maximum nesting depth: **3 levels**; use early returns to flatten logic | +| 注释 | Add comments for complex logic only; prefer self-documenting code | +| 文档注释 | Public items must have `///` doc comments; private items only when logic is non-obvious | + +### 2.2 Rust 最佳实践 / Rust Best Practices + +```rust +// ✅ 正确 / Correct +fn get_message(id: Uuid) -> Result { + let msg = db.find_message(id).await?; // 使用 ? 传播错误 + Ok(msg) +} + +// ❌ 错误 / Incorrect +fn get_message(id: Uuid) -> Message { + db.find_message(id).await.unwrap() // 禁止 unwrap() +} +``` + +| 规则 | 说明 | +|---|---| +| 错误传播 | Use `?` operator for error propagation; never use `unwrap()` or `expect()` in non-test code | +| `unsafe` | Avoid `unsafe` blocks; if necessary, add a `// SAFETY:` comment explaining why | +| `clone()` | Minimize `clone()` usage; prefer references or `Arc` for shared ownership | +| 魔法数字 | No magic numbers; define named constants with `const` | +| 硬编码字符串 | No hardcoded strings for config/status; use enums or constants | +| 死代码 | Remove dead code; don't leave commented-out code blocks | +| 未完成代码 | Don't commit `unimplemented!()`, `todo!()`, or `FIXME` without a tracking issue | + +### 2.3 导入规范 / Import Guidelines + +```rust +// 标准库 → 第三方 crate → 本地模块 +// stdlib → third-party crates → local modules +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::models::message::Message; +use crate::socket::packet::Packet; +``` + +### 2.4 模型设计规范 / Model Design Guidelines + +imks 的 models 层采用 **一文件一实体** 的拆分策略: + +``` +models/ +├── mod.rs # 模块声明 + 公开 re-export +├── message.rs # 核心 Message + MessageDetail + AuthorInfo + MessageType +├── message_attachment.rs # 附件 +├── message_bookmark.rs # 书签/收藏 +├── message_draft.rs # 草稿 +├── message_edit.rs # 编辑历史 +├── message_embed.rs # 富媒体嵌入 + EmbedField +├── message_mention.rs # @提及 +├── message_pin.rs # 置顶消息 +├── message_poll.rs # 投票 + Option + Vote +├── message_reaction.rs # 表情反应 +├── message_read_state.rs # 已读状态 +└── message_thread.rs # 消息线程 +``` + +每个模型文件包含: +- Row struct(`sqlx::FromRow`) +- Summary/detail struct(API 响应用,带 `From` 转换) +- 查询 SQL 常量(`$1, $2...` 占位符) +- `CREATE_TABLE_SQL` 迁移 DDL + 索引 +- `#[cfg(test)]` 序列化/转换测试 + +--- + +## 3. 禁止模式 / Forbidden Patterns + +以下代码模式在项目中严格禁止: + +| 禁止项 | 说明 | +|-------------------------------|------------------------------------------------| +| `// ── xxxx ──────────` | 禁止使用此类分隔线注释 | +| `unwrap()` / `expect()` (非测试) | 在非测试代码中禁止使用;使用 `?` 或 `unwrap_or` 等安全替代 | +| `panic!()` / `unreachable!()` | 除极少数不可能到达的分支外禁止使用 | +| 未处理的 `todo!()` | 不得提交包含 `todo!()` 的代码,除非有对应的 issue 追踪 | +| 注释掉的代码 | 不得提交被注释的代码块;使用 Git 历史追溯 | +| 过深嵌套 (≥4层) | 使用 early return、`match`、`map`/`and_then` 扁平化逻辑 | +| 过长函数 (>50行) | 拆分为更小的、职责单一的函数 | +| 魔法数字 | 使用 `const` 定义命名常量 | +| 硬编码字符串 | 使用枚举或常量定义配置值/状态值 | +| 死代码 | 删除未使用的代码、导入和变量 | +| `Box` 在公共 API | 使用具体错误类型替代 trait object | + +--- + +## 4. 错误处理 / Error Handling + +### 4.1 错误类型体系 / Error Type System + +imks 使用统一的 `ImksError` 枚举和 `ImksResult` 类型别名,与 appks 的 `AppError`/`AppResult` 保持一致风格。 + +```rust +// error.rs — 统一错误类型 +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ImksError { + // Protocol layer (engine) + #[error("invalid engine packet type: {0}")] + InvalidEnginePacketType(u8), + #[error("invalid engine packet type char: {0}")] + InvalidEnginePacketTypeChar(char), + #[error("empty engine packet")] + EmptyEnginePacket, + #[error("invalid base64: {0}")] + InvalidBase64(#[from] base64::DecodeError), + #[error("invalid utf8 in packet: {0}")] + InvalidPacketUtf8(#[from] FromUtf8Error), + #[error("engine serialization error: {0}")] + EngineSerialization(String), + + // Transport upgrade + #[error("session not found for upgrade")] + UpgradeSessionNotFound, + #[error("session already closed, cannot upgrade")] + UpgradeSessionClosed, + #[error("invalid session state for upgrade")] + UpgradeInvalidState, + + // Socket.IO layer + #[error("invalid socket packet type: {0}")] + InvalidSocketPacketType(u8), + #[error("invalid socket packet type char: {0}")] + InvalidSocketPacketTypeChar(char), + #[error("empty socket packet")] + EmptySocketPacket, + #[error("invalid socket packet format: {0}")] + InvalidSocketPacketFormat(String), + #[error("missing namespace in socket packet")] + MissingNamespace, + #[error("invalid attachment count in binary event")] + InvalidAttachmentCount, + + // Socket namespace + #[error("namespace error: {0}")] + Namespace(String), + #[error("socket not found: {0}")] + SocketNotFound(String), + #[error("failed to send packet to socket: channel full")] + SocketSendFull, + + // Adapter layer + #[error("adapter redis error: {0}")] + AdapterRedis(String), + #[error("adapter nats error: {0}")] + AdapterNats(String), + #[error("adapter message bus error: {0}")] + AdapterMessageBus(String), + #[error("adapter serialization error: {0}")] + AdapterSerialization(String), + #[error("adapter room error: {0}")] + AdapterRoom(String), + + // Database + #[error("database error: {0}")] + Database(#[from] sqlx::Error), + + // gRPC + #[error("gRPC error: {0}")] + GrpcStatus(#[from] tonic::Status), + #[error("gRPC transport error: {0}")] + GrpcTransport(#[from] tonic::transport::Error), + + // Serialization + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), + + // Auth + #[error("auth error: {0}")] + Auth(String), + #[error("token expired")] + TokenExpired, + + // General + #[error("not found: {0}")] + NotFound(String), + #[error("invalid input: {0}")] + InvalidInput(String), + #[error("internal error: {0}")] + Internal(String), +} + +pub type ImksResult = Result; +``` + +### 4.2 错误处理原则 / Error Handling Principles + +| 原则 | 说明 | +|-----------|-------------------------------------------------------------------| +| 统一类型 | 所有公共 API 返回 `ImksResult`,不暴露子模块错误 | +| 显式处理 | Handle all errors explicitly; no silent failures | +| `From` 转换 | 外部库错误通过 `#[from]` 自动转换,减少 `map_err` 样板 | +| 异步传播 | Use `?` operator; don't suppress errors in spawned tasks | +| 通道满 | Handle `mpsc::TrySendError` gracefully(buffer or log),don't panic | +| gRPC 错误 | Map `tonic::Status` → `ImksError::Grpc` | + +### 4.3 错误日志格式 / Error Logging Format + +```rust +// 记录错误时包含完整上下文 +tracing::error!( + error = %err, + socket_sid = %sid, + event = %event_name, + "Failed to handle socket event" +); +``` + +### 4.4 现有子模块迁移 / Migration from Submodule Errors + +当前部分子模块使用独立的 `thiserror` enum(`PacketError`、`AdapterError`),需要逐步迁移到 `ImksError`: + +```rust +// 旧 / Old +pub enum PacketError { ... } +pub fn decode(data: &str) -> Result { ... } + +// 新 / New +pub fn decode(data: &str) -> ImksResult { + // 内部仍可用 thiserror,对外转为 ImksError + inner_decode(data).map_err(|e| ImksError::Packet(e.to_string())) +} +``` + +--- + +## 5. 安全规范 / Security + +> 用户认证、授权、密码管理、2FA 等企业级安全由 appks 统一处理。 +> imks 作为内部消息服务,仅负责消息层面的安全。 + +### 5.1 基础安全 / Basic Security + +| 规则 | 说明 | +|---------|-----------------------------------------------------------------| +| 密钥管理 | Never hardcode secrets or API keys; use environment variables | +| 输入验证 | Always validate and sanitize user input(消息体、事件名、namespace path) | +| SQL 注入 | Use parameterized queries(sqlx handles this automatically) | +| gRPC 安全 | appks ↔ imks 的 gRPC 连接应使用 mTLS,防止密钥在传输中被截获 | + +### 5.2 JWT 验证双模式 / Dual-Mode JWT Verification + +imks 支持两种 JWT 验证模式(详见 `rpc.md`): + +| 模式 | 方式 | 延迟 | 适用场景 | +|------------|---------------------------------------|-----|------------| +| **RPC 验证** | 调用 appks `TokenService.VerifyToken()` | 实时 | 敏感操作 | +| **本地验证** | 启动时拉取 `GetSigningKeys()` 缓存到本地 | 零延迟 | 高频操作(消息收发) | + +推荐策略:普通操作(发消息、读频道)→ 本地验证,敏感操作 → RPC 验证。 + +### 5.3 连接安全 / Connection Security + +| 要求 | 说明 | +|--------|------------------------------------------------| +| 命名空间验证 | 验证 namespace path(`/` 开头,≤256 字符,无控制字符),防止 DoS | +| 消息体大小 | 限制单条消息大小(`EngineConfig.max_payload`) | +| 速率限制 | 按 socket 限制消息发送频率 | + +--- + +## 6. 数据库规范 / Database + +### 6.1 基础规范 / Basic Rules + +| 规则 | 说明 | +|---------|------------------------------------------------------------------| +| 参数化查询 | Always use parameterized queries (sqlx does this by default) | +| 迁移规范 | All schema changes must go through migration files in `migrate/` | +| UUID v7 | 所有主键使用 UUID v7(时间有序),便于索引和游标分页 | +| 软删除 | 使用 `deleted_at` 字段进行软删除,不用硬删除 | +| 去规范化 | 对于高频读取的聚合字段(如 `total_votes`、`unread_count`),可在表中冗余存储 | + +### 6.2 imks 管理的数据库表 / imks-Managed Tables + +imks 仅管理消息相关表,用户/频道/成员/权限等由 appks 核心服务管理。 + +| 表 | 对应模型文件 | 说明 | +|-----------------------|--------------------------------|-------------------| +| `message` | `models/message.rs` | 核心消息表(由 appks 创建) | +| `message_attachment` | `models/message_attachment.rs` | 文件附件 | +| `message_embed` | `models/message_embed.rs` | 富媒体嵌入 | +| `message_embed_field` | `models/message_embed.rs` | 嵌入字段 | +| `message_poll` | `models/message_poll.rs` | 投票 | +| `message_poll_option` | `models/message_poll.rs` | 投票选项 | +| `message_poll_vote` | `models/message_poll.rs` | 投票记录 | +| `message_pin` | `models/message_pin.rs` | 置顶消息 | +| `message_read_state` | `models/message_read_state.rs` | 已读状态 | +| `message_draft` | `models/message_draft.rs` | 草稿 | +| `message_edit` | `models/message_edit.rs` | 编辑历史 | + +### 6.3 性能优化 / Performance Optimization + +| 规则 | 说明 | +|-------------|---------------------------------------------------------------------------------| +| N+1 防护 | Use `JOIN` or batch queries instead of N+1 patterns | +| 游标分页 | Use UUID v7 cursor-based pagination(`WHERE id < $3 ORDER BY id DESC`),不用 OFFSET | +| 索引规范 | Add indexes for frequently queried columns; document index rationale | +| ON CONFLICT | Use `INSERT ... ON CONFLICT` for upsert patterns(draft、read_state) | + +--- + +## 7. Socket.IO 事件规范 / Socket.IO Event Conventions + +### 7.1 事件命名 / Event Naming + +imks 使用 Socket.IO 协议进行实时通信,事件名遵循以下约定: + +``` +// 客户端 → 服务端(发送操作) +"message:send" // 发送消息 +"message:edit" // 编辑消息 +"message:delete" // 删除消息 +"typing:start" // 开始输入 +"typing:stop" // 停止输入 +"reaction:add" // 添加反应 +"reaction:remove" // 移除反应 + +// 服务端 → 客户端(推送事件) +"message:new" // 新消息 +"message:updated" // 消息已更新 +"message:deleted" // 消息已删除 +"typing" // 用户输入状态 +"reaction:updated" // 反应已更新 +"presence:update" // 在线状态变更 +``` + +### 7.2 事件数据结构 / Event Data Structure + +```json +// 客户端发送消息事件 +// Client sends: "message:send" +{ + "channel_id": "01909a...", + "body": "hello world", + "thread_id": null, + "reply_to_message_id": null +} + +// 服务端广播新消息 +// Server broadcasts: "message:new" +{ + "id": "01909b...", + "channel_id": "01909a...", + "author": { "id": "...", "username": "alice" }, + "body": "hello world", + "created_at": "2026-06-11T10:00:00Z", + "reactions": {}, + "attachment_count": 0 +} +``` + +### 7.3 房间(Room)机制 / Room Mechanism + +```rust +// channel_id 作为房间名,频道内消息只广播给加入该房间的 sockets +// Use channel_id as room name; messages broadcast only to sockets in that room +namespace.emit_to_room(&channel_id, "message:new", message_data).await; +``` + +### 7.4 适配器(Adapter)模式 / Adapter Pattern + +imks 通过 Adapter trait 支持水平扩展,将 Socket.IO 事件广播到多节点: + +``` +┌─────────┐ ┌─────────┐ ┌─────────┐ +│ Node 1 │────→│ NATS / │←────│ Node 2 │ +│ (imks) │ │ Redis │ │ (imks) │ +└─────────┘ └─────────┘ └─────────┘ +``` + +| Adapter | 适用场景 | +|---|---| +| `LocalAdapter` | 单节点开发/测试 | +| `RedisAdapter` | 生产环境 Redis Pub/Sub | +| `NatsAdapter` | 生产环境 NATS(更低延迟) | + +--- + +## 8. 日志与可观测性 / Logging & Observability + +### 8.1 日志规范 / Logging Standards + +```rust +// 使用 tracing crate 进行结构化日志 +use tracing::{info, warn, error, debug}; + +info!( + socket_sid = %socket.sid, + engine_sid = %socket.engine_sid, + namespace = %namespace.path, + "Socket connected" +); + +warn!( + socket_sid = %sid, + "Adapter register error: {}", + e +); + +error!( + error = %err, + engine_sid = %sid, + "Failed to handle engine message" +); +``` + +| 级别 | 用途 | +|---------|-------------------------------------| +| `error` | 错误需要立即关注(连接失败、数据库错误) | +| `warn` | 异常但可恢复的情况(adapter 注册失败、socket 发送失败) | +| `info` | 关键业务操作记录(连接/断开、命名空间创建) | +| `debug` | 开发调试信息(数据包收发详情) | +| `trace` | 详细执行路径 | + +### 8.2 关键指标 / Key Metrics + +| 指标 | 说明 | +|------------|-----------------------------------------------------| +| 活跃连接数 | Active WebSocket + Polling + WebTransport sessions | +| 消息吞吐量 | Messages sent/received per second | +| 广播延迟 | P50/P95/P99 broadcast latency across nodes | +| 事件处理延迟 | Event handling time (receive → broadcast → deliver) | +| Adapter 延迟 | NATS/Redis broadcast delay between nodes | +| 连接错误率 | Connection failure rate | +| 数据库查询延迟 | Message insert/select latency | + +### 8.3 健康检查 / Health Check + +```json +// GET /health +{ + "status": "healthy", + "version": "0.1.0", + "uptime": 3600, + "checks": { + "postgres": { "status": "up", "latency_ms": 5 }, + "redis": { "status": "up", "latency_ms": 2 }, + "nats": { "status": "up", "latency_ms": 1 }, + "appks_grpc": { "status": "up", "latency_ms": 3 } + } +} +``` + +--- + +## 9. 性能规范 / Performance + +### 9.1 实时消息 SLA / Real-time Messaging SLA + +| 指标 | 目标 | +|--------------|---------------| +| 消息端到端延迟(P50) | <50ms | +| 消息端到端延迟(P95) | <200ms | +| 消息端到端延迟(P99) | <500ms | +| 连接建立时间 | <100ms | +| 消息吞吐量(单节点) | >10,000 msg/s | +| 错误率 | <0.1% | + +### 9.2 性能原则 / Performance Principles + +| 原则 | 说明 | +|------|---------------------------------------------------------------------| +| 零拷贝 | Minimize data copying; use references where possible | +| 批量操作 | Batch adapter broadcasts; use pipelining for Redis | +| 无锁优先 | Use `DashMap` (lock-free) for hot-path data; `RwLock` for cold-path | +| 背压控制 | Use bounded `mpsc::channel` (256) to prevent memory blowout | +| 连接复用 | Reuse gRPC channels to appks | + +### 9.3 优化策略 / Optimization Strategies + +| 场景 | 策略 | +|---------|----------------------------------| +| 跨节点广播 | NATS(<1ms P50)优于 Redis(~2ms P50) | +| 消息持久化 | 异步写入 + 批量 COMMIT | +| 会话查找 | `DashMap` 直接查找,无锁竞争 | +| gRPC 调用 | 连接池 + 本地密钥缓存减少 RPC 往返 | + +--- + +## 10. 测试规范 / Testing + +### 10.1 基础要求 / Basic Requirements + +| 规则 | 说明 | +|--------|-------------------------------------------------------------| +| 新功能 | All new features must have unit tests | +| Bug 修复 | Bug fixes must include regression tests | +| 模型测试 | All model files must have serialization/conversion tests | +| 测试隔离 | Tests must be independent and not depend on execution order | + +### 10.2 测试命令 / Test Commands + +```bash +cargo test # 运行所有测试 +cargo test --lib # 仅运行 lib 测试 +cargo test models:: # 运行 models 模块测试 +cargo test socket::parser:: # 运行 socket parser 测试 +cargo test -- --nocapture # 显示输出 +``` + +### 10.3 测试文件组织 / Test File Organization + +``` +tests/ +├── engine_io_tests.rs # Engine.IO 协议测试 +├── socket_io_tests.rs # Socket.IO 协议测试 +├── adapter_tests.rs # Adapter 测试 +└── session_tests.rs # 会话管理测试 + +# 单元测试 (inline) +models/message.rs → #[cfg(test)] mod tests { ... } +models/message_poll.rs → #[cfg(test)] mod tests { ... } +socket/parser.rs → #[cfg(test)] mod tests { ... } +engine/codec.rs → #[cfg(test)] mod tests { ... } +``` + +--- + +## 11. Git 规范 / Git Workflow + +### 11.1 提交信息格式 / Commit Message Format + +使用 Angular 风格,全部英文: + +``` +(): + +[optional body] + +[optional footer] +``` + +| Type | 说明 | +|------------|--------| +| `feat` | 新功能 | +| `fix` | Bug 修复 | +| `refactor` | 重构 | +| `docs` | 文档 | +| `test` | 测试 | +| `chore` | 构建/工具 | +| `perf` | 性能优化 | +| `style` | 代码格式 | + +**示例 / Examples:** +``` +feat(models): add MessagePin, MessageReadState, MessageDraft, MessageEdit models +fix(socket): handle namespace validation on connect +refactor(engine): extract session store to dedicated module +docs(readme): add architecture overview +test(parser): add edge cases for binary event decoding +chore(deps): update tonic to 0.14 +``` + +### 11.2 提交原则 / Commit Principles + +| 原则 | 说明 | +|--------|----------------------------------------------------------| +| 原子提交 | Each commit should address one concern | +| 完整性 | Each commit should leave the codebase in a working state | +| 禁止强制推送 | Never force push to main branch | +| 提交前检查 | Run `cargo check` and `cargo test` before committing | + +### 11.3 分支策略 / Branch Strategy + +| 分支 | 用途 | +|-------------|--------| +| `main` | 生产就绪代码 | +| `feat/*` | 功能开发 | +| `fix/*` | Bug 修复 | +| `release/*` | 发布准备 | + +--- + +## 12. 工作流程 / Workflow + +### 12.1 开发流程 / Development Process + +1. **理解先于编写** — Read before write; understand context first +2. **最小变更** — Minimal changes; don't refactor unrelated code +3. **验证变更** — Verify after changes; run tests or check output +4. **文档同步** — Update documentation when changing public APIs + +### 12.2 AI 助手工作规范 / AI Assistant Guidelines + +| 规则 | 说明 | +|--------|-----------------------------------------------------| +| 先读后写 | Always read existing code before making changes | +| 最小侵入 | Make minimal changes; don't refactor unrelated code | +| 验证结果 | Run `cargo check` or `cargo test` after changes | +| 解释变更 | Explain what you changed and why | +| 询问不确定 | Ask when unsure about requirements | +| 遵守禁止模式 | Never use `// ── xxxx ──────────` style comments | + +### 12.3 常用命令 / Common Commands + +```bash +cargo build # 构建 +cargo check # 快速检查(推荐开发时使用) +cargo test # 运行测试 +cargo test --lib # 仅运行 lib 测试 +cargo clippy # Lint 检查 +cargo fmt # 格式化 +cargo doc --no-deps # 生成文档 +``` + +--- + +## 13. 架构决策记录 / ADR + +架构决策记录存放在 `docs/adr/` 目录下,使用 Markdown 格式。 + +### 当前决策 / Current Decisions + +| ADR | 标题 | 状态 | +|---|---|---| +| — | Socket.IO 作为实时通信协议 | Accepted | +| — | UUID v7 作为主键实现游标分页 | Accepted | +| — | 双模式 JWT 验证(本地 + RPC) | Accepted | +| — | Adapter 模式支持多节点水平扩展 | Accepted | +| — | 消息表由 appks 管理,imks 仅扩展富内容表 | Accepted | + +### ADR 模板 / ADR Template + +```markdown +# ADR-NNN: 标题 + +## 状态 +Accepted | Superseded | Deprecated + +## 背景 +描述问题背景 + +## 决策 +描述做出的决策 + +## 后果 +描述正面和负面影响 +``` + +--- + +## 14. 审查清单 / Review Checklist + +### 代码审查 / Code Review + +- [ ] 代码风格符合项目规范(无 `// ──` 分隔线) +- [ ] 没有使用禁止模式(unwrap、panic、todo 等) +- [ ] 错误处理完整(?传播、具体类型) +- [ ] 安全考虑已处理(JWT 验证模式选择正确) +- [ ] 性能影响已评估(无 N+1、无阻塞调用) +- [ ] 测试已添加(model 文件必须有测试) +- [ ] 文档已更新(新 struct 有 doc comment) + +### PR 审查 / PR Review + +- [ ] 提交信息符合 Angular 风格 +- [ ] 每个提交只关注一个问题 +- [ ] 变更范围合理 +- [ ] 没有遗留的 TODO/FIXME +- [ ] `cargo check` 和 `cargo test` 通过 + +### 发布前审查 / Pre-release Review + +- [ ] 所有测试通过 +- [ ] 无 clippy warning +- [ ] 迁移 SQL 已包含在 `migrate/` 中 +- [ ] 依赖安全审计通过(`cargo audit`) + +--- + +## 附录 / Appendix + +### 项目架构速查 / Quick Architecture Reference + +``` +imks — IM 实时消息服务 / Real-time Messaging Service + +┌──────────────────────────────────────────────┐ +│ imks │ +│ │ +│ ┌─────────────┐ ┌─────────────────────────┐ │ +│ │ engine/ │ │ socket/ │ │ +│ │ │ │ │ │ +│ │ • websocket │ │ • server (Socket.IO) │ │ +│ │ • webtransport│ │ • namespace (rooms) │ │ +│ │ • polling │ │ • parser (protocol) │ │ +│ │ • session │ │ • adapter (scale-out) │ │ +│ │ • packet │ │ ├─ local │ │ +│ │ • codec │ │ ├─ redis │ │ +│ │ • heartbeat │ │ └─ nats │ │ +│ │ • server │ │ • message_bus │ │ +│ └─────────────┘ │ • session_store │ │ +│ └─────────────────────────┘ │ +│ ┌─────────────┐ ┌─────────────────────────┐ │ +│ │ models/ │ │ pb/ │ │ +│ │ │ │ │ │ +│ │ 12 个消息 │ │ gRPC client stubs │ │ +│ │ 领域模型 │ │ → appks TokenService │ │ +│ │ │ │ → appks ChannelService │ │ +│ │ message │ │ → appks MemberService │ │ +│ │ attachment │ │ → appks PermissionService│ │ +│ │ embed/poll │ │ │ │ +│ │ reaction │ │ │ │ +│ │ thread/pin │ │ │ │ +│ │ draft/edit │ │ │ │ +│ │ mention │ │ │ │ +│ │ bookmark │ │ │ │ +│ │ read_state │ │ │ │ +│ └─────────────┘ └─────────────────────────┘ │ +│ │ +│ ┌─────────────┐ │ +│ │ migrate/ │ │ +│ │ │ │ +│ │ SQL 迁移 │ │ +│ └─────────────┘ │ +└─────────────────────┬────────────────────────┘ + │ gRPC + ▼ +┌──────────────────────────────────────────────┐ +│ appks (core) │ +│ │ +│ TokenService │ ChannelService │ MemberSvc │ +│ PermissionSvc │ WebhookSvc │ EmojiSvc │ +│ │ +│ Postgres (users, channels, members, ...) │ +│ Redis (JWT keys, sessions, rate limiting) │ +└──────────────────────────────────────────────┘ +``` + +### 基础设施速查 / Infrastructure Quick Reference + +| 服务 | 用途 | 协议/库 | +|---|---|---| +| Postgres | 消息数据持久化 | sqlx | +| Redis | Adapter 广播 / 会话存储 | fred | +| NATS | Adapter 广播(低延迟替代) | async-nats | +| appks gRPC | JWT 验证 / 频道/成员/权限查询 | tonic | + +### 传输层对比 / Transport Comparison + +| 传输 | 适用场景 | 特点 | +|------------------|-------------------|--------------------| +| **Polling** | 浏览器不支持 WS 时的降级 | 兼容性最好,延迟高 | +| **WebSocket** | 主流浏览器/移动端 | 全双工,低延迟 | +| **WebTransport** | 现代浏览器(Chrome 97+) | 基于 QUIC,多路复用,不队头阻塞 | + +--- + +*This document is maintained by the development team. For questions or suggestions, please open an issue.* diff --git a/Cargo.lock b/Cargo.lock index ca073bc..90b673d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1449,13 +1449,16 @@ version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ + "base64", "bytes", "futures-channel", "futures-util", "http 1.4.2", "http-body", "hyper", + "ipnet", "libc", + "percent-encoding", "pin-project-lite", "socket2 0.6.4", "tokio", @@ -1612,6 +1615,12 @@ dependencies = [ "fred", "futures-util", "jsonwebtoken", + "opentelemetry", + "opentelemetry-appender-tracing", + "opentelemetry-otlp", + "opentelemetry-prometheus", + "opentelemetry_sdk", + "prometheus", "prost", "prost-types", "rand 0.9.4", @@ -1626,6 +1635,7 @@ dependencies = [ "tonic-prost", "tonic-prost-build", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "uuid", "walkdir", @@ -1650,6 +1660,12 @@ dependencies = [ "serde_core", ] +[[package]] +name = "ipnet" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" + [[package]] name = "itertools" version = "0.14.0" @@ -1966,6 +1982,108 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "opentelemetry" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0142c63252a9e054e68a4c61a5778f7b14f576274d593f8ce883d191a099682" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.18", + "tracing", +] + +[[package]] +name = "opentelemetry-appender-tracing" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c0080f0dc1d7c786f467cd85a4e395fcab11ee852004f39a29a18ab7c25d837" +dependencies = [ + "opentelemetry", + "tracing", + "tracing-core", + "tracing-subscriber", +] + +[[package]] +name = "opentelemetry-http" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5683015d09e2df236ef005b17f6f196f0d5f6313c4fa43a7b6a53b52776e4331" +dependencies = [ + "async-trait", + "bytes", + "http 1.4.2", + "opentelemetry", + "reqwest", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9966929966d17620d7c316c643ba62631826e10021409357772d5eea84f62c35" +dependencies = [ + "http 1.4.2", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + "thiserror 2.0.18", + "tokio", + "tonic", + "tonic-types", +] + +[[package]] +name = "opentelemetry-prometheus" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c0359983e7f79cf33c9abd89e5d7ddf67c46c419d0148598022d70e70c01aba" +dependencies = [ + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "prometheus", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56d658ba1faf63f7b9c492cfbe6e0ec365440a16132d3270c1065f7b33f1b638" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", + "tonic-prost", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b59f80e1ac4d5ff7a2db8fb6c80badb7f0f3f858211fba08dd9aaec750894f9" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "portable-atomic", + "rand 0.9.4", + "thiserror 2.0.18", + "tokio", + "tokio-stream", +] + [[package]] name = "parking" version = "2.2.1" @@ -2122,6 +2240,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror 2.0.18", +] + [[package]] name = "prost" version = "0.14.4" @@ -2175,6 +2308,26 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "pulldown-cmark" version = "0.13.4" @@ -2418,6 +2571,37 @@ version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6f6ff9a378485b298a5286656da665ba74413d36db0979633275d2e708145d4" +[[package]] +name = "reqwest" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "219c5811de6525e5416c7d5d53bb656d3afdbc6c5af816e0802bcfa42dbdc1c3" +dependencies = [ + "base64", + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http 1.4.2", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "ring" version = "0.17.14" @@ -3073,6 +3257,9 @@ name = "sync_wrapper" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] [[package]] name = "synstructure" @@ -3370,6 +3557,17 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "tonic-types" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73ab1b02061f83d519bba3caa167f88f261ef05720ab8ebc954ade70de3348e8" +dependencies = [ + "prost", + "prost-types", + "tonic", +] + [[package]] name = "tower" version = "0.5.3" @@ -3389,6 +3587,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cfcf7e2740e6fc6d4d688b4ef00650406bb94adf4731e43c096c3a19fe40840" +dependencies = [ + "bitflags", + "bytes", + "futures-util", + "http 1.4.2", + "http-body", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", + "url", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -3445,6 +3661,32 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adbc64cba7137545b8044cb1fe9814f7aacf3c6b5f9b45be8bb5db538befdb26" +dependencies = [ + "js-sys", + "opentelemetry", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.23" @@ -3455,12 +3697,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex-automata", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] @@ -3646,6 +3891,16 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54568702fabf5d4849ce2b90fadfa64168a097eaf4b351ce9df8b687a0086aaf" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.123" @@ -3712,6 +3967,16 @@ dependencies = [ "semver", ] +[[package]] +name = "web-sys" +version = "0.3.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e0871acf327f283dc6da28a1696cdc64fb355ba9f935d052021fa77f35cce69" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "web-time" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index 99761a2..d911d7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,14 @@ dashmap = "6" thiserror = "2" async-trait = "0.1" tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt", "registry"] } +opentelemetry = { version = "0.32", features = ["trace", "metrics", "logs"] } +opentelemetry_sdk = { version = "0.32", features = ["trace", "metrics", "logs", "rt-tokio"] } +opentelemetry-otlp = { version = "0.32", features = ["trace", "metrics", "logs", "grpc-tonic", "http-proto", "tls-ring"] } +tracing-opentelemetry = "0.33" +opentelemetry-appender-tracing = "0.32" +opentelemetry-prometheus = "0.32" +prometheus = "0.14" fred = { version = "10", features = ["subscriber-client"] } async-nats = "0.38" futures-util = "0.3" diff --git a/README.md b/README.md new file mode 100644 index 0000000..2cd6872 --- /dev/null +++ b/README.md @@ -0,0 +1,126 @@ +# imks — IM 实时消息服务 + +基于 **Engine.IO + Socket.IO** 协议的即时通讯(IM)实时消息服务,支持 WebSocket、WebTransport、HTTP Long-Polling 多种传输层,通过 gRPC 与 [appks](https://github.com/your-org/appks) 核心服务集成,提供认证、权限、消息持久化和跨节点广播。 + +## 架构 + +``` +Client (Browser/App) + │ Socket.IO over WebSocket / WebTransport / Polling + ▼ +┌─────────────────────────────────────┐ +│ imks │ +│ │ +│ engine/ socket/ │ +│ • WS/WT/ • Socket.IO Server │ +│ Polling • Namespace/Room │ +│ • Session • Adapter (Redis/NATS)│ +│ • Heartbeat • Message Bus │ +│ │ +│ models/ repo/ svc/ │ +│ • 20+ 消息 • SQL CRUD • 业务 │ +│ 领域模型 • 分页查询 逻辑层 │ +│ │ +│ auth/ rpc/ │ +│ • JWT 双模 • gRPC Stubs │ +│ 验证 • Token/Channel/ │ +│ • 密钥缓存 Member/Permission │ +└──────────────┬──────────────────────┘ + │ gRPC (mTLS) + ▼ +┌─────────────────────────────────────┐ +│ appks (core) │ +│ Token │ Channel │ Member │ ... │ +│ Postgres • Redis • NATS │ +└─────────────────────────────────────┘ +``` + +## 快速开始 + +### 前置依赖 + +- **Rust** 1.85+ (edition 2024) +- **PostgreSQL** 16+ (消息持久化) +- **appks** gRPC 服务 (认证 & 权限) +- **Redis** (可选, 多节点广播) +- **NATS** (可选, 低延迟多节点广播) + +### 安装 & 运行 + +```bash +# 克隆仓库 +git clone https://github.com/your-org/imks.git +cd imks + +# 配置环境变量 +cp .env.example .env +# 编辑 .env,至少设置 DATABASE_URL 和 APPKS_GRPC_ADDR + +# 数据库迁移(自动执行) +# 首次启动会自动运行 migrate/ 下的 SQL 迁移 + +# 编译 +cargo build --release + +# 运行 +cargo run --release +# 默认监听 http://0.0.0.0:3000 +``` + +### 端点 + +| 端点 | 说明 | +|---|---| +| `GET /engine.io/` | Engine.IO 握手 & WebSocket 升级 | +| `POST /engine.io/` | Engine.IO HTTP Long-Polling | +| `GET /health` | 健康检查(含连接数、会话数、依赖检查) | +| `GET /metrics` | Prometheus 格式指标 | + +### 健康检查示例 + +```json +{ + "status": "healthy", + "version": "0.1.0", + "timestamp": "2026-06-11T10:00:00Z", + "uptime_secs": 3600, + "connections_active": 42, + "sessions_count": 42, + "checks": { + "postgres": { "status": "up", "latency_ms": 3 }, + "redis": { "status": "up", "latency_ms": 1 } + } +} +``` + +## 环境变量 + +完整列表见 [`.env.example`](./.env.example)。 + +### 核心配置 + +| 变量 | 默认值 | 说明 | +|---|---|---| +| `IMKS_ADAPTER` | `local` | `local` \| `redis` \| `nats` | +| `DATABASE_URL` | `postgres://localhost/imks` | PostgreSQL 连接串 | +| `APPKS_GRPC_ADDR` | `http://localhost:50051` | appks gRPC 地址 | + +## 开发 + +```bash +cargo check # 快速检查语法 +cargo test # 运行所有测试(111 个) +cargo test --lib # 仅库测试(91 个) +cargo clippy # Lint 检查 +cargo fmt # 格式化 +``` + +## 文档 + +- [AGENTS.md](./AGENTS.md) — 开发规范 +- [rpc.md](docs/rpc.md) — 认证方案 & Proto 契约 +- [migrate/](./migrate/) — 数据库迁移脚本 + +## 许可证 + +[待定] diff --git a/docs/rpc.md b/docs/rpc.md new file mode 100644 index 0000000..eb78ba7 --- /dev/null +++ b/docs/rpc.md @@ -0,0 +1,718 @@ +# Auth 认证方案 + +## 架构总览 + +``` +┌─────────┐ ┌──────────┐ ┌──────────┐ +│ Client │ │ appks │ │ imks │ +│ (浏览器/ │ │ (core) │ │ (IM服务) │ +│ APP) │ │ │ │ │ +└────┬─────┘ └────┬─────┘ └────┬─────┘ + │ │ │ + │ 1. POST /api/v1/auth/login │ + │───────────────────▶│ │ + │ 2. {access_token, refresh_token} │ + │◀───────────────────│ │ + │ │ │ + │ 3. WS/gRPC/HTTP 携带 JWT │ + │──────────────────────────────────────▶│ + │ │ │ + │ │ 4a. VerifyToken RPC (RPC模式) + │ │◀─────────────────│ + │ │ 4b. GetSigningKeys (本地模式) + │ │◀─────────────────│ + │ │ │ + │ │ 5. TokenClaims / SigningKeys + │ │─────────────────▶│ + │ │ │ + │ 6. 业务响应 │ │ + │◀─────────────────────────────────────│ +``` + +**角色分工:** + +| 服务 | 职责 | +|------------------|----------------------------------------------------| +| **appks** (core) | 颁发 JWT、刷新 JWT、撤销 JWT、管理签名密钥、提供 `TokenService` gRPC | +| **imks** (IM) | 接收客户端 JWT,通过 RPC 或本地密钥验证用户身份 | + +## Proto 契约 + +定义在 `proto/core/auth.proto`,package `appks.core.v1`。 + +appks 和 imks 各自维护一份相同的 proto 文件: +- appks 编译为 **server** stub(提供服务) +- imks 编译为 **client** stub(调用服务) + +### TokenService RPC + +```protobuf +service TokenService { + // 令牌生命周期 (appks 内部调用) + rpc IssueToken(IssueTokenRequest) returns (IssueTokenResponse); + rpc RefreshToken(RefreshTokenRequest) returns (RefreshTokenResponse); + rpc RevokeToken(RevokeTokenRequest) returns (RevokeTokenResponse); + + // imks 验证 (RPC 模式) + rpc VerifyToken(VerifyTokenRequest) returns (VerifyTokenResponse); + + // imks 密钥拉取 (本地验证模式) + rpc GetSigningKeys(GetSigningKeysRequest) returns (GetSigningKeysResponse); +} +``` + +## JWT 令牌 + +### 结构 + +JWT Header: +```json +{ + "alg": "HS256", + "typ": "JWT", + "kid": "01909a..." // 签名密钥 ID,用于匹配 SigningKey +} +``` + +JWT Payload (`TokenClaims`): +```json +{ + "sub": "user-uuid", + "iss": "appks", + "iat": 1718000000, + "exp": 1718003600, + "jti": "01909b...", + "scope": "im:read im:write", + "extra": { + "workspace_id": "..." + } +} +``` + +### 令牌类型 + +| 类型 | 格式 | 存储 | 用途 | +|-------------------|----------------------|----------------------------------------------|---------------------------------------| +| **access_token** | JWT (HS256) | 无状态,客户端持有 | 每次请求携带,验证用户身份 | +| **refresh_token** | `rt_{UUIDv7}` 不透明字符串 | Redis `core:token:refresh:{token}` → user_id | 换取新的 access_token + refresh_token(旋转) | + +## 双模式验证 + +imks 可选择以下任一模式验证客户端 JWT: + +### 模式 A:RPC 验证(`VerifyToken`) + +``` +imks → appks TokenService.VerifyToken(jwt) → {valid, claims} +``` + +- **优点**:实时权威,能感知撤销 +- **缺点**:每次请求增加一次 RPC 往返 +- **适用场景**:高安全要求操作(管理员操作、敏感数据) + +### 模式 B:本地验证(`GetSigningKeys`) + +``` +imks 启动时 → appks TokenService.GetSigningKeys() → 缓存密钥到本地 +后续请求 → imks 用本地密钥解码 JWT(HS256 验签) +定期刷新 → 根据 next_rotation_at 拉取新密钥 +``` + +- **优点**:零 RPC 延迟,appks 不可用时仍能验证 +- **缺点**:撤销有最多一个密钥窗口(3h)的延迟 +- **适用场景**:高频低延迟操作(消息收发、实时通信) + +### 推荐策略 + +混合使用: +- 普通操作(发消息、读频道)→ 本地验证 +- 敏感操作(踢人、删频道、改权限)→ RPC 验证 + +## 签名密钥管理 + +### 密钥窗口 + +``` +时间轴: + ─────────┬──────────┬──────────┬──────── + │ key A │ key B │ key C + │ (过期) │ (活跃) │ (未来) + └──────────┴──────────┴──────── + issued_at issued_at issued_at + +3h +3h +3h +``` + +- 每个签名密钥有效期 **3 小时** +- 同一时刻可能有 **2 个有效密钥**(滚动窗口,平滑过渡) +- JWT header 的 `kid` 字段标识使用哪个密钥签名 + +### 密钥轮换流程 + +``` +1. 当前密钥到达 3h → TokenService.rotate_if_needed() +2. Redis 分布式锁 (core:token:rotation_lock, 10s TTL) 防止多实例竞争 +3. 旧密钥标记 active=false,仍保留在 Redis 用于验证旧 token +4. 生成新密钥,active=true +5. ArcSwap 原子替换当前签名密钥 +6. 旧密钥 TTL = 6h (2× window) 后从 Redis 自动清除 +``` + +### 密钥存储(Redis) + +``` +core:token:active_key → kid (当前活跃密钥 ID) +core:token:key:{kid} → SigningKey JSON (TTL = 6h) +core:token:rotation_lock → "1" (TTL = 10s, 分布式锁) +``` + +### SigningKey 结构 + +```rust +pub struct SigningKeyInfo { + pub kid: String, // UUIDv7 + pub algorithm: String, // "HS256" + pub key_material: String, // base64(32 bytes random) + pub issued_at: i64, + pub expires_at: i64, // issued_at + 3h + pub active: bool, +} +``` + +## 撤销机制 + +### Redis 布局 + +``` +core:token:revoked:{jti} → "1" (TTL = token 剩余有效期) +core:token:refresh:{token} → user_id (TTL = 7d) +``` + +### 撤销方式 + +| 操作 | RPC | 效果 | +|--------------|------------------------|-----------------------| +| 撤销单个 token | `RevokeToken(jti)` | 将 jti 加入撤销列表 | +| 撤销用户所有 token | `RevokeToken(user_id)` | 删除该用户所有 refresh token | + +### 撤销感知延迟 + +| 验证模式 | 延迟 | +|-----------------------|------------------------------| +| RPC (`VerifyToken`) | **实时** — 每次检查撤销列表 | +| 本地 (`GetSigningKeys`) | **最多 3h** — 密钥过期前无法感知 jti 撤销 | + +## appks 实现 + +### 模块结构 + +``` +service/internal_auth.rs → TokenService (业务逻辑) +grpc/auth.rs → TokenGrpcService (gRPC handler) +grpc/mod.rs → TokenServiceServer 注册到 tonic server +api/internal/issue_api_key.rs → REST: POST /api/v1/internal/tokens +``` + +### TokenService 核心 + +```rust +pub struct TokenService { + redis: AppRedis, + current_key: Arc>, // 无锁读 +} +``` + +- 启动时从 Redis 加载活跃密钥,无则生成 +- 签名使用 `jsonwebtoken` crate (HS256) +- 密钥轮换使用 Redis 分布式锁,支持多实例部署 +- `ArcSwap` 保证签名密钥读取无锁、写入原子 + +## imks 实现指南 + +### 启动流程 + +```rust +// 1. 连接 appks TokenService +let mut token_client = TokenServiceClient::connect(appks_addr).await?; + +// 2. 拉取签名密钥 +let resp = token_client.get_signing_keys(GetSigningKeysRequest { kid: "" }).await?; +let keys = resp.keys; +let next_rotation = resp.next_rotation_at; + +// 3. 缓存密钥到本地 (HashMap) +key_store.insert_all(keys); + +// 4. 安排定时刷新 +tokio::spawn(async move { + loop { + let delay = next_rotation - now(); + tokio::time::sleep(Duration::from_secs(delay as u64)).await; + let resp = token_client.get_signing_keys(...).await; + key_store.update(resp.keys); + } +}); +``` + +### 连接时验证 + +```rust +// 客户端建立 WebSocket/gRPC 连接时携带 JWT +fn on_connect(headers: &Headers) -> Result { + let token = headers.get("Authorization") + .and_then(|v| v.strip_prefix("Bearer ")) + .ok_or(AuthError::MissingToken)?; + + // 本地验证 (快速路径) + let header = decode_header(token)?; + let kid = header.kid.ok_or(AuthError::MissingKid)?; + let key = key_store.get(&kid).ok_or(AuthError::UnknownKey)?; + + let mut validation = Validation::new(Algorithm::HS256); + validation.set_issuer(&["appks"]); + validation.validate_exp = true; + + let data = decode::(token, &key, &validation)?; + Ok(data.claims) +} +``` + +### 敏感操作验证 + +```rust +// 敏感操作走 RPC 验证 (权威路径) +async fn on_sensitive_action(token: &str) -> Result { + let resp = token_client.verify_token(VerifyTokenRequest { + token: token.to_string(), + }).await?; + + if resp.valid { + Ok(resp.claims.unwrap()) + } else { + Err(AuthError::from(resp.reason)) + } +} +``` + +## 安全考虑 + +1. **密钥传输**:appks → imks 的 gRPC 连接应使用 mTLS,防止密钥在传输中被截获 +2. **密钥生命周期**:3h 窗口平衡了安全性和可用性;缩短窗口可减少撤销延迟但增加轮换频率 +3. **HS256 vs 非对称**:当前使用 HS256(对称密钥),imks 拿到的密钥可以伪造 token。如果 imks 不可完全信任,应改用 RS256/EdDSA,imks 只持有公钥 +4. **Refresh Token 安全**:每次刷新都旋转(旧 token 立即失效),防止重放 +5. **撤销列表 TTL**:与 token 剩余有效期对齐,过期 token 无需保留撤销记录 + +--- + +# IM 服务 Proto 说明书 + +以下是 imks 侧 `proto/core/` 下各 gRPC 服务的完整说明。所有 IM 服务定义在 `appks.im.v1` 包下,由 appks 提供 server 端,imks 消费 client 端。 + +## 服务总览 + +| Proto 文件 | 服务 | RPC 数量 | 职责 | +|---|---|---|---| +| `auth.proto` | TokenService | 5 | JWT 令牌生命周期 + 验证 + 密钥分发 | +| `channel.proto` | ChannelService | 10 | 频道/分类 CRUD + 统计 | +| `member.proto` | MemberService | 7 | 成员邀请/踢出/加入/离开/查询 | +| `permission.proto` | PermissionService | 7 | 权限检查 + 覆盖规则 + 频道解析 | +| `channel_settings.proto` | ChannelRoleService | 4 | 频道自定义角色 | +| | ChannelInvitationService | 4 | 邀请生命周期 | +| | ChannelWebhookService | 4 | Webhook CRUD | +| | ChannelSlashCommandService | 4 | 斜杠命令注册 | +| | ChannelRepoLinkService | 3 | 频道 ↔ 代码仓库关联 | +| | ImIntegrationService | 4 | 外部平台集成(Slack/Discord 等) | +| | CustomEmojiService | 3 | 工作区自定义表情 | +| | ForumTagService | 4 | 论坛频道标签 | +| | VoiceService | 2 | 语音频道参与者状态 | +| | StageService | 4 | 舞台频道管理 | +| | ChannelAuditService | 1 | 频道审计日志查询 | + +--- + +## ChannelService(`channel.proto`) + +频道和分类的 CRUD 管理,以及频道统计。 + +### 枚举 + +**ChannelType** — 频道类型: + +| 值 | 含义 | +|---|---| +| `PUBLIC` | 公开频道,workspace 内所有人可见 | +| `PRIVATE` | 私有频道,仅被邀请成员可见 | +| `DIRECT` | 私聊(一对一) | +| `GROUP` | 群聊(多人私聊) | +| `REPO` | 仓库关联频道(自动与 git repo 绑定) | +| `SYSTEM` | 系统频道(公告、通知等,只读) | + +**ChannelKind** — 频道形态: + +| 值 | 含义 | +|---|---| +| `TEXT` | 文本频道 | +| `VOICE` | 语音频道 | +| `STAGE` | 舞台频道(主持人+观众模式) | +| `FORUM` | 论坛频道(帖子/主题式讨论) | +| `ANNOUNCEMENT` | 公告频道(仅管理员可发消息) | + +**Visibility** — 可见性级别(从低到高): + +| 值 | 含义 | +|---|---| +| `PUBLIC` | 所有人可见(含未登录用户) | +| `WORKSPACE` | workspace 成员可见 | +| `INTERNAL` | 内部可见(组织成员) | +| `PRIVATE` | 仅频道成员可见 | +| `PROTECTED` | 受保护(不可被搜索/索引) | +| `HIDDEN` | 隐藏(不显示在频道列表中) | +| `SECRET` | 机密(仅通过直链访问) | + +### RPC 列表 + +``` +GetChannel(channel_id) → Channel 获取频道详情 +ListChannels(workspace, ...) → [Channel], total 列出频道(支持分类/类型/形态过滤) +CreateChannel(workspace, name) → Channel 创建频道 +UpdateChannel(channel_id, ...) → Channel 更新频道属性 +DeleteChannel(channel_id) → {} 删除频道 +GetChannelStats(channel_id) → ChannelStats 获取频道统计(成员/消息/线程/反应数) + +ListCategories(workspace) → [ChannelCategory] 列出分类 +CreateCategory(workspace, name) → ChannelCategory 创建分类 +UpdateCategory(category_id, ...) → ChannelCategory 更新分类 +DeleteCategory(category_id) → {} 删除分类 +``` + +### 核心消息 + +**Channel** — 频道主体: + +| 字段 | 类型 | 说明 | +|---|---|---| +| `id` | UUID | 频道 ID | +| `workspace_id` | UUID | 所属 workspace | +| `category_id` | UUID? | 所属分类(可选) | +| `parent_channel_id` | UUID? | 父频道(用于子频道/线程) | +| `name` | string | 频道名称 | +| `topic` / `description` | string? | 主题 / 描述 | +| `channel_type` | ChannelType | 频道类型 | +| `channel_kind` | ChannelKind | 频道形态 | +| `visibility` | Visibility | 可见性 | +| `position` | int32 | 排序位置 | +| `nsfw` | bool | NSFW 标记 | +| `read_only` | bool | 只读(仅管理员可发消息) | +| `archived` | bool | 已归档 | +| `rate_limit_per_user` | int32? | 慢速模式(秒/消息) | +| `last_message_id` / `last_message_at` | — | 最后一条消息信息 | + +**ChannelStats** — 频道统计: + +| 字段 | 说明 | +|---|---| +| `members_count` | 成员数 | +| `messages_count` | 消息数 | +| `threads_count` | 线程数 | +| `reactions_count` | 反应数 | +| `mentions_count` | @提及数 | +| `files_count` | 文件数 | +| `last_activity_at` | 最后活跃时间 | + +--- + +## MemberService(`member.proto`) + +频道成员管理。 + +### 枚举 + +**Role** — 角色层级(从高到低): + +| 值 | 含义 | +|---|---| +| `OWNER` | 频道所有者 | +| `ADMIN` | 管理员(全部权限) | +| `MAINTAINER` | 维护者(管理频道设置、成员) | +| `MODERATOR` | 版主(管理消息、踢人) | +| `MEMBER` | 普通成员 | +| `CONTRIBUTOR` | 贡献者(可发消息,部分限制) | +| `VIEWER` | 观察者(只读) | +| `GUEST` | 访客(临时访问) | +| `BOT` | 机器人 | + +**MemberStatus** — 成员状态: + +| 值 | 含义 | +|---|---| +| `ACTIVE` | 活跃成员 | +| `INVITED` | 已邀请(尚未加入) | +| `LEFT` | 已离开 | +| `KICKED` | 被踢出 | +| `BANNED` | 被封禁 | + +### RPC 列表 + +``` +ListMembers(channel_id, ...) → [ChannelMember], total 列出成员(支持状态过滤) +InviteMember(channel_id, user_id) → ChannelMember 邀请用户加入频道 +UpdateMember(channel_id, user_id) → ChannelMember 更新成员(角色/禁言/置顶) +KickMember(channel_id, user_id) → {} 踢出成员 +JoinChannel(channel_id, user_id) → ChannelMember 用户主动加入 +LeaveChannel(channel_id, user_id) → {} 用户主动离开 +IsMember(channel_id, user_id) → is_member, role 检查是否为成员 +``` + +### 核心消息 + +**ChannelMember** — 频道成员: + +| 字段 | 说明 | +|---|---| +| `channel_id` / `user_id` | 频道 + 用户 | +| `role` | 角色(Role 枚举值字符串) | +| `status` | 状态(MemberStatus 枚举值字符串) | +| `muted` | 是否被禁言 | +| `pinned` | 是否被置顶(频道侧标记) | +| `last_read_message_id` / `last_read_at` | 已读进度 | +| `joined_at` / `left_at` | 加入/离开时间 | + +--- + +## PermissionService(`permission.proto`) + +频道级权限系统,独立于 workspace/repo 的通用权限。 + +### 权限枚举(ImPermission) + +| 权限 | 说明 | +|---|---| +| `READ_CHANNEL` | 查看频道 | +| `SEND_MESSAGE` | 发送消息 | +| `MANAGE_THREADS` | 管理线程 | +| `MANAGE_REACTIONS` | 管理反应 | +| `MANAGE_PINS` | 管理置顶消息 | +| `INVITE_MEMBERS` | 邀请成员 | +| `KICK_MEMBERS` | 踢出成员 | +| `MANAGE_CHANNEL` | 管理频道设置 | +| `MANAGE_ROLES` | 管理角色 | +| `MANAGE_WEBHOOKS` | 管理 Webhook | +| `MANAGE_EMOJIS` | 管理自定义表情 | +| `VIEW_AUDIT_LOG` | 查看审计日志 | +| `MANAGE_INTEGRATIONS` | 管理外部集成 | +| `SEND_TTS` | 发送 TTS 消息 | +| `USE_SLASH_COMMANDS` | 使用斜杠命令 | +| `ATTACH_FILES` | 上传文件 | +| `MENTION_EVERYONE` | @所有人 | +| `MANAGE_MESSAGES` | 管理消息(删除他人消息) | +| `ADMIN` | 管理员(拥有所有权限) | + +### RPC 列表 + +``` +CheckPermission(channel, user, perm) → allowed, role 检查单项权限 +GetPermissions(channel, user) → [ImPermission] 获取用户全部权限 +SetPermissionOverwrite(channel, target) → Overwrite 设置权限覆盖 +GetPermissionOverwrites(channel) → [Overwrite] 获取覆盖列表 +DeletePermissionOverwrite(channel, target) → {} 删除覆盖 + +ResolveChannel(channel_id) → 频道摘要信息 解析频道元数据 +EnsureReadable(channel, user) → allowed 确保用户可读(快速检查) +``` + +### 权限覆盖(PermissionOverwrite) + +权限覆盖允许对特定用户/角色在特定频道上覆盖默认权限: + +| 字段 | 说明 | +|---|---| +| `target_type` | `"user"` 或 `"role"` | +| `target_id` | 用户 ID 或角色 ID | +| `allow` | 显式允许的权限列表 | +| `deny` | 显式拒绝的权限列表 | + +权限解析优先级:`deny 覆盖 > allow 覆盖 > 角色权限` + +--- + +## ChannelSettings 服务组(`channel_settings.proto`) + +所有频道配置相关的服务定义在同一个 proto 文件中。 + +### ChannelRoleService — 频道自定义角色 + +频道级别的自定义角色(不同于 `member.proto` 中的全局 Role 枚举)。 + +``` +ListChannelRoles(channel_id) → [ChannelRole] +CreateChannelRole(channel, name) → ChannelRole +UpdateChannelRole(role_id, ...) → ChannelRole +DeleteChannelRole(role_id) → {} +``` + +**ChannelRole** 字段:`name`, `permissions[]`(ImPermission 字符串列表), `assignable`(是否可被普通成员分配) + +### ChannelInvitationService — 邀请管理 + +``` +ListInvitations(channel_id) → [ChannelInvitation] +CreateInvitation(channel, user) → ChannelInvitation +AcceptInvitation(invitation_id) → ChannelInvitation +RevokeInvitation(invitation_id) → {} +``` + +**ChannelInvitation** 字段:`invited_by`, `invited_user_id`, `role`(预设角色), `status` + +### ChannelWebhookService — Webhook 管理 + +``` +ListWebhooks(channel_id) → [ChannelWebhook] +CreateWebhook(channel, name, url) → ChannelWebhook +UpdateWebhook(webhook_id, ...) → ChannelWebhook +DeleteWebhook(webhook_id) → {} +``` + +**ChannelWebhook** 字段:`name`, `url`, `secret`(签名验证用), `events[]`(订阅事件列表), `active` + +### ChannelSlashCommandService — 斜杠命令注册 + +``` +ListSlashCommands(channel_id) → [ChannelSlashCommand] +CreateSlashCommand(channel, cmd, url) → ChannelSlashCommand +UpdateSlashCommand(command_id, ...) → ChannelSlashCommand +DeleteSlashCommand(command_id) → {} +``` + +**ChannelSlashCommand** 字段:`command`(命令名如 `/deploy`), `description`, `request_url`(回调地址), `scopes[]` + +### ChannelRepoLinkService — 仓库关联 + +将频道与代码仓库关联,自动推送仓库事件到频道。 + +``` +ListRepoLinks(channel_id) → [ChannelRepoLink] +CreateRepoLink(channel, repo, type) → ChannelRepoLink +DeleteRepoLink(link_id) → {} +``` + +**ChannelRepoLink** 字段:`repo_id`, `link_type`, `events[]`(订阅的仓库事件:push、pr、issue 等) + +### ImIntegrationService — 外部平台集成 + +与 Slack、Discord 等外部平台的消息同步。 + +``` +ListIntegrations(channel_id) → [ImIntegration] +CreateIntegration(channel, provider, ...)→ ImIntegration +UpdateIntegration(integration_id, ...) → ImIntegration +DeleteIntegration(integration_id) → {} +``` + +**ImIntegration** 字段:`provider`(平台名), `external_channel_id`(外部频道 ID), `sync_direction`(`inbound`/`outbound`/`bidirectional`), `active` + +### CustomEmojiService — 自定义表情 + +工作区级别的自定义表情管理。 + +``` +ListCustomEmojis(workspace_id) → [CustomEmoji] +CreateCustomEmoji(workspace, name, url) → CustomEmoji +DeleteCustomEmoji(emoji_id) → {} +``` + +**CustomEmoji** 字段:`workspace_id`, `name`(表情名如 `:appks:`), `image_url` + +### ForumTagService — 论坛标签 + +论坛频道(`ChannelKind::FORUM`)的帖子分类标签。 + +``` +ListForumTags(channel_id) → [ForumTag] +CreateForumTag(channel, name, ...) → ForumTag +UpdateForumTag(tag_id, ...) → ForumTag +DeleteForumTag(tag_id) → {} +``` + +**ForumTag** 字段:`name`, `moderated`(是否需要管理员审核), `position` + +### VoiceService — 语音频道 + +语音频道的参与者状态管理。 + +``` +ListVoiceParticipants(channel_id) → [VoiceParticipant] +UpdateVoiceState(channel, user, ...) → VoiceParticipant +``` + +**VoiceParticipant** 字段:`user_id`, `muted`(静音), `deafened`(屏蔽音频), `joined_at` + +### StageService — 舞台频道 + +舞台频道(`ChannelKind::STAGE`)的管理。主持人说话,观众收听。 + +``` +GetStage(channel_id) → Stage +CreateStage(channel, topic, ...) → Stage +UpdateStage(stage_id, ...) → Stage +DeleteStage(stage_id) → {} +``` + +**Stage** 字段:`topic`(当前话题), `privacy_level`, `discoverable`(是否可被发现), `started_at` / `ended_at` + +### ChannelAuditService — 审计日志 + +频道操作审计日志查询(只读)。 + +``` +ListChannelEvents(channel_id, ...) → [ChannelAuditEvent], total +``` + +**ChannelAuditEvent** 字段:`actor_id`(操作者), `event_type`(事件类型字符串), `target_type` / `target_id`(操作对象), `old_value` / `new_value`(变更前后值) + +--- + +## imks 与 appks 的调用关系 + +``` +┌────────────────────────────────────────────────────────────┐ +│ imks │ +│ │ +│ Socket.IO / WebSocket / WebTransport │ +│ │ │ +│ ▼ │ +│ 连接握手 ──→ TokenService.VerifyToken() 或 本地密钥验证 │ +│ │ │ +│ ▼ │ +│ 消息收发 ──→ ChannelService + MemberService │ +│ │ PermissionService.EnsureReadable() │ +│ │ │ +│ ▼ │ +│ 频道管理 ──→ ChannelService CRUD │ +│ │ ChannelRoleService │ +│ │ ChannelInvitationService │ +│ │ │ +│ ▼ │ +│ 语音/舞台 ──→ VoiceService + StageService │ +│ │ +│ 集成/扩展 ──→ WebhookService + SlashCommandService │ +│ RepoLinkService + ImIntegrationService │ +│ │ +│ 审计查询 ──→ ChannelAuditService │ +└────────────────────────┬───────────────────────────────────┘ + │ gRPC + ▼ +┌────────────────────────────────────────────────────────────┐ +│ appks │ +│ TokenService server │ Channel/Member/Permission server │ +│ Redis (JWT keys) │ Postgres (channel data) │ +└────────────────────────────────────────────────────────────┘ +``` + +### imks 本地缓存建议 + +| 数据 | 缓存策略 | 刷新时机 | +|--------------------------|--------------|--------------------------| +| 签名密钥 (`SigningKey[]`) | 内存 HashMap | `next_rotation_at` 到达时拉取 | +| 频道信息 (`Channel`) | LRU + TTL | 频道更新事件 (NATS) | +| 成员列表 (`ChannelMember[]`) | LRU + TTL | 成员变更事件 (NATS) | +| 权限缓存 | 短期 TTL (30s) | 权限变更事件 (NATS) | +| 自定义表情 | 全量加载 + 事件增量 | emoji 增删事件 (NATS) | diff --git a/engine/health.rs b/engine/health.rs index 639a7dc..e3b1a70 100644 --- a/engine/health.rs +++ b/engine/health.rs @@ -1,26 +1,32 @@ //! Health check endpoint for the imks server. //! -//! Returns JSON with server status, version, and upstream connectivity. +//! Returns JSON with server status, version, uptime, and connection counts +//! sourced from live runtime state (session store + atomic counter). -use actix_web::HttpResponse; +use actix_web::{HttpResponse, web}; use serde::Serialize; +use crate::engine::session::SessionStore; +use crate::telemetry; + #[derive(Serialize)] struct HealthResponse { status: String, version: String, timestamp: String, uptime_secs: u64, + connections_active: u64, sessions_count: usize, } -/// GET /health — returns server health status. -pub async fn health_check() -> HttpResponse { +/// GET /health — returns server health status with live connection metrics. +pub async fn health_check(store: web::Data) -> HttpResponse { HttpResponse::Ok().json(HealthResponse { status: "healthy".into(), version: env!("CARGO_PKG_VERSION").into(), timestamp: chrono::Utc::now().to_rfc3339(), - uptime_secs: 0, - sessions_count: 0, + uptime_secs: telemetry::health::uptime_secs(), + connections_active: telemetry::health::connections_active_count(), + sessions_count: store.len(), }) } diff --git a/engine/server.rs b/engine/server.rs index 8faedbf..d382bdd 100644 --- a/engine/server.rs +++ b/engine/server.rs @@ -115,17 +115,26 @@ impl EngineServer { )); let heartbeat_handle = heartbeat.start(); - tracing::info!("Engine.IO HTTP server listening on {}", addr); + tracing::info!( + endpoint = %addr, + "Engine.IO HTTP server listening, /health and /metrics available" + ); let result = HttpServer::new(move || { App::new() .app_data(web::Data::new(store.clone())) .app_data(web::Data::new(config.clone())) .app_data(web::Data::new(on_message.clone())) + // Health check with connection metrics .route( "/health", web::get().to(crate::engine::health::health_check), ) + // Prometheus metrics endpoint + .route( + "/metrics", + web::get().to(crate::telemetry::metrics::metrics_handler), + ) .route("/engine.io/", web::get().to(engine_get)) .route( "/engine.io/", diff --git a/engine/session.rs b/engine/session.rs index d9d21ae..706d2b0 100644 --- a/engine/session.rs +++ b/engine/session.rs @@ -129,6 +129,12 @@ impl SessionStore { sid ); } + if let Some(m) = crate::telemetry::metrics::try_get() { + m.engine_sessions_active.add( + 1, + &[opentelemetry::KeyValue::new("transport", transport.as_str())], + ); + } rx } @@ -137,7 +143,11 @@ impl SessionStore { } pub fn remove(&self, sid: &str) { - self.sessions.remove(sid); + if self.sessions.remove(sid).is_some() + && let Some(m) = crate::telemetry::metrics::try_get() + { + m.engine_sessions_active.add(-1, &[]); + } } pub fn exists(&self, sid: &str) -> bool { diff --git a/lib.rs b/lib.rs index 1e85346..9606646 100644 --- a/lib.rs +++ b/lib.rs @@ -8,5 +8,6 @@ pub mod repo; pub mod rpc; pub mod socket; pub mod svc; +pub mod telemetry; pub use error::{ImksError, ImksResult}; diff --git a/main.rs b/main.rs index abfb370..42fe39f 100644 --- a/main.rs +++ b/main.rs @@ -9,14 +9,12 @@ use imks::socket::message_bus::{NatsMessageBus, RedisMessageBus}; use imks::socket::server::SocketServerBuilder; use imks::svc::{DeployConfig, MessageService}; +use imks::telemetry; fn main() -> Result<(), Box> { - tracing_subscriber::fmt() - .with_env_filter( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), - ) - .init(); + // Initialize observability stack (traces, metrics, logs, health) + let telemetry_guard = telemetry::init(); + telemetry::health::init_counters(); let deploy = DeployConfig::from_env(); tracing::info!( @@ -37,7 +35,6 @@ fn main() -> Result<(), Box> { Arc::new(OnceLock::new()); // Pre-configure adapter for Redis/NATS mode. - // The callback resolves namespaces after SocketServer is built. match deploy.adapter_mode.as_str() { "redis" => { let message_bus = Arc::new( @@ -130,27 +127,58 @@ fn main() -> Result<(), Box> { .map_err(|e| e.to_string())?; } + // Increment connection metrics + let m = telemetry::metrics::get(); + m.connections_active.add( + 1, + &telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), + ); + m.connections_total.add( + 1, + &telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), + ); + telemetry::health::connection_connected(); + tracing::info!( - "Socket {} connected (engine: {})", - socket.sid, - socket.engine_sid + socket_sid = %socket.sid, + engine_sid = %socket.engine_sid, + namespace = %socket.namespace, + "Socket connected" ); Ok(()) }) .await; + + // Register Socket.IO event handlers if let Some(ref svc) = service { macro_rules! register_event { ($svc:expr, $ns:expr, $event:expr, $method:ident) => { let s = $svc.clone(); + let event_name = $event.to_string(); $ns.on_event($event, Arc::new(move |socket, data| { let s = s.clone(); let data = data.clone(); + let event = event_name.clone(); tokio::spawn(async move { + let _span = tracing::info_span!( + "socket_event", + otel.name = format!("handle {event}"), + event = %event, + socket_sid = %socket.sid, + ); + let _enter = _span.enter(); + + let start = std::time::Instant::now(); if let Err(e) = s.$method(socket, &data).await { - tracing::error!(event = $event, error = %e, "Event handler failed"); + tracing::error!(event = %event, error = %e, "Event handler failed"); } + let elapsed = start.elapsed().as_secs_f64(); + telemetry::metrics::get().event_handling_duration.record( + elapsed, + &telemetry::MetricsInstruments::event_attrs(&event), + ); }); })).await; }; @@ -200,11 +228,12 @@ fn main() -> Result<(), Box> { register_event!(svc, namespace, "article:list", list_articles); register_event!(svc, namespace, "article:delete", delete_article); register_event!(svc, namespace, "component:interact", interact_component); + register_event!(svc, namespace, "component:update", update_component); // Start scheduled message dispatcher (background task) svc.clone().start_scheduled_dispatcher(); - tracing::info!("Registered Socket.IO event handlers"); + tracing::info!("Registered Socket.IO event handlers with observability instrumentation"); } // Start servers @@ -233,6 +262,9 @@ fn main() -> Result<(), Box> { Ok::<(), Box>(()) })?; + // Graceful telemetry shutdown + telemetry_guard.shutdown(); + Ok(()) } diff --git a/socket/namespace.rs b/socket/namespace.rs index 71e2435..42a393a 100644 --- a/socket/namespace.rs +++ b/socket/namespace.rs @@ -75,7 +75,10 @@ impl Namespace { } /// Remove a socket by its socket SID. - pub async fn remove_socket_by_sid(&self, socket_sid: &str) { + /// + /// Returns `true` if a socket was actually removed, `false` if the SID + /// was not found (already removed or never existed). + pub async fn remove_socket_by_sid(&self, socket_sid: &str) -> bool { if let Some((_, socket)) = self.sockets.remove(socket_sid) { self.engine_to_socket.remove(&socket.engine_sid); self.remove_socket_from_local_rooms(socket_sid); @@ -86,14 +89,19 @@ impl Namespace { { tracing::warn!("Adapter del_all error for socket {}: {}", socket_sid, e); } + true + } else { + false } } /// Remove a socket by its engine SID (for engine-level disconnections). - pub async fn remove_socket(&self, engine_sid: &str) { + /// Returns `true` if a socket was actually removed. + pub async fn remove_socket(&self, engine_sid: &str) -> bool { if let Some((_, socket_sid)) = self.engine_to_socket.remove(engine_sid) { - self.remove_socket_by_sid(&socket_sid).await; + return self.remove_socket_by_sid(&socket_sid).await; } + false } /// Look up a socket by its socket SID. diff --git a/socket/server.rs b/socket/server.rs index fcb53f7..444f7c6 100644 --- a/socket/server.rs +++ b/socket/server.rs @@ -143,29 +143,40 @@ async fn handle_engine_message( ) { if let EnginePacketData::Text(ref text) = engine_packet.data { match parser::decode(text) { - Ok(socket_packet) => match socket_packet.packet_type { - PacketType::Connect => { - handle_connect( - &engine_sid, - &socket_packet, - namespaces, - socket_txs, - engine_store, - adapter, - ) - .await; + Ok(socket_packet) => { + let packet_type = format!("{:?}", socket_packet.packet_type); + let _span = tracing::debug_span!( + "engine_message", + engine_sid = %engine_sid, + packet_type = %packet_type, + namespace = %socket_packet.namespace, + ); + let _enter = _span.enter(); + + match socket_packet.packet_type { + PacketType::Connect => { + handle_connect( + &engine_sid, + &socket_packet, + namespaces, + socket_txs, + engine_store, + adapter, + ) + .await; + } + PacketType::Disconnect => { + handle_disconnect(&engine_sid, &socket_packet, namespaces, socket_txs); + } + PacketType::Event => { + handle_event(&engine_sid, &socket_packet, namespaces); + } + PacketType::Ack => { + handle_ack(&engine_sid, &socket_packet); + } + _ => {} } - PacketType::Disconnect => { - handle_disconnect(&engine_sid, &socket_packet, namespaces, socket_txs); - } - PacketType::Event => { - handle_event(&engine_sid, &socket_packet, namespaces); - } - PacketType::Ack => { - handle_ack(&engine_sid, &socket_packet); - } - _ => {} - }, + } Err(e) => { tracing::warn!(engine_sid = %engine_sid, error = %e, "Invalid Socket.IO packet"); } @@ -181,6 +192,13 @@ async fn handle_connect( engine_store: &SessionStore, adapter: &Arc, ) { + let _span = tracing::info_span!( + "socket_connect", + engine_sid = %engine_sid, + namespace = %packet.namespace, + ); + let _enter = _span.enter(); + // Validate namespace path to prevent DoS via arbitrary namespace creation if !crate::socket::namespace::is_valid_namespace(&packet.namespace) { tracing::warn!( @@ -244,11 +262,16 @@ async fn handle_connect( break; } } - // Forwarding task ended — ensure socket is cleaned up from namespace + // Forwarding task ended — ensure socket is cleaned up from namespace. + // If the socket was still registered (session expiry / engine disconnect + // without Socket.IO disconnect packet), also update the connection counter. socket_txs_clone.remove(&socket_sid_clone); - namespace_clone + let was_removed = namespace_clone .remove_socket_by_sid(&socket_sid_clone) .await; + if was_removed { + crate::telemetry::health::connection_disconnected(); + } }); // Send Connect response (only after handler passed) @@ -268,16 +291,26 @@ fn handle_disconnect( namespaces: &Arc, socket_txs: &Arc>>, ) { - if let Some(namespace) = namespaces.get_namespace(&packet.namespace) { - // Look up socket by engine_sid, then remove by socket_sid - if let Some(socket) = namespace.get_socket_by_engine_sid(engine_sid) { - socket_txs.remove(&socket.sid); - let socket_sid = socket.sid.clone(); - let ns_clone = namespace.clone(); - tokio::spawn(async move { - ns_clone.remove_socket_by_sid(&socket_sid).await; - }); - } + if let Some(namespace) = namespaces.get_namespace(&packet.namespace) + && let Some(socket) = namespace.get_socket_by_engine_sid(engine_sid) + { + let m = crate::telemetry::metrics::get(); + m.connections_active.add( + -1, + &crate::telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), + ); + m.disconnections_total.add( + 1, + &crate::telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), + ); + crate::telemetry::health::connection_disconnected(); + + socket_txs.remove(&socket.sid); + let socket_sid = socket.sid.clone(); + let ns_clone = namespace.clone(); + tokio::spawn(async move { + ns_clone.remove_socket_by_sid(&socket_sid).await; + }); } } diff --git a/svc/component.rs b/svc/component.rs index a582ae6..f65f7e5 100644 --- a/svc/component.rs +++ b/svc/component.rs @@ -64,7 +64,6 @@ impl MessageService { } /// Handle `component:update` — update a component's state (e.g., disable after interaction). - #[allow(dead_code)] pub async fn update_component( &self, socket: Arc, diff --git a/svc/message.rs b/svc/message.rs index 5a9bb93..4402427 100644 --- a/svc/message.rs +++ b/svc/message.rs @@ -296,6 +296,43 @@ impl MessageService { Ok(()) } + /// Handle `message:edit_history` — retrieve the edit history for a message. + pub async fn get_edit_history( + &self, + socket: Arc, + data: &serde_json::Value, + ) -> ImksResult<()> { + let user_id = self.user_id(&socket)?; + let payload = Self::first_payload(data)?; + let message_id: Uuid = Self::parse_field(payload, "message_id")?; + + let message = self + .repo + .get(message_id) + .await? + .ok_or_else(|| ImksError::NotFound(format!("message {message_id}")))?; + + let channel_id_str = message.channel_id.to_string(); + let user_id_str = user_id.to_string(); + + self.ensure_readable(&channel_id_str, &user_id_str).await?; + + let history = self.repo.get_edit_history(message_id).await?; + let summary = self.repo.get_edit_summary(message_id).await?; + + let _ = socket.emit( + "message:edit_history", + serde_json::json!({ + "message_id": message_id.to_string(), + "edits": history, + "edit_count": summary.edit_count, + "last_edited_at": summary.last_edited_at, + "last_edited_by": summary.last_edited_by, + }), + ); + Ok(()) + } + // Permission validation helpers /// Full write-access gate: resolve channel + readability + membership + SEND_MESSAGE. @@ -481,7 +518,7 @@ impl MessageService { } } - fn validate_body_size(&self, body: &str) -> ImksResult<()> { + pub(crate) fn validate_body_size(&self, body: &str) -> ImksResult<()> { if body.len() > self.max_body_size { return Err(ImksError::InvalidInput(format!( "Message body exceeds max size of {} bytes (got {})", diff --git a/svc/scheduled.rs b/svc/scheduled.rs index 4ed07a7..f75266a 100644 --- a/svc/scheduled.rs +++ b/svc/scheduled.rs @@ -1,15 +1,132 @@ -//! Scheduled message dispatcher on `MessageService`. +//! Scheduled message handler on `MessageService`. //! -//! A background task that periodically scans for due scheduled messages -//! and sends them through the normal message path. +//! Provides: +//! - Client-facing CRUD: schedule, cancel, list pending scheduled messages +//! - Background dispatcher: periodically scans for due scheduled messages +//! and sends them through the normal message path. +use std::sync::Arc; use std::time::Duration; +use chrono::{DateTime, Utc}; +use uuid::Uuid; + use crate::repo::CreateMessageInput; +use crate::socket::socket::Socket; +use crate::{ImksError, ImksResult}; use super::message::MessageService; impl MessageService { + // ── Client-facing scheduled message CRUD ── + + /// Handle `message:schedule` — schedule a message to be sent at a future time. + pub async fn schedule_message( + &self, + socket: Arc, + data: &serde_json::Value, + ) -> ImksResult<()> { + let user_id = self.user_id(&socket)?; + let payload = Self::first_payload(data)?; + + let channel_id: Uuid = Self::parse_field(payload, "channel_id")?; + let body: String = Self::parse_field(payload, "body")?; + let thread_id: Option = Self::parse_optional(payload, "thread_id")?; + let reply_to_message_id: Option = + Self::parse_optional(payload, "reply_to_message_id")?; + let metadata: Option = + Self::parse_optional(payload, "metadata")?; + let scheduled_at_str: String = Self::parse_field(payload, "scheduled_at")?; + + let scheduled_at: DateTime = chrono::DateTime::parse_from_rfc3339(&scheduled_at_str) + .map_err(|e| ImksError::InvalidInput(format!("Invalid scheduled_at: {e}")))? + .into(); + + let channel_id_str = channel_id.to_string(); + let user_id_str = user_id.to_string(); + + self.validate_body_size(&body)?; + self.ensure_readable(&channel_id_str, &user_id_str).await?; + self.ensure_member(&channel_id_str, &user_id_str).await?; + + // Validate scheduled_at is in the future + if scheduled_at <= Utc::now() { + return Err(ImksError::InvalidInput( + "scheduled_at must be in the future".into(), + )); + } + + let scheduled = self + .repo + .schedule_message( + channel_id, + user_id, + thread_id, + reply_to_message_id, + &body, + metadata, + scheduled_at, + ) + .await?; + + tracing::info!( + scheduled_id = %scheduled.id, + channel_id = %channel_id, + user_id = %user_id, + scheduled_at = %scheduled_at, + "Message scheduled" + ); + Ok(()) + } + + /// Handle `message:cancel_scheduled` — cancel a pending scheduled message. + pub async fn cancel_scheduled( + &self, + socket: Arc, + data: &serde_json::Value, + ) -> ImksResult<()> { + let user_id = self.user_id(&socket)?; + let payload = Self::first_payload(data)?; + let scheduled_id: Uuid = Self::parse_field(payload, "scheduled_id")?; + + let cancelled = self.repo.cancel_scheduled(scheduled_id).await?; + + if !cancelled { + return Err(ImksError::NotFound(format!( + "scheduled message {scheduled_id} not found or already processed" + ))); + } + + tracing::info!(%scheduled_id, %user_id, "Scheduled message cancelled"); + Ok(()) + } + + /// Handle `message:list_scheduled` — list pending scheduled messages for a channel. + pub async fn list_scheduled( + &self, + socket: Arc, + data: &serde_json::Value, + ) -> ImksResult<()> { + let user_id = self.user_id(&socket)?; + let payload = Self::first_payload(data)?; + let channel_id: Uuid = Self::parse_field(payload, "channel_id")?; + + let channel_id_str = channel_id.to_string(); + let user_id_str = user_id.to_string(); + + self.ensure_readable(&channel_id_str, &user_id_str).await?; + + let scheduled = self.repo.list_scheduled(channel_id, user_id).await?; + + let _ = socket.emit( + "scheduled:loaded", + serde_json::to_value(&scheduled).unwrap_or_default(), + ); + Ok(()) + } + + // ── Background dispatcher ── + /// Start the background scheduled-message dispatcher. /// Scans every 30 seconds for pending messages whose `scheduled_at` has passed. pub fn start_scheduled_dispatcher(self: std::sync::Arc) { @@ -55,7 +172,6 @@ impl MessageService { .mark_scheduled_sent(scheduled.id, message.id) .await?; - // Broadcast to channel if let Some(ns) = self.namespaces.get_namespace("/") { ns.emit_to_room( &scheduled.channel_id.to_string(), diff --git a/telemetry/config.rs b/telemetry/config.rs new file mode 100644 index 0000000..a3c6589 --- /dev/null +++ b/telemetry/config.rs @@ -0,0 +1,85 @@ +/// Telemetry configuration, populated from environment variables. +/// +/// Follows the OpenTelemetry environment variable specification: +/// +#[derive(Debug, Clone)] +pub struct TelemetryConfig { + pub service_name: String, + pub service_version: String, + pub otlp_endpoint: String, + pub otlp_protocol: OtlpProtocol, + pub traces_enabled: bool, + pub metrics_enabled: bool, + pub logs_enabled: bool, + pub log_format: LogFormat, + pub log_level: String, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum OtlpProtocol { + Grpc, + HttpProtobuf, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum LogFormat { + Json, + Pretty, +} + +impl Default for TelemetryConfig { + fn default() -> Self { + Self { + service_name: env_or("OTEL_SERVICE_NAME", "imks"), + service_version: env_or("OTEL_SERVICE_VERSION", env!("CARGO_PKG_VERSION")), + otlp_endpoint: env_or( + "OTEL_EXPORTER_OTLP_ENDPOINT", + "http://localhost:4317", + ), + otlp_protocol: detect_otlp_protocol(), + traces_enabled: env_bool("OTEL_TRACES_ENABLED", true), + metrics_enabled: env_bool("OTEL_METRICS_ENABLED", true), + logs_enabled: env_bool("OTEL_LOGS_ENABLED", true), + log_format: detect_log_format(), + log_level: env_or("RUST_LOG", "info"), + } + } +} + +impl TelemetryConfig { + pub fn from_env() -> Self { + Self::default() + } +} + +fn env_or(key: &str, default: &str) -> String { + std::env::var(key).unwrap_or_else(|_| default.to_string()) +} + +fn env_bool(key: &str, default: bool) -> bool { + std::env::var(key) + .map(|v| matches!(v.to_lowercase().as_str(), "true" | "1" | "yes" | "on")) + .unwrap_or(default) +} + +fn detect_otlp_protocol() -> OtlpProtocol { + match std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL") + .unwrap_or_default() + .to_lowercase() + .as_str() + { + "http/protobuf" | "http/binary" => OtlpProtocol::HttpProtobuf, + _ => OtlpProtocol::Grpc, // default to gRPC as project already depends on tonic + } +} + +fn detect_log_format() -> LogFormat { + match std::env::var("LOG_FORMAT") + .unwrap_or_else(|_| "json".to_string()) + .to_lowercase() + .as_str() + { + "pretty" | "text" | "console" => LogFormat::Pretty, + _ => LogFormat::Json, // default to JSON for structured logging + } +} diff --git a/telemetry/health.rs b/telemetry/health.rs new file mode 100644 index 0000000..af236cb --- /dev/null +++ b/telemetry/health.rs @@ -0,0 +1,170 @@ +//! Enhanced health check endpoint with upstream dependency checks. +//! +//! Returns JSON with server status, version, uptime, connection counts, +//! and optional health checks for PostgreSQL, Redis, NATS, and gRPC. + +use std::sync::Arc; +use std::sync::OnceLock; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Instant; + +use actix_web::HttpResponse; +use serde::Serialize; + +/// Server start time captured at init. +static START_TIME: std::sync::OnceLock = std::sync::OnceLock::new(); + +/// Live connection counter shared across the process. +/// Updated by the socket layer on connect / disconnect. +static CONNECTIONS_ACTIVE: OnceLock = OnceLock::new(); + +/// Initializes the start time (call once during startup). +pub fn record_start_time() { + START_TIME.set(Instant::now()).ok(); +} + +/// Initialize shared health counters (call once during startup). +pub fn init_counters() { + CONNECTIONS_ACTIVE.set(AtomicU64::new(0)).ok(); +} + +/// Signal that a new socket connection was established. +pub fn connection_connected() { + if let Some(c) = CONNECTIONS_ACTIVE.get() { + c.fetch_add(1, Ordering::Relaxed); + } +} + +/// Signal that a socket connection was closed. +pub fn connection_disconnected() { + if let Some(c) = CONNECTIONS_ACTIVE.get() { + c.fetch_sub(1, Ordering::Relaxed); + } +} + +/// Return the current number of active socket connections. +pub fn connections_active_count() -> u64 { + CONNECTIONS_ACTIVE + .get() + .map(|c| c.load(Ordering::Relaxed)) + .unwrap_or(0) +} + +/// Returns the server uptime in seconds. +pub fn uptime_secs() -> u64 { + START_TIME + .get() + .map(|t| t.elapsed().as_secs()) + .unwrap_or(0) +} + +#[derive(Debug, Clone, Serialize)] +pub struct HealthResponse { + pub status: String, + pub version: String, + pub timestamp: String, + pub uptime_secs: u64, + pub connections_active: u64, + pub sessions_count: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub checks: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub struct HealthChecks { + #[serde(skip_serializing_if = "Option::is_none")] + pub postgres: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub redis: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub nats: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub grpc: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub struct CheckResult { + pub status: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub latency_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +/// Optional external check functions. +/// Each returns `Some(CheckResult)` if the service is configured, `None` otherwise. +#[derive(Default)] +pub struct HealthCheckFns { + pub check_postgres: Option CheckResult + Send + Sync>>, + pub check_redis: Option CheckResult + Send + Sync>>, + pub check_nats: Option CheckResult + Send + Sync>>, + pub check_grpc: Option CheckResult + Send + Sync>>, +} + +impl HealthCheckFns { + pub fn with_postgres(mut self, f: impl Fn() -> CheckResult + Send + Sync + 'static) -> Self { + self.check_postgres = Some(Arc::new(f)); + self + } + + pub fn with_redis(mut self, f: impl Fn() -> CheckResult + Send + Sync + 'static) -> Self { + self.check_redis = Some(Arc::new(f)); + self + } + + pub fn with_nats(mut self, f: impl Fn() -> CheckResult + Send + Sync + 'static) -> Self { + self.check_nats = Some(Arc::new(f)); + self + } + + pub fn with_grpc(mut self, f: impl Fn() -> CheckResult + Send + Sync + 'static) -> Self { + self.check_grpc = Some(Arc::new(f)); + self + } +} + +/// GET /health handler with dependency checks. +pub async fn health_check(checks: actix_web::web::Data>) -> HttpResponse { + let checks = checks.get_ref(); + + let health_checks = if checks.check_postgres.is_some() + || checks.check_redis.is_some() + || checks.check_nats.is_some() + || checks.check_grpc.is_some() + { + Some(HealthChecks { + postgres: checks.check_postgres.as_ref().map(|f| f()), + redis: checks.check_redis.as_ref().map(|f| f()), + nats: checks.check_nats.as_ref().map(|f| f()), + grpc: checks.check_grpc.as_ref().map(|f| f()), + }) + } else { + None + }; + + let overall_status = if let Some(ref hc) = health_checks { + let all_up = [&hc.postgres, &hc.redis, &hc.nats, &hc.grpc] + .iter() + .filter_map(|c| c.as_ref()) + .all(|c| c.status == "up"); + if all_up { + "healthy" + } else { + "degraded" + } + } else { + "healthy" + }; + + let response = HealthResponse { + status: overall_status.to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + timestamp: chrono::Utc::now().to_rfc3339(), + uptime_secs: uptime_secs(), + connections_active: 0, + sessions_count: 0, + checks: health_checks, + }; + + HttpResponse::Ok().json(response) +} diff --git a/telemetry/logs.rs b/telemetry/logs.rs new file mode 100644 index 0000000..931e7bd --- /dev/null +++ b/telemetry/logs.rs @@ -0,0 +1,129 @@ +//! Log export: JSON console output + OpenTelemetry log bridge (OTLP). + +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_otlp::{LogExporter, Protocol, WithExportConfig}; +use opentelemetry_sdk::logs::SdkLoggerProvider; +use opentelemetry_sdk::Resource; +use tracing_subscriber::fmt::format::FmtSpan; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::Registry; + +use super::config::{OtlpProtocol, TelemetryConfig}; +use crate::ImksResult; + +/// Initialize the tracing subscriber. +/// +/// Layer order (critical for OpenTelemetry compatibility): +/// 1. Registry +/// 2. OpenTelemetry trace layer (must be first — needs LookupSpan) +/// 3. EnvFilter +/// 4. Console formatting layer (JSON) +/// 5. OpenTelemetry log bridge +/// +/// Returns the SdkLoggerProvider for graceful shutdown. +pub fn init_subscriber( + config: &TelemetryConfig, + resource: Option<&Resource>, + otel_trace_layer: Option< + tracing_opentelemetry::OpenTelemetryLayer, + >, +) -> ImksResult { + let env_filter = + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&config.log_level)); + + let (logger_provider, log_bridge_layer) = if config.logs_enabled { + let exporter = build_log_exporter(config)?; + + let resource = resource.cloned().unwrap_or_else(|| Resource::builder().build()); + + let provider = SdkLoggerProvider::builder() + .with_resource(resource) + .with_batch_exporter(exporter) + .build(); + + let bridge = OpenTelemetryTracingBridge::new(&provider); + (Some(provider), Some(bridge)) + } else { + (None, None) + }; + + match (otel_trace_layer, log_bridge_layer) { + (Some(trace_layer), Some(log_layer)) => { + let subscriber = Registry::default() + .with(trace_layer) + .with(env_filter) + .with(make_json_fmt()) + .with(log_layer); + set_subscriber(subscriber); + } + (Some(trace_layer), None) => { + let subscriber = Registry::default() + .with(trace_layer) + .with(env_filter) + .with(make_json_fmt()); + set_subscriber(subscriber); + } + (None, Some(log_layer)) => { + let subscriber = Registry::default() + .with(env_filter) + .with(make_json_fmt()) + .with(log_layer); + set_subscriber(subscriber); + } + (None, None) => { + let subscriber = Registry::default() + .with(env_filter) + .with(make_json_fmt()); + set_subscriber(subscriber); + } + } + + let logger_provider = logger_provider.unwrap_or_else(|| SdkLoggerProvider::builder().build()); + + Ok(logger_provider) +} + +/// Create the JSON fmt layer with span context. +fn make_json_fmt() -> tracing_subscriber::fmt::Layer< + S, + tracing_subscriber::fmt::format::JsonFields, + tracing_subscriber::fmt::format::Format, +> +where + S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>, +{ + tracing_subscriber::fmt::layer() + .json() + .with_span_events(FmtSpan::CLOSE) + .with_current_span(true) + .with_span_list(true) +} + +fn set_subscriber(subscriber: S) +where + S: tracing::Subscriber + Send + Sync + 'static, +{ + match tracing::subscriber::set_global_default(subscriber) { + Ok(()) => {} + Err(e) => { + tracing::warn!("Could not set global tracing subscriber: {e}"); + } + } +} + +fn build_log_exporter(config: &TelemetryConfig) -> ImksResult { + match config.otlp_protocol { + OtlpProtocol::Grpc => LogExporter::builder() + .with_tonic() + .with_endpoint(&config.otlp_endpoint) + .build() + .map_err(|e| crate::ImksError::Internal(format!("OTLP gRPC log exporter: {e}"))), + OtlpProtocol::HttpProtobuf => LogExporter::builder() + .with_http() + .with_protocol(Protocol::HttpBinary) + .with_endpoint(&config.otlp_endpoint) + .build() + .map_err(|e| crate::ImksError::Internal(format!("OTLP HTTP log exporter: {e}"))), + } +} diff --git a/telemetry/metrics.rs b/telemetry/metrics.rs new file mode 100644 index 0000000..4be5147 --- /dev/null +++ b/telemetry/metrics.rs @@ -0,0 +1,168 @@ +//! Prometheus metrics: global meter provider, registry, and the /metrics actix-web handler. + +use std::sync::OnceLock; + +use opentelemetry::global; +use opentelemetry::metrics::{Counter, Histogram, Meter, UpDownCounter}; +use opentelemetry::KeyValue; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use opentelemetry_sdk::Resource; +use prometheus::{Encoder, Registry, TextEncoder}; + +use crate::ImksResult; + +/// Shared Prometheus registry, lazily initialized. +static PROMETHEUS_REGISTRY: OnceLock = OnceLock::new(); + +/// Global metrics instruments, initialized once at startup. +static METRICS: OnceLock = OnceLock::new(); + +/// All application metrics instruments. +#[derive(Debug, Clone)] +pub struct MetricsInstruments { + pub connections_active: UpDownCounter, + pub connections_total: Counter, + pub disconnections_total: Counter, + pub messages_received_total: Counter, + pub messages_sent_total: Counter, + pub event_handling_duration: Histogram, + pub db_query_duration: Histogram, + pub engine_sessions_active: UpDownCounter, + pub namespaces_active: UpDownCounter, + pub gprc_calls_total: Counter, + pub gprc_call_errors_total: Counter, + pub adapter_broadcasts_total: Counter, +} + +/// Initialize the Prometheus meter provider and create all metric instruments. +pub fn init_metrics( + _config: &super::config::TelemetryConfig, + resource: &Resource, +) -> ImksResult<(SdkMeterProvider, MetricsInstruments)> { + let registry = Registry::new(); + PROMETHEUS_REGISTRY + .set(registry.clone()) + .expect("Prometheus registry already initialized"); + + let exporter = opentelemetry_prometheus::exporter() + .with_registry(registry) + .build() + .map_err(|e| crate::ImksError::Internal(format!("failed to build Prometheus exporter: {e}")))?; + + let provider = SdkMeterProvider::builder() + .with_resource(resource.clone()) + .with_reader(exporter) + .build(); + + global::set_meter_provider(provider.clone()); + + let meter = global::meter_with_scope( + opentelemetry::InstrumentationScope::builder("imks") + .with_version(env!("CARGO_PKG_VERSION")) + .build(), + ); + + let instruments = MetricsInstruments::new(&meter); + METRICS + .set(instruments.clone()) + .expect("Metrics instruments already initialized"); + + Ok((provider, instruments)) +} + +/// Obtain the globally initialized metrics. Panics if not initialized. +pub fn get() -> MetricsInstruments { + METRICS + .get() + .expect("Metrics not initialized — call init_metrics first") + .clone() +} + +/// Obtain the globally initialized metrics, returning `None` if not initialized. +/// Prefer this in library code that may run before metrics are set up (e.g., tests). +pub fn try_get() -> Option { + METRICS.get().cloned() +} + +impl MetricsInstruments { + fn new(meter: &Meter) -> Self { + Self { + connections_active: meter + .i64_up_down_counter("imks_connections_active") + .with_description("Number of active Socket.IO connections") + .build(), + connections_total: meter + .u64_counter("imks_connections_total") + .with_description("Total number of socket connections since start") + .build(), + disconnections_total: meter + .u64_counter("imks_disconnections_total") + .with_description("Total number of socket disconnections since start") + .build(), + messages_received_total: meter + .u64_counter("imks_messages_received_total") + .with_description("Total number of messages received from clients") + .build(), + messages_sent_total: meter + .u64_counter("imks_messages_sent_total") + .with_description("Total number of messages sent to clients") + .build(), + event_handling_duration: meter + .f64_histogram("imks_event_handling_duration_seconds") + .with_description("Socket.IO event handling latency in seconds") + .build(), + db_query_duration: meter + .f64_histogram("imks_db_query_duration_seconds") + .with_description("Database query duration in seconds") + .build(), + engine_sessions_active: meter + .i64_up_down_counter("imks_engine_sessions_active") + .with_description("Number of active Engine.IO sessions") + .build(), + namespaces_active: meter + .i64_up_down_counter("imks_namespaces_active") + .with_description("Number of active Socket.IO namespaces") + .build(), + gprc_calls_total: meter + .u64_counter("imks_gprc_calls_total") + .with_description("Total number of gRPC calls to appks") + .build(), + gprc_call_errors_total: meter + .u64_counter("imks_gprc_call_errors_total") + .with_description("Total number of failed gRPC calls to appks") + .build(), + adapter_broadcasts_total: meter + .u64_counter("imks_adapter_broadcasts_total") + .with_description("Total number of cross-node adapter broadcasts") + .build(), + } + } + + /// Helper: create KV attributes for an event. + pub fn event_attrs(event: &str) -> [KeyValue; 1] { + [KeyValue::new("event", event.to_string())] + } + + /// Helper: create KV attributes for a namespace. + pub fn namespace_attrs(ns: &str) -> [KeyValue; 1] { + [KeyValue::new("namespace", ns.to_string())] + } +} + +/// Actix-web handler for `GET /metrics`. +/// +/// Encodes the Prometheus text format from the shared registry. +pub async fn metrics_handler() -> actix_web::HttpResponse { + let registry = PROMETHEUS_REGISTRY.get().expect("Prometheus registry not initialized"); + + let metric_families = registry.gather(); + let encoder = TextEncoder::new(); + let mut buffer = Vec::new(); + if encoder.encode(&metric_families, &mut buffer).is_err() { + return actix_web::HttpResponse::InternalServerError().body("failed to encode metrics"); + } + + actix_web::HttpResponse::Ok() + .content_type("text/plain; version=0.0.4") + .body(buffer) +} diff --git a/telemetry/mod.rs b/telemetry/mod.rs new file mode 100644 index 0000000..7c3597b --- /dev/null +++ b/telemetry/mod.rs @@ -0,0 +1,203 @@ +//! Telemetry module — OpenTelemetry-compatible observability stack. +//! +//! Provides: +//! - **Traces**: distributed tracing via OTLP (gRPC or HTTP) with W3C TraceContext propagation +//! - **Metrics**: Prometheus-compatible metrics exposed at `/metrics` +//! - **Logs**: JSON + console dual output, plus OTLP log export bridge +//! - **Health**: enhanced `/health` endpoint with upstream dependency checks +//! +//! # Quick start +//! +//! ```ignore +//! let guard = telemetry::init(); +//! // ... application runs ... +//! drop(guard); // graceful shutdown, flushes all pending telemetry +//! ``` +//! +//! # Environment variables +//! +//! | Variable | Default | Description | +//! |---|---|---| +//! | `OTEL_SERVICE_NAME` | `imks` | Service name in traces/metrics/logs | +//! | `OTEL_SERVICE_VERSION` | Cargo version | Service version | +//! | `OTEL_EXPORTER_OTLP_ENDPOINT` | `http://localhost:4317` | OTLP collector endpoint | +//! | `OTEL_EXPORTER_OTLP_PROTOCOL` | `grpc` | `grpc` or `http/protobuf` | +//! | `OTEL_TRACES_ENABLED` | `true` | Enable distributed tracing | +//! | `OTEL_METRICS_ENABLED` | `true` | Enable Prometheus metrics | +//! | `OTEL_LOGS_ENABLED` | `true` | Enable OTLP log export | +//! | `LOG_FORMAT` | `both` | `json`, `pretty`, or `both` | +//! | `RUST_LOG` | `info` | Log level filter | + +pub mod config; +pub mod health; +pub mod logs; +pub mod metrics; +pub mod traces; + +use opentelemetry_sdk::Resource; + +pub use config::TelemetryConfig; +pub use health::{HealthCheckFns, health_check}; +pub use metrics::{MetricsInstruments, get as metrics, try_get as try_metrics}; + +/// Holds all telemetry providers for graceful shutdown. +/// +/// When `shutdown()` is called, flushes and shuts down all providers in order: +/// tracer → meter → logger. +pub struct TelemetryGuard { + tracer_provider: Option, + meter_provider: Option, + logger_provider: Option, +} + +impl TelemetryGuard { + /// Flush all pending telemetry and shut down providers. + /// + /// Call this before process exit to avoid data loss. + pub fn shutdown(mut self) { + if let Some(tp) = self.tracer_provider.take() + && let Ok(rt) = tokio::runtime::Runtime::new() + { + rt.block_on(async { + tp.shutdown().unwrap_or_default(); + }); + } + if let Some(mp) = self.meter_provider.take() + && let Ok(rt) = tokio::runtime::Runtime::new() + { + rt.block_on(async { + mp.shutdown().unwrap_or_default(); + }); + } + if let Some(lp) = self.logger_provider.take() + && let Ok(rt) = tokio::runtime::Runtime::new() + { + rt.block_on(async { + lp.shutdown().unwrap_or_default(); + }); + } + } + + /// Force-flush all pending trace spans (non-blocking best-effort). + pub fn flush_traces(&self) { + if let Some(ref tp) = self.tracer_provider + && let Ok(rt) = tokio::runtime::Runtime::new() + { + rt.block_on(async { + tp.force_flush().unwrap_or_default(); + }); + } + } + + /// Force-flush all pending metrics. + pub fn flush_metrics(&self) { + if let Some(ref mp) = self.meter_provider + && let Ok(rt) = tokio::runtime::Runtime::new() + { + rt.block_on(async { + mp.force_flush().unwrap_or_default(); + }); + } + } +} + +impl Drop for TelemetryGuard { + fn drop(&mut self) { + // Best-effort: the caller should call shutdown() explicitly before process exit + } +} + +/// Initialize the full telemetry stack. +/// +/// 1. Creates the OTel Resource (service name, version, host) +/// 2. Sets up tracing subscriber with console + JSON + OTel layers +/// 3. Initializes Prometheus metrics +/// 4. Records server start time for uptime tracking +/// +/// Returns a `TelemetryGuard` that should be held until process exit. +pub fn init() -> TelemetryGuard { + let config = TelemetryConfig::from_env(); + + let resource = Resource::builder() + .with_service_name(config.service_name.clone()) + .with_attribute(opentelemetry::KeyValue::new( + "service.version", + config.service_version.clone(), + )) + .with_attribute(opentelemetry::KeyValue::new( + "deployment.environment", + std::env::var("OTEL_RESOURCE_ATTRIBUTES_DEPLOYMENT") + .unwrap_or_else(|_| "development".to_string()), + )) + .build(); + + // 1. Set up tracing (traces + subscriber) + let (tracer_provider, logger_provider) = if config.traces_enabled { + match traces::init_tracing(&config, &resource) { + Ok((provider, otel_layer)) => { + match logs::init_subscriber(&config, Some(&resource), Some(otel_layer)) { + Ok(logger_provider) => { + tracing::info!( + service = %config.service_name, + endpoint = %config.otlp_endpoint, + protocol = ?config.otlp_protocol, + "OpenTelemetry tracing initialized" + ); + (Some(provider), Some(logger_provider)) + } + Err(e) => { + tracing::warn!( + "Failed to initialize log bridge: {e}. Tracing still active." + ); + (Some(provider), None) + } + } + } + Err(e) => { + tracing::warn!( + "Failed to initialize OTLP tracing: {e}. Using console-only logging." + ); + match logs::init_subscriber(&config, Some(&resource), None) { + Ok(lp) => (None, Some(lp)), + Err(_) => { + tracing_subscriber::fmt().init(); + (None, None) + } + } + } + } + } else { + match logs::init_subscriber(&config, Some(&resource), None) { + Ok(lp) => (None, Some(lp)), + Err(_) => { + tracing_subscriber::fmt().init(); + (None, None) + } + } + }; + + // 2. Metrics + let meter_provider = if config.metrics_enabled { + match metrics::init_metrics(&config, &resource) { + Ok((provider, _instruments)) => { + tracing::info!("Prometheus metrics initialized (available at /metrics)"); + Some(provider) + } + Err(e) => { + tracing::warn!("Failed to initialize Prometheus metrics: {e}"); + None + } + } + } else { + None + }; + + // 3. Record start time for uptime + health::record_start_time(); + + TelemetryGuard { + tracer_provider, + meter_provider, + logger_provider, + } +} diff --git a/telemetry/traces.rs b/telemetry/traces.rs new file mode 100644 index 0000000..78ea0e6 --- /dev/null +++ b/telemetry/traces.rs @@ -0,0 +1,55 @@ +//! OpenTelemetry distributed tracing — OTLP exporter + tracing-opentelemetry bridge. + +use opentelemetry::trace::TracerProvider as _; +use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig}; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use opentelemetry_sdk::trace::{SdkTracerProvider, Tracer}; +use opentelemetry_sdk::Resource; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::Registry; + +use super::config::{OtlpProtocol, TelemetryConfig}; +use crate::ImksResult; + +/// Build an OTLP SpanExporter based on the configured protocol. +fn build_span_exporter(config: &TelemetryConfig) -> ImksResult { + match config.otlp_protocol { + OtlpProtocol::Grpc => SpanExporter::builder() + .with_tonic() + .with_endpoint(&config.otlp_endpoint) + .build() + .map_err(|e| crate::ImksError::Internal(format!("OTLP gRPC span exporter: {e}"))), + OtlpProtocol::HttpProtobuf => SpanExporter::builder() + .with_http() + .with_protocol(Protocol::HttpBinary) + .with_endpoint(&config.otlp_endpoint) + .build() + .map_err(|e| { + crate::ImksError::Internal(format!("OTLP HTTP span exporter: {e}")) + }), + } +} + +/// Initialize the tracing pipeline: OTel tracer provider + tracing-opentelemetry layer. +/// +/// Returns (SdkTracerProvider, OpenTelemetryLayer). +pub fn init_tracing( + config: &TelemetryConfig, + resource: &Resource, +) -> ImksResult<(SdkTracerProvider, OpenTelemetryLayer)> { + // Set global propagator for W3C TraceContext extraction/injection + opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); + + let exporter = build_span_exporter(config)?; + + let provider = SdkTracerProvider::builder() + .with_resource(resource.clone()) + .with_batch_exporter(exporter) + .build(); + + let tracer = provider.tracer("imks"); + + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); + + Ok((provider, otel_layer)) +}