diff --git a/main.rs b/main.rs index abb3ec4..22e2871 100644 --- a/main.rs +++ b/main.rs @@ -58,12 +58,16 @@ async fn main() -> Result<(), Box> { // Discover appks from etcd (priority > env). // etcd-registered addresses are bare "host:port" — prepend http:// for gRPC. + // Bind address 0.0.0.0 is not connectable; replace with localhost. let appks_addr = etcd .discover_service("appks") .await .ok() .and_then(|addrs| addrs.into_iter().next()) - .map(|addr| format!("http://{}", addr)) + .map(|addr| { + let normalized = addr.replace("0.0.0.0", "127.0.0.1"); + format!("http://{}", normalized) + }) .unwrap_or_else(|| { std::env::var("APPKS_GRPC_ADDR") .unwrap_or_else(|_| "http://localhost:50051".to_string()) @@ -183,52 +187,77 @@ async fn main() -> Result<(), Box> { let namespaces = namespaces.clone(); let current_addr = current_addr.clone(); let mut rpc = rpc_config.clone(); - // etcd-registered address is bare "host:port" — prepend scheme for gRPC - rpc.appks_addr = if addr.starts_with("http") { - addr + // etcd-registered address is bare "host:port" — prepend scheme for gRPC. + // Bind address 0.0.0.0 is not connectable; replace with localhost. + let normalized = addr.replace("0.0.0.0", "127.0.0.1"); + rpc.appks_addr = if normalized.starts_with("http") { + normalized } else { - format!("http://{}", addr) + format!("http://{}", normalized) }; async move { - match AppksClients::connect(&rpc).await { - Ok(clients) => { - match MessageService::new((*repo).clone(), clients, namespaces.clone()) - .await - { - Ok(svc) => { - let svc = Arc::new(svc); - let mut guard = service.write().await; - *guard = Some(svc); + // Retry with backoff — appks may have registered in etcd + // before its gRPC server finished binding. + let max_retries = 5; + let mut delay = std::time::Duration::from_millis(500); - // Update the active appks address for health checker - let mut addr_guard = current_addr.write().await; - *addr_guard = Some(rpc.appks_addr.clone()); + for attempt in 1..=max_retries { + match AppksClients::connect(&rpc).await { + Ok(clients) => { + match MessageService::new((*repo).clone(), clients, namespaces.clone()) + .await + { + Ok(svc) => { + let svc = Arc::new(svc); + let mut guard = service.write().await; + *guard = Some(svc); - tracing::info!( - addr = %rpc.appks_addr, - "Message service initialized" - ); - true + // Update the active appks address for health checker + let mut addr_guard = current_addr.write().await; + *addr_guard = Some(rpc.appks_addr.clone()); + + tracing::info!( + addr = %rpc.appks_addr, + attempt, + "Message service initialized" + ); + return true; + } + Err(e) => { + tracing::warn!( + addr = %rpc.appks_addr, + attempt, + error = %e, + "Failed to init message service" + ); + if attempt < max_retries { + tokio::time::sleep(delay).await; + delay *= 2; + } + } } - Err(e) => { - tracing::warn!( - addr = %rpc.appks_addr, - error = %e, - "Failed to init message service" - ); - false + } + Err(e) => { + tracing::warn!( + addr = %rpc.appks_addr, + attempt, + error = %e, + "gRPC connect failed" + ); + if attempt < max_retries { + tokio::time::sleep(delay).await; + delay *= 2; } } } - Err(e) => { - tracing::warn!( - addr = %rpc.appks_addr, - error = %e, - "gRPC connect failed" - ); - false - } } + + tracing::error!( + addr = %rpc.appks_addr, + max_retries, + "All connection attempts to appks exhausted" + ); + false } } }; diff --git a/migrate/000_message_base.sql b/migrate/000_message_base.sql new file mode 100644 index 0000000..e34493d --- /dev/null +++ b/migrate/000_message_base.sql @@ -0,0 +1,46 @@ +-- ============================================================ +-- Migration: 000_message_base.sql +-- Tables: message +-- ============================================================ +-- Core message table managed by imks. All companion tables +-- (thread, attachment, embed, reaction, ...) reference message(id). + +BEGIN; + +CREATE TABLE IF NOT EXISTS message ( + id UUID PRIMARY KEY, + channel_id UUID NOT NULL, + author_id UUID NOT NULL, + thread_id UUID NULL, + reply_to_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, + message_type TEXT NOT NULL DEFAULT 'text', + body TEXT NOT NULL, + metadata JSONB NULL, + pinned BOOLEAN NOT NULL DEFAULT FALSE, + system BOOLEAN NOT NULL DEFAULT FALSE, + edited_at TIMESTAMPTZ NULL, + deleted_at TIMESTAMPTZ NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- Cursor pagination: WHERE id < $cursor ORDER BY id DESC +CREATE INDEX IF NOT EXISTS idx_message_channel_created + ON message (channel_id, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_message_author + ON message (author_id); + +CREATE INDEX IF NOT EXISTS idx_message_thread + ON message (thread_id) + WHERE thread_id IS NOT NULL; + +CREATE INDEX IF NOT EXISTS idx_message_reply_to + ON message (reply_to_message_id) + WHERE reply_to_message_id IS NOT NULL; + +CREATE INDEX IF NOT EXISTS idx_message_deleted + ON message (deleted_at) + WHERE deleted_at IS NOT NULL; + +COMMIT; diff --git a/migrate/001_message_thread_base.sql b/migrate/001_message_thread_base.sql new file mode 100644 index 0000000..4a482ed --- /dev/null +++ b/migrate/001_message_thread_base.sql @@ -0,0 +1,23 @@ +-- Create message_thread before migrations that reference it. +-- Safe for existing databases because the table may already exist from 004. + +BEGIN; + +CREATE TABLE IF NOT EXISTS message_thread ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + channel_id UUID NOT NULL, + root_message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + created_by UUID NOT NULL, + replies_count BIGINT NOT NULL DEFAULT 0, + participants_count BIGINT NOT NULL DEFAULT 0, + last_reply_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, + last_reply_at TIMESTAMPTZ NULL, + resolved BOOLEAN NOT NULL DEFAULT FALSE, + resolved_by UUID NULL, + resolved_at TIMESTAMPTZ NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT uq_message_thread_root UNIQUE (root_message_id) +); + +COMMIT; diff --git a/migrate/002_message_rich_content.sql b/migrate/002_message_rich_content.sql new file mode 100644 index 0000000..1060efc --- /dev/null +++ b/migrate/002_message_rich_content.sql @@ -0,0 +1,115 @@ +-- ============================================================ +-- Migration: 001_message_rich_content.sql +-- Tables: message_attachment, message_embed, message_embed_field, +-- message_poll, message_poll_option, message_poll_vote +-- ============================================================ +-- These tables extend the existing `message` table (from appks 001_init.sql) +-- with Discord-style rich content: file attachments, link preview embeds, +-- and interactive polls. + +BEGIN; + +-- models/message_attachment.rs → message_attachment +CREATE TABLE IF NOT EXISTS message_attachment ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + filename TEXT NOT NULL, + content_type TEXT NULL, + size BIGINT NOT NULL, + url TEXT NOT NULL, + storage_key TEXT NULL, + width INTEGER NULL, + height INTEGER NULL, + duration_secs DOUBLE PRECISION NULL, + blurhash TEXT NULL, + spoiler BOOLEAN NOT NULL DEFAULT FALSE, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +CREATE INDEX IF NOT EXISTS idx_message_attachment_message_id + ON message_attachment (message_id); + +-- models/message_embed.rs → message_embed +CREATE TABLE IF NOT EXISTS message_embed ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + embed_type TEXT NOT NULL, + title TEXT NULL, + description TEXT NULL, + url TEXT NULL, + color INTEGER NULL, + image_url TEXT NULL, + image_width INTEGER NULL, + image_height INTEGER NULL, + thumbnail_url TEXT NULL, + thumbnail_width INTEGER NULL, + thumbnail_height INTEGER NULL, + video_url TEXT NULL, + video_width INTEGER NULL, + video_height INTEGER NULL, + author_name TEXT NULL, + author_url TEXT NULL, + author_icon_url TEXT NULL, + footer_text TEXT NULL, + footer_icon_url TEXT NULL, + provider_name TEXT NULL, + provider_url TEXT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +CREATE INDEX IF NOT EXISTS idx_message_embed_message_id + ON message_embed (message_id); + +-- models/message_embed.rs → message_embed_field +CREATE TABLE IF NOT EXISTS message_embed_field ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + embed_id UUID NOT NULL REFERENCES message_embed(id) ON DELETE CASCADE, + name TEXT NOT NULL, + value TEXT NOT NULL, + inline BOOLEAN NOT NULL DEFAULT FALSE, + position INTEGER NOT NULL DEFAULT 0 +); +CREATE INDEX IF NOT EXISTS idx_message_embed_field_embed_id + ON message_embed_field (embed_id); + +-- models/message_poll.rs → message_poll +CREATE TABLE IF NOT EXISTS message_poll ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + question TEXT NOT NULL, + allow_multiselect BOOLEAN NOT NULL DEFAULT FALSE, + max_selections INTEGER NULL, + expires_at TIMESTAMPTZ NULL, + total_votes BIGINT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT uq_message_poll_message UNIQUE (message_id) +); +CREATE INDEX IF NOT EXISTS idx_message_poll_message_id + ON message_poll (message_id); + +-- models/message_poll.rs → message_poll_option +CREATE TABLE IF NOT EXISTS message_poll_option ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + poll_id UUID NOT NULL REFERENCES message_poll(id) ON DELETE CASCADE, + text TEXT NOT NULL, + emoji TEXT NULL, + vote_count BIGINT NOT NULL DEFAULT 0, + position INTEGER NOT NULL DEFAULT 0 +); +CREATE INDEX IF NOT EXISTS idx_message_poll_option_poll_id + ON message_poll_option (poll_id); + +-- models/message_poll.rs → message_poll_vote +CREATE TABLE IF NOT EXISTS message_poll_vote ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + poll_id UUID NOT NULL REFERENCES message_poll(id) ON DELETE CASCADE, + option_id UUID NOT NULL REFERENCES message_poll_option(id) ON DELETE CASCADE, + user_id UUID NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT uq_message_poll_vote UNIQUE (poll_id, user_id, option_id) +); +CREATE INDEX IF NOT EXISTS idx_message_poll_vote_poll_id + ON message_poll_vote (poll_id); +CREATE INDEX IF NOT EXISTS idx_message_poll_vote_user_id + ON message_poll_vote (user_id); + +COMMIT; diff --git a/migrate/003_message_social.sql b/migrate/003_message_social.sql new file mode 100644 index 0000000..edf209f --- /dev/null +++ b/migrate/003_message_social.sql @@ -0,0 +1,76 @@ +-- ============================================================ +-- Migration: 002_message_social.sql +-- Tables: message_pin, message_read_state, message_draft, message_edit +-- ============================================================ +-- Extends the message subsystem with pinned messages, read receipts, +-- drafts, and edit history. + +BEGIN; + +-- models/message_pin.rs → message_pin +CREATE TABLE IF NOT EXISTS message_pin ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + channel_id UUID NOT NULL, + message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + pinned_by UUID NOT NULL, + position INTEGER NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT uq_message_pin_channel_message UNIQUE (channel_id, message_id) +); + +CREATE INDEX IF NOT EXISTS idx_message_pin_channel_id + ON message_pin (channel_id); + +-- models/message_read_state.rs → message_read_state +CREATE TABLE IF NOT EXISTS message_read_state ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + channel_id UUID NOT NULL, + user_id UUID NOT NULL, + last_read_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, + last_read_at TIMESTAMPTZ NULL, + unread_count BIGINT NOT NULL DEFAULT 0, + unread_mentions BIGINT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT uq_message_read_state_channel_user UNIQUE (channel_id, user_id) +); + +CREATE INDEX IF NOT EXISTS idx_message_read_state_user_id + ON message_read_state (user_id); +CREATE INDEX IF NOT EXISTS idx_message_read_state_channel_id + ON message_read_state (channel_id); + +-- models/message_draft.rs → message_draft +CREATE TABLE IF NOT EXISTS message_draft ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + channel_id UUID NOT NULL, + user_id UUID NOT NULL, + thread_id UUID NULL REFERENCES message_thread(id) ON DELETE CASCADE, + reply_to_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, + body TEXT NOT NULL DEFAULT '', + metadata JSONB NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT uq_message_draft_channel_user_thread + UNIQUE (channel_id, user_id, thread_id) +); + +CREATE INDEX IF NOT EXISTS idx_message_draft_user_id + ON message_draft (user_id); + +-- models/message_edit.rs → message_edit +CREATE TABLE IF NOT EXISTS message_edit ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + edited_by UUID NOT NULL, + old_body TEXT NOT NULL, + new_body TEXT NOT NULL, + edited_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_message_edit_message_id + ON message_edit (message_id); +CREATE INDEX IF NOT EXISTS idx_message_edit_edited_at + ON message_edit (edited_at); + +COMMIT; diff --git a/migrate/004_message_article.sql b/migrate/004_message_article.sql new file mode 100644 index 0000000..d7452bb --- /dev/null +++ b/migrate/004_message_article.sql @@ -0,0 +1,45 @@ +-- ============================================================ +-- Migration: 003_message_article.sql +-- Tables: message_article +-- ============================================================ +-- Extends the message subsystem with forum-style article posts. +-- Articles extend regular messages with title, cover image, tags, +-- and view/like stats. Rendered as waterfall cards in forum channels. + +BEGIN; + +-- models/message_article.rs → message_article +CREATE TABLE IF NOT EXISTS message_article ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + title TEXT NOT NULL, + summary TEXT NULL, + cover_url TEXT NULL, + cover_width INTEGER NULL, + cover_height INTEGER NULL, + cover_color TEXT NULL, + tags JSONB NULL, + view_count BIGINT NOT NULL DEFAULT 0, + like_count BIGINT NOT NULL DEFAULT 0, + bookmark_count BIGINT NOT NULL DEFAULT 0, + reply_count BIGINT NOT NULL DEFAULT 0, + last_reply_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, + last_reply_at TIMESTAMPTZ NULL, + last_reply_user_id UUID NULL, + is_pinned_to_top BOOLEAN NOT NULL DEFAULT FALSE, + is_answered BOOLEAN NOT NULL DEFAULT FALSE, + answered_by UUID NULL, + answered_at TIMESTAMPTZ NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT uq_message_article_message UNIQUE (message_id) +); + +CREATE INDEX IF NOT EXISTS idx_message_article_last_reply_at + ON message_article (last_reply_at DESC NULLS LAST); +CREATE INDEX IF NOT EXISTS idx_message_article_is_pinned_to_top + ON message_article (is_pinned_to_top DESC, last_reply_at DESC NULLS LAST); +CREATE INDEX IF NOT EXISTS idx_message_article_view_count + ON message_article (view_count DESC); + +COMMIT; diff --git a/migrate/005_message_social_part2.sql b/migrate/005_message_social_part2.sql new file mode 100644 index 0000000..46d078b --- /dev/null +++ b/migrate/005_message_social_part2.sql @@ -0,0 +1,98 @@ +-- ============================================================ +-- Migration: 004_message_social_part2.sql +-- Tables: message_reaction, message_bookmark, message_mention, +-- message_thread, message_thread_participant +-- ============================================================ + +BEGIN; + +-- models/message_reaction.rs → message_reaction +CREATE TABLE IF NOT EXISTS message_reaction ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + channel_id UUID NOT NULL, + user_id UUID NOT NULL, + content TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT uq_message_reaction_user_content UNIQUE (message_id, user_id, content) +); + +CREATE INDEX IF NOT EXISTS idx_message_reaction_message_id + ON message_reaction (message_id); +CREATE INDEX IF NOT EXISTS idx_message_reaction_user_id + ON message_reaction (user_id); + +-- models/message_bookmark.rs → message_bookmark +CREATE TABLE IF NOT EXISTS message_bookmark ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + channel_id UUID NOT NULL, + user_id UUID NOT NULL, + note TEXT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT uq_message_bookmark_user_message UNIQUE (user_id, message_id) +); + +CREATE INDEX IF NOT EXISTS idx_message_bookmark_user_id + ON message_bookmark (user_id); +CREATE INDEX IF NOT EXISTS idx_message_bookmark_message_id + ON message_bookmark (message_id); + +-- models/message_mention.rs → message_mention +CREATE TABLE IF NOT EXISTS message_mention ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + channel_id UUID NOT NULL, + mentioned_user_id UUID NOT NULL, + mentioned_by UUID NOT NULL, + read_at TIMESTAMPTZ NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_message_mention_message_id + ON message_mention (message_id); +CREATE INDEX IF NOT EXISTS idx_message_mention_mentioned_user + ON message_mention (mentioned_user_id); + +-- models/message_thread.rs → message_thread +CREATE TABLE IF NOT EXISTS message_thread ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + channel_id UUID NOT NULL, + root_message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + created_by UUID NOT NULL, + replies_count BIGINT NOT NULL DEFAULT 0, + participants_count BIGINT NOT NULL DEFAULT 0, + last_reply_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, + last_reply_at TIMESTAMPTZ NULL, + resolved BOOLEAN NOT NULL DEFAULT FALSE, + resolved_by UUID NULL, + resolved_at TIMESTAMPTZ NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT uq_message_thread_root UNIQUE (root_message_id) +); + +CREATE INDEX IF NOT EXISTS idx_message_thread_channel_id + ON message_thread (channel_id); +CREATE INDEX IF NOT EXISTS idx_message_thread_last_reply_at + ON message_thread (last_reply_at DESC NULLS LAST); + +-- models/message_thread_participant.rs → message_thread_participant +CREATE TABLE IF NOT EXISTS message_thread_participant ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + thread_id UUID NOT NULL REFERENCES message_thread(id) ON DELETE CASCADE, + user_id UUID NOT NULL, + joined_reason TEXT NULL, + last_read_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, + last_read_at TIMESTAMPTZ NULL, + joined_at TIMESTAMPTZ NOT NULL DEFAULT now(), + CONSTRAINT uq_thread_participant UNIQUE (thread_id, user_id) +); + +CREATE INDEX IF NOT EXISTS idx_thread_participant_thread_id + ON message_thread_participant (thread_id); +CREATE INDEX IF NOT EXISTS idx_thread_participant_user_id + ON message_thread_participant (user_id); + +COMMIT; diff --git a/migrate/006_message_misc.sql b/migrate/006_message_misc.sql new file mode 100644 index 0000000..5966a2b --- /dev/null +++ b/migrate/006_message_misc.sql @@ -0,0 +1,102 @@ +-- ============================================================ +-- Migration: 005_message_misc.sql +-- Tables: message_notification, message_scheduled, message_sticker, +-- message_forward, message_component +-- ============================================================ + +BEGIN; + +-- models/message_notification.rs → message_notification +CREATE TABLE IF NOT EXISTS message_notification ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + channel_id UUID NOT NULL, + user_id UUID NOT NULL, + reason TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + delivery_channel TEXT NULL, + delivered_at TIMESTAMPTZ NULL, + read_at TIMESTAMPTZ NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_message_notification_user_id + ON message_notification (user_id, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_message_notification_status + ON message_notification (status); + +-- models/message_scheduled.rs → message_scheduled +CREATE TABLE IF NOT EXISTS message_scheduled ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + channel_id UUID NOT NULL, + author_id UUID NOT NULL, + thread_id UUID NULL REFERENCES message_thread(id) ON DELETE SET NULL, + reply_to_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, + body TEXT NOT NULL, + metadata JSONB NULL, + scheduled_at TIMESTAMPTZ NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + sent_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, + error TEXT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_message_scheduled_status_at + ON message_scheduled (status, scheduled_at); + +-- models/message_sticker.rs → message_sticker +CREATE TABLE IF NOT EXISTS message_sticker ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + sticker_id UUID NOT NULL, + name TEXT NOT NULL, + image_url TEXT NOT NULL, + format_type TEXT NOT NULL DEFAULT 'png', + pack_name TEXT NULL, + tags TEXT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_message_sticker_message_id + ON message_sticker (message_id); + +-- models/message_forward.rs → message_forward +CREATE TABLE IF NOT EXISTS message_forward ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + source_message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + source_channel_id UUID NOT NULL, + forwarded_by UUID NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_message_forward_message_id + ON message_forward (message_id); +CREATE INDEX IF NOT EXISTS idx_message_forward_source_message_id + ON message_forward (source_message_id); + +-- models/message_component.rs → message_component +CREATE TABLE IF NOT EXISTS message_component ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, + row INTEGER NOT NULL DEFAULT 0, + position INTEGER NOT NULL DEFAULT 0, + component_type TEXT NOT NULL, + custom_id TEXT NOT NULL, + label TEXT NULL, + emoji TEXT NULL, + style TEXT NULL, + url TEXT NULL, + disabled BOOLEAN NOT NULL DEFAULT FALSE, + placeholder TEXT NULL, + min_values INTEGER NULL, + max_values INTEGER NULL, + options JSONB NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_message_component_message_id + ON message_component (message_id); + +COMMIT; diff --git a/migrate/007_fix_uuid_defaults_and_draft_null.sql b/migrate/007_fix_uuid_defaults_and_draft_null.sql new file mode 100644 index 0000000..7e71fa6 --- /dev/null +++ b/migrate/007_fix_uuid_defaults_and_draft_null.sql @@ -0,0 +1,33 @@ +-- Align imks-managed IDs with application-generated UUID v7 values and +-- make top-level drafts unique when thread_id is NULL. + +BEGIN; + +CREATE UNIQUE INDEX IF NOT EXISTS uq_message_draft_channel_user_no_thread + ON message_draft (channel_id, user_id) + WHERE thread_id IS NULL; + +ALTER TABLE message ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_attachment ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_embed ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_embed_field ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_poll ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_poll_option ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_poll_vote ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_pin ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_read_state ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_draft ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_edit ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_article ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_reaction ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_bookmark ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_mention ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_thread ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_thread_participant ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_notification ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_scheduled ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_sticker ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_forward ALTER COLUMN id DROP DEFAULT; +ALTER TABLE message_component ALTER COLUMN id DROP DEFAULT; + +COMMIT; diff --git a/rpc/health.rs b/rpc/health.rs new file mode 100644 index 0000000..8ade398 --- /dev/null +++ b/rpc/health.rs @@ -0,0 +1,63 @@ +//! gRPC health module for imks. +//! +//! Provides: +//! - imks's own gRPC health server (for external health checks) +//! - Health check client for probing appks health (5s interval) + +use std::net::SocketAddr; +use std::time::Duration; + +use tonic_health::pb::HealthCheckRequest; +use tonic_health::pb::health_check_response::ServingStatus; +use tonic_health::pb::health_client::HealthClient; + +use crate::{ImksError, ImksResult}; + +/// Start imks's own gRPC health server on the given address. +/// +/// Reports `SERVING` for the overall server status (empty service name). +pub async fn start_health_server(addr: SocketAddr) -> ImksResult<()> { + let (reporter, health_service) = tonic_health::server::health_reporter(); + + // Empty service name = overall server health. + // reporter is an owned handle — the server will remain SERVING + // indefinitely unless a caller updates the status. + reporter + .set_service_status("", tonic_health::ServingStatus::Serving) + .await; + + tracing::info!(%addr, "imks gRPC health server started"); + + tonic::transport::Server::builder() + .add_service(health_service) + .serve(addr) + .await + .map_err(|e| ImksError::Internal(format!("Health gRPC server: {e:?}")))?; + + Ok(()) +} + +/// Check appks health by opening a short-lived gRPC connection and calling `Check`. +/// +/// Uses a 3-second connect timeout to fail fast when appks is unreachable. +pub async fn check_appks_health(addr: &str) -> ImksResult { + let endpoint = tonic::transport::Endpoint::from_shared(addr.to_string()) + .map_err(|e| ImksError::Internal(format!("health endpoint: {e}")))?; + + // Short-lived connection for health probe only + let channel = endpoint + .connect_timeout(Duration::from_secs(3)) + .connect() + .await + .map_err(ImksError::GrpcTransport)?; + + let mut client = HealthClient::new(channel); + let resp = client + .check(HealthCheckRequest { + service: "".to_string(), + }) + .await + .map_err(ImksError::GrpcStatus)?; + + Ok(resp.into_inner().status == ServingStatus::Serving as i32) +}