Files
zhenyi c794b818ff feat(config): integrate etcd for service discovery and config management
- Add etcd-client dependency for distributed configuration storage
- Implement EtcdConfig with priority: etcd > environment variables > defaults
- Add ServiceRegistry for service registration with lease keep-alive
- Integrate etcd-based service discovery for appks gRPC connections
- Add service watcher for real-time service instance updates
- Migrate Redis configuration from single URL to cluster node list
- Update Dockerfile with default IMKS_HOST and IMKS_PORT environment variables
- Add etcd bootstrap configuration through environment variables
- Implement Redis cluster URL building with optional authentication
2026-06-11 22:50:38 +08:00

234 lines
8.3 KiB
Rust

use std::sync::Arc;
use etcd_client::{Client, EventType, GetOptions, PutOptions, WatchOptions};
use tokio::sync::Mutex;
use tokio_stream::StreamExt;
/// etcd-backed config reader. Priority: etcd > env var > default.
pub struct EtcdConfig {
client: Arc<Mutex<Client>>,
prefix: String,
}
impl EtcdConfig {
pub async fn connect(endpoints: Vec<String>, prefix: &str) -> Result<Self, String> {
let client = Client::connect(endpoints, None)
.await
.map_err(|e| format!("etcd connect: {e}"))?;
Ok(Self {
client: Arc::new(Mutex::new(client)),
prefix: prefix.to_string(),
})
}
/// Get config value: etcd first, then env var, then default.
pub async fn get(&self, key: &str, default: &str) -> String {
let etcd_key = format!("{}config/{}", self.prefix, key);
if let Ok(mut c) = self.client.try_lock() {
if let Ok(resp) = c.get(etcd_key.as_str(), None).await {
if let Some(kv) = resp.kvs().first() {
if let Ok(v) = kv.value_str() {
if !v.is_empty() {
tracing::info!(key, value = v, "config from etcd");
return v.to_string();
}
}
}
}
}
if let Ok(v) = std::env::var(key) {
if !v.is_empty() {
return v;
}
}
default.to_string()
}
pub async fn get_parsed<T: std::str::FromStr>(&self, key: &str, default: T) -> T
where
T::Err: std::fmt::Display,
T: std::fmt::Display,
{
let s = self.get(key, &default.to_string()).await;
s.parse().unwrap_or(default)
}
/// Get the etcd client for use by ServiceRegistry.
pub fn client(&self) -> Arc<Mutex<Client>> {
self.client.clone()
}
/// Discover service instances registered under /{prefix}/services/{service_name}/.
pub async fn discover_service(&self, service_name: &str) -> Result<Vec<String>, String> {
let prefix = format!("{}services/{}/", self.prefix, service_name);
let mut client = self.client.lock().await;
let resp = client
.get(prefix.as_str(), Some(GetOptions::new().with_prefix()))
.await
.map_err(|e| format!("etcd get {prefix}: {e}"))?;
let mut addrs = Vec::new();
for kv in resp.kvs() {
if let Ok(value) = kv.value_str() {
if let Ok(instance) = serde_json::from_str::<serde_json::Value>(value) {
if let Some(addr) = instance.get("addr").and_then(|v| v.as_str()) {
addrs.push(addr.to_string());
}
}
}
}
tracing::info!(
service = service_name,
count = addrs.len(),
"discovered instances"
);
Ok(addrs)
}
/// Watch a service prefix for live join/leave events.
///
/// Calls `on_up(addr)` when a new instance appears and `on_down(addr)`
/// when one disappears. The watcher runs in a background task and
/// automatically reconnects on failure.
pub fn start_service_watcher(
&self,
service_name: &str,
on_up: impl Fn(String) + Send + Sync + 'static,
on_down: impl Fn(String) + Send + Sync + 'static,
) {
let client = self.client.clone();
let prefix = self.prefix.clone();
let svc = service_name.to_string();
let watch_prefix = format!("{}services/{}/", prefix, svc);
let on_up = Arc::new(on_up);
let on_down = Arc::new(on_down);
tokio::spawn(async move {
loop {
let mut stream = {
let mut c = client.lock().await;
match c
.watch(
watch_prefix.as_str(),
Some(WatchOptions::new().with_prefix()),
)
.await
{
Ok(s) => s,
Err(e) => {
tracing::warn!(service = %svc, error = %e, "watch failed, retry in 3s");
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
continue;
}
}
};
while let Some(resp) = stream.next().await {
let Ok(resp) = resp else { break };
for event in resp.events() {
let Some(kv) = event.kv() else { continue };
let raw = kv.value_str().unwrap_or_default();
let key = kv.key_str().unwrap_or_default();
// Parse JSON to extract the actual address
let addr = serde_json::from_str::<serde_json::Value>(raw)
.ok()
.and_then(|v| {
v.get("addr")
.and_then(|a| a.as_str())
.map(|s| s.to_string())
})
.unwrap_or_else(|| raw.to_string());
match event.event_type() {
EventType::Put => {
tracing::info!(service = %svc, key, addr, "service up");
on_up(addr);
}
EventType::Delete => {
tracing::info!(service = %svc, key, "service down");
on_down(addr);
}
}
}
}
tracing::warn!(service = %svc, "watch stream ended, restarting in 3s");
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
}
});
}
}
/// Register this service instance in etcd.
pub struct ServiceRegistry {
client: Arc<Mutex<Client>>,
prefix: String,
}
impl ServiceRegistry {
pub fn new(client: Arc<Mutex<Client>>, prefix: &str) -> Self {
Self {
client,
prefix: prefix.to_string(),
}
}
pub async fn register(&self, service_name: &str, addr: &str) -> Result<(), String> {
let instance_id = uuid::Uuid::now_v7().to_string();
let addr = addr.to_string();
let key = format!("{}services/{}/{}", self.prefix, service_name, instance_id);
let instance = serde_json::json!({
"addr": &addr,
"port": 0,
"version": env!("CARGO_PKG_VERSION"),
});
let value = serde_json::to_string(&instance).map_err(|e| format!("json: {e}"))?;
let lease = {
let mut client = self.client.lock().await;
client
.lease_grant(60, None)
.await
.map_err(|e| format!("lease: {e}"))?
};
{
let mut client = self.client.lock().await;
let opts = PutOptions::new().with_lease(lease.id());
client
.put(key.clone(), value, Some(opts))
.await
.map_err(|e| format!("put: {e}"))?;
}
tracing::info!(service = service_name, instance = %instance_id, addr = %addr, "registered in etcd");
let c = self.client.clone();
let lease_id = lease.id();
tokio::spawn(async move {
let (mut keeper, mut stream) = {
let mut client = c.lock().await;
match client.lease_keep_alive(lease_id).await {
Ok(pair) => pair,
Err(e) => {
tracing::error!(lease_id, error = %e, "failed to start lease keepalive");
return;
}
}
};
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
loop {
interval.tick().await;
if let Err(e) = keeper.keep_alive().await {
tracing::warn!(lease_id, error = %e, "lease keepalive failed");
}
let _ = stream.message().await;
}
});
Ok(())
}
}