feat(registry): add service discovery and health check capabilities
- Integrate tonic-health for gRPC service health monitoring - Add etcd-based service registration with automatic keep-alive - Implement dynamic configuration loading from etcd with fallback - Remove external dependencies from docker-compose for simplified deployment - Refactor service registration logic with improved lease management - Add health service to gRPC server with serving status reporting
This commit is contained in:
@@ -22,6 +22,7 @@ impl EtcdRegistry {
|
||||
self.load_initial("mail").await?;
|
||||
self.spawn_watch("mail");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
+26
@@ -116,6 +116,32 @@ impl EtcdRegistry {
|
||||
ids.sort();
|
||||
ids
|
||||
}
|
||||
|
||||
/// Read config from etcd. Priority: etcd > env > default.
|
||||
/// This is async but can be called from sync context via block_on.
|
||||
pub async fn get_config(&self, key: &str, default: &str) -> String {
|
||||
let etcd_key = format!("{}config/{}", self.inner.key_prefix, key);
|
||||
let mut client = self.inner.client.lock().await;
|
||||
if let Ok(resp) = client.get(etcd_key.as_str(), None).await {
|
||||
if let Some(kv) = resp.kvs().first() {
|
||||
if let Ok(v) = kv.value_str() {
|
||||
if !v.is_empty() {
|
||||
tracing::info!(key, value = v, "config from etcd");
|
||||
return v.to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(client);
|
||||
// Fall back to env
|
||||
if let Ok(v) = std::env::var(key) {
|
||||
if !v.is_empty() {
|
||||
return v;
|
||||
}
|
||||
}
|
||||
default.to_string()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// Derive a deterministic UUID from a gitks storage_name.
|
||||
|
||||
+19
-66
@@ -2,12 +2,9 @@ use std::collections::HashMap;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use etcd_client::PutOptions;
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
use crate::error::{AppError, AppResult};
|
||||
|
||||
use super::EtcdRegistry;
|
||||
use super::EtcdRegistryInner;
|
||||
use super::types::ServiceInstance;
|
||||
|
||||
impl EtcdRegistry {
|
||||
@@ -58,74 +55,30 @@ impl EtcdRegistry {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn spawn_keep_alive(&self, lease_id: i64, key: String) {
|
||||
fn spawn_keep_alive(&self, lease_id: i64, _key: String) {
|
||||
let inner = self.inner.clone();
|
||||
let interval = self.inner.config.etcd_keep_alive_interval().unwrap_or(10);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let (mut keeper, mut stream) = {
|
||||
let mut client = inner.client.lock().await;
|
||||
match client.lease_keep_alive(lease_id).await {
|
||||
Ok(pair) => pair,
|
||||
Err(e) => {
|
||||
tracing::error!(lease_id, error = %e, "failed to start lease keepalive");
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let interval_secs = inner.config.etcd_keep_alive_interval().unwrap_or(10);
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
|
||||
loop {
|
||||
Self::run_keep_alive_stream(&inner, lease_id).await;
|
||||
tokio::time::sleep(std::time::Duration::from_secs(interval)).await;
|
||||
Self::renew_lease_and_reregister(&inner, lease_id, &key).await;
|
||||
interval.tick().await;
|
||||
if let Err(e) = keeper.keep_alive().await {
|
||||
tracing::warn!(lease_id, error = %e, "lease keepalive failed");
|
||||
}
|
||||
let _ = stream.message().await;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl EtcdRegistry {
|
||||
async fn run_keep_alive_stream(
|
||||
inner: &std::sync::Arc<EtcdRegistryInner>,
|
||||
lease_id: i64,
|
||||
) {
|
||||
let result = {
|
||||
let mut client = inner.client.lock().await;
|
||||
client.lease_keep_alive(lease_id).await
|
||||
};
|
||||
|
||||
match result {
|
||||
Ok((_keeper, mut stream)) => {
|
||||
while let Some(resp) = stream.next().await {
|
||||
if let Err(e) = resp {
|
||||
tracing::warn!(lease_id = lease_id, error = %e, "keep-alive stream error");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(lease_id = lease_id, error = %e, "keep-alive failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn renew_lease_and_reregister(
|
||||
inner: &std::sync::Arc<EtcdRegistryInner>,
|
||||
old_lease_id: i64,
|
||||
key: &str,
|
||||
) {
|
||||
let re_grant = {
|
||||
let mut client = inner.client.lock().await;
|
||||
client
|
||||
.lease_grant(inner.config.etcd_lease_ttl().unwrap_or(15) as i64, None)
|
||||
.await
|
||||
};
|
||||
|
||||
let Ok(current) = re_grant else {
|
||||
return;
|
||||
};
|
||||
|
||||
let new_lease = current.id();
|
||||
inner.lease_id.store(new_lease, Ordering::SeqCst);
|
||||
|
||||
let instance = ServiceInstance {
|
||||
addr: inner.config.rpc_self_listen_addr().unwrap_or_default(),
|
||||
metadata: HashMap::new(),
|
||||
};
|
||||
|
||||
if let Ok(value) = serde_json::to_string(&instance) {
|
||||
let mut client = inner.client.lock().await;
|
||||
let opts = PutOptions::new().with_lease(new_lease);
|
||||
let _ = client.put(key, value, Some(opts)).await;
|
||||
}
|
||||
tracing::info!(old = old_lease_id, new = new_lease, "etcd lease renewed");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user