diff --git a/.gitignore b/.gitignore index 2a0038a..620cd00 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,7 @@ /target -.idea \ No newline at end of file +.idea +.codegraph +.classpath +.project +.settings +.DS_Store diff --git a/Cargo.lock b/Cargo.lock index a90ec69..5b23fef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,6 +61,28 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "aws-lc-rs" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ec2f1fc3ec205783a5da9a7e6c1509cc69dedf09a1949e412c1e18469326d00" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.41.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a2f9779ce85b93ab6170dd940ad0169b5766ff848247aff13bb788b832fe3f4" +dependencies = [ + "cc", + "cmake", + "dunce", + "fs_extra", +] + [[package]] name = "axum" version = "0.8.9" @@ -137,6 +159,29 @@ dependencies = [ "hybrid-array", ] +[[package]] +name = "bon" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97493a391b4b18ee918675fb8663e53646fd09321c58b46afa04e8ce2499c869" +dependencies = [ + "bon-macros", + "rustversion", +] + +[[package]] +name = "bon-macros" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2af3eac944c12cdf4423eab70d310da0a8e5851a18ffb192c0a5e3f7ae1663" +dependencies = [ + "darling", + "ident_case", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "bstr" version = "1.12.1" @@ -148,6 +193,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bumpalo" +version = "3.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649" + [[package]] name = "byteorder" version = "1.5.0" @@ -160,6 +211,18 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "cc" +version = "1.2.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "556e016178bb5662a08681bbe0f00f8e17631781a4dfc8c45e466e4b185ec27f" +dependencies = [ + "find-msvc-tools", + "jobserver", + "libc", + "shlex", +] + [[package]] name = "cfg-if" version = "1.0.4" @@ -175,6 +238,21 @@ dependencies = [ "hashbrown 0.16.1", ] +[[package]] +name = "cmake" +version = "0.1.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678" +dependencies = [ + "cc", +] + +[[package]] +name = "const-oid" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" + [[package]] name = "cpufeatures" version = "0.2.17" @@ -236,6 +314,41 @@ dependencies = [ "hybrid-array", ] +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "dashmap" version = "6.2.1" @@ -267,6 +380,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1dd6dbb5841937940781866fa1281a1ff7bd3bf827091440879f9994983d5c2" dependencies = [ "block-buffer 0.12.0", + "const-oid", "crypto-common 0.2.2", ] @@ -360,6 +474,12 @@ dependencies = [ "libc", ] +[[package]] +name = "find-msvc-tools" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" + [[package]] name = "fixedbitset" version = "0.5.7" @@ -394,6 +514,27 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.32" @@ -401,6 +542,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -409,6 +551,34 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.32" @@ -427,8 +597,13 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "slab", ] @@ -443,6 +618,29 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "getrandom" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" +dependencies = [ + "cfg-if", + "libc", + "r-efi 5.3.0", + "wasip2", +] + [[package]] name = "getrandom" version = "0.4.2" @@ -451,7 +649,7 @@ checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" dependencies = [ "cfg-if", "libc", - "r-efi", + "r-efi 6.0.0", "wasip2", "wasip3", ] @@ -460,6 +658,7 @@ dependencies = [ name = "gitks" version = "1.0.0" dependencies = [ + "async-trait", "clru", "dotenvy", "duct", @@ -467,6 +666,8 @@ dependencies = [ "gix-archive", "prost", "prost-types", + "ractor", + "ractor_cluster", "serde", "tempfile", "thiserror", @@ -1536,6 +1737,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "indexmap" version = "2.14.0" @@ -1604,6 +1811,28 @@ dependencies = [ "jiff-tzdb", ] +[[package]] +name = "jobserver" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +dependencies = [ + "getrandom 0.3.4", + "libc", +] + +[[package]] +name = "js-sys" +version = "0.3.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "142bc4740e452c1e57ade0cbc129f139c9093e354346f0872ef985f4f5cf5f11" +dependencies = [ + "cfg-if", + "futures-util", + "once_cell", + "wasm-bindgen", +] + [[package]] name = "kstring" version = "2.0.2" @@ -1848,6 +2077,15 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + [[package]] name = "prettyplease" version = "0.2.37" @@ -1929,6 +2167,70 @@ dependencies = [ "prost", ] +[[package]] +name = "protoc-bin-vendored" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1c381df33c98266b5f08186583660090a4ffa0889e76c7e9a5e175f645a67fa" +dependencies = [ + "protoc-bin-vendored-linux-aarch_64", + "protoc-bin-vendored-linux-ppcle_64", + "protoc-bin-vendored-linux-s390_64", + "protoc-bin-vendored-linux-x86_32", + "protoc-bin-vendored-linux-x86_64", + "protoc-bin-vendored-macos-aarch_64", + "protoc-bin-vendored-macos-x86_64", + "protoc-bin-vendored-win32", +] + +[[package]] +name = "protoc-bin-vendored-linux-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c350df4d49b5b9e3ca79f7e646fde2377b199e13cfa87320308397e1f37e1a4c" + +[[package]] +name = "protoc-bin-vendored-linux-ppcle_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55a63e6c7244f19b5c6393f025017eb5d793fd5467823a099740a7a4222440c" + +[[package]] +name = "protoc-bin-vendored-linux-s390_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dba5565db4288e935d5330a07c264a4ee8e4a5b4a4e6f4e83fad824cc32f3b0" + +[[package]] +name = "protoc-bin-vendored-linux-x86_32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8854774b24ee28b7868cd71dccaae8e02a2365e67a4a87a6cd11ee6cdbdf9cf5" + +[[package]] +name = "protoc-bin-vendored-linux-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b38b07546580df720fa464ce124c4b03630a6fb83e05c336fea2a241df7e5d78" + +[[package]] +name = "protoc-bin-vendored-macos-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89278a9926ce312e51f1d999fee8825d324d603213344a9a706daa009f1d8092" + +[[package]] +name = "protoc-bin-vendored-macos-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81745feda7ccfb9471d7a4de888f0652e806d5795b61480605d4943176299756" + +[[package]] +name = "protoc-bin-vendored-win32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95067976aca6421a523e491fce939a3e65249bac4b977adee0ee9771568e8aa3" + [[package]] name = "pulldown-cmark" version = "0.13.4" @@ -1958,12 +2260,102 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + [[package]] name = "r-efi" version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" +[[package]] +name = "ractor" +version = "0.15.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f12c86deb2af198b10a04c4fb3fba73baf3bb300df765a29272f0e5583da7510" +dependencies = [ + "async-trait", + "bon", + "dashmap", + "futures", + "js-sys", + "once_cell", + "strum", + "tokio", + "tokio_with_wasm", + "tracing", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-time", +] + +[[package]] +name = "ractor_cluster" +version = "0.15.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5566dade327a8b4fa8dc046653cec2b889f00b6ddbbc8df6b020472dc62f5" +dependencies = [ + "async-trait", + "bytes", + "prost", + "prost-build", + "prost-types", + "protoc-bin-vendored", + "ractor", + "ractor_cluster_derive", + "rand", + "sha2", + "socket2", + "tokio", + "tokio-rustls", + "tracing", +] + +[[package]] +name = "ractor_cluster_derive" +version = "0.15.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ed9db7ea11020d50ad74f9ed3eb25ba43c61ab1d8c24986ad967e80d092c01b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "rand" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.17", +] + [[package]] name = "rawzip" version = "0.4.4" @@ -2008,6 +2400,20 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "ring" +version = "0.17.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" +dependencies = [ + "cc", + "cfg-if", + "getrandom 0.2.17", + "libc", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rustix" version = "1.1.4" @@ -2021,6 +2427,42 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "rustls" +version = "0.23.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b" +dependencies = [ + "aws-lc-rs", + "log", + "once_cell", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pki-types" +version = "1.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a7197ae7eb376e574fe940d068c30fe0462554a3ddbe4eca7838e049c937a9" +dependencies = [ + "zeroize", +] + +[[package]] +name = "rustls-webpki" +version = "0.103.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e" +dependencies = [ + "aws-lc-rs", + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -2155,6 +2597,12 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc6fe69c597f9c37bfeeeeeb33da3530379845f10be461a66d16d03eca2ded77" +[[package]] +name = "shlex" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8fadd59c855ef2080decdef8ff161eb6661b86933c9d82e5ba29dc602a55aba" + [[package]] name = "sigchld" version = "0.2.4" @@ -2229,6 +2677,39 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + +[[package]] +name = "strum" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9628de9b8791db39ceda2b119bbe13134770b56c138ec1d3af810d045c04f9bd" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab85eea0270ee17587ed4156089e10b9e6880ee688791d45a905f5b1ca36f664" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + [[package]] name = "syn" version = "2.0.117" @@ -2264,7 +2745,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom", + "getrandom 0.4.2", "once_cell", "rustix", "windows-sys 0.61.2", @@ -2327,6 +2808,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.61.2", ] @@ -2341,6 +2823,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.18" @@ -2366,6 +2858,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio_with_wasm" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34e40fbbbd95441133fe9483f522db15dbfd26dc636164ebd8f2dd28759a6aa6" +dependencies = [ + "js-sys", + "tokio", + "tokio_with_wasm_proc", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + +[[package]] +name = "tokio_with_wasm_proc" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d01145a2c788d6aae4cd653afec1e8332534d7d783d01897cefcafe4428de992" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "tonic" version = "0.14.6" @@ -2581,6 +3097,12 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "valuable" version = "0.1.1" @@ -2636,6 +3158,61 @@ dependencies = [ "wit-bindgen 0.51.0", ] +[[package]] +name = "wasm-bindgen" +version = "0.2.122" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ed04576f974d2b2fba0f38c51dbc5518011e38c36bf1143164be765528fd409" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9473dbd2991ae90b6291c3c32c30c6187ac49aa32f9905d1cce280ec1e110b0f" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.122" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "916151b09da36bd82f6615cbf3a419e2f0ba23a03c6160e8e92eb6bd4aa1dec6" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.122" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "299047362ccbfce148b67ab7e73349f77748e00c8296f9542adfad2ad82c5c5e" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.122" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a929b2c61f11ba3e9bc35b50c1f25cb38e0e892c0c231ae2b8cf78d5dad4437" +dependencies = [ + "unicode-ident", +] + [[package]] name = "wasm-encoder" version = "0.244.0" @@ -2670,6 +3247,26 @@ dependencies = [ "semver", ] +[[package]] +name = "web-sys" +version = "0.3.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621441cfc37b84979402712047321980c178f299193a3589d05b99e8763436" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi-util" version = "0.1.11" @@ -2685,13 +3282,22 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.60.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" dependencies = [ - "windows-targets", + "windows-targets 0.53.5", ] [[package]] @@ -2703,6 +3309,22 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm 0.52.6", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", +] + [[package]] name = "windows-targets" version = "0.53.5" @@ -2710,58 +3332,106 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" dependencies = [ "windows-link", - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.53.1", + "windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + [[package]] name = "windows_aarch64_gnullvm" version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + [[package]] name = "windows_aarch64_msvc" version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + [[package]] name = "windows_i686_gnu" version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + [[package]] name = "windows_i686_gnullvm" version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + [[package]] name = "windows_i686_msvc" version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + [[package]] name = "windows_x86_64_gnu" version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + [[package]] name = "windows_x86_64_gnullvm" version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + [[package]] name = "windows_x86_64_msvc" version = "0.53.1" @@ -2872,6 +3542,32 @@ dependencies = [ "rustix", ] +[[package]] +name = "zerocopy" +version = "0.8.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b065d4f0e55f82fae73202e189638116a87c55ab6b8e6c2721e13dd9d854ad1" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b631b19d36a892ab55420c92dbc83ccd79274f25be714855d3074aa71cab639" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zeroize" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" + [[package]] name = "zlib-rs" version = "0.6.3" diff --git a/Cargo.toml b/Cargo.toml index 6ce738f..2c34607 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,9 @@ tonic-prost = "0.14" tempfile = "3" dotenvy = "0.15" tracing-subscriber = { version = "0.3", features = ["env-filter"] } - +ractor = { version = "0.15.13", features = ["cluster","tokio_runtime","monitors","message_span_propogation","async-trait"]} +ractor_cluster = { version = "0.15.13", features = ["async-trait"] } +async-trait = "0.1.89" [[bin]] name = "gitks" path = "main.rs" diff --git a/actor/handler.rs b/actor/handler.rs new file mode 100644 index 0000000..31c2210 --- /dev/null +++ b/actor/handler.rs @@ -0,0 +1,200 @@ +use std::collections::HashSet; +use async_trait::async_trait; +use ractor::pg; +use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent}; +use crate::actor::message::{GitNodeMessage, NodeHealth, RouteDecision}; +use crate::pb::RepositoryHeader; +use crate::server::GitksService; + +#[derive(Clone)] +pub struct GitNodeActor { + pub version: String, + pub service: GitksService, +} + +impl GitNodeActor { + pub fn init(service: GitksService) -> Self { + GitNodeActor { + version: env!("CARGO_PKG_VERSION").to_string(), + service, + } + } +} + +pub struct GitNodeArgs { + pub storage_name: String, + pub grpc_addr: String, +} + +pub struct GitNodeState { + storage_name: String, + actor_name: String, + grpc_addr: String, + registered_repos: HashSet, +} + +#[async_trait] +impl Actor for GitNodeActor { + type Msg = GitNodeMessage; + type State = GitNodeState; + type Arguments = GitNodeArgs; + + async fn pre_start( + &self, + myself: ActorRef, + args: Self::Arguments, + ) -> Result { + let actor_name = format!("git_node_{}", args.storage_name); + pg::join("gitks_nodes".to_string(), vec![myself.get_cell()]); + pg::join_scoped(args.storage_name.clone(), "node".to_string(), vec![myself.get_cell()]); + tracing::info!(storage_name = %args.storage_name, actor_name = %actor_name, grpc_addr = %args.grpc_addr, "GitNodeActor started"); + Ok(GitNodeState { + storage_name: args.storage_name, + actor_name, + grpc_addr: args.grpc_addr, + registered_repos: HashSet::new(), + }) + } + + async fn handle( + &self, + myself: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + GitNodeMessage::ScanAndRegister => { + let repos = self.service.scan_all_repo()?; + tracing::info!(storage_name = %state.storage_name, found = repos.len(), "scanning local repositories"); + for repo_path in repos { + let relative_path = repo_path + .strip_prefix(self.service.repo_prefix.to_string_lossy().as_ref()) + .unwrap_or(&repo_path) + .trim_start_matches('/') + .to_string(); + register_repo(&myself, state, relative_path); + } + } + + GitNodeMessage::RegisterRepository(header) => { + register_repo(&myself, state, header.relative_path); + } + + GitNodeMessage::RemoveRepository(header) => { + state.registered_repos.remove(&header.relative_path); + tracing::info!( + storage_name = %state.storage_name, + relative_path = %header.relative_path, + "repository route removed" + ); + } + + GitNodeMessage::RouteRepository(header, reply) => { + let found = state.registered_repos.contains(&header.relative_path); + reply.send(RouteDecision { + found, + storage_name: state.storage_name.clone(), + relative_path: header.relative_path, + actor_name: if found { state.actor_name.clone() } else { String::new() }, + grpc_addr: if found { state.grpc_addr.clone() } else { String::new() }, + }).ok(); + } + + GitNodeMessage::ListRepositoryPaths(reply) => { + let paths: Vec = state.registered_repos.iter().cloned().collect(); + reply.send(paths.join("\n")).ok(); + } + + GitNodeMessage::RepositoryExists(header, reply) => { + reply.send(state.registered_repos.contains(&header.relative_path)).ok(); + } + + GitNodeMessage::GetNodeHealth(reply) => { + reply.send(NodeHealth { + storage_name: state.storage_name.clone(), + repo_count: state.registered_repos.len() as u64, + healthy: true, + version: self.version.clone(), + }).ok(); + } + } + Ok(()) + } + + async fn handle_supervisor_evt( + &self, + _myself: ActorRef, + evt: SupervisionEvent, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match evt { + SupervisionEvent::ActorStarted(who) => tracing::debug!(actor = ?who.get_id(), "child started"), + SupervisionEvent::ActorTerminated(who, _, reason) => { + tracing::warn!(actor = ?who.get_id(), reason = ?reason, "child terminated") + } + SupervisionEvent::ActorFailed(who, panic_msg) => { + tracing::error!(actor = ?who.get_id(), msg = %panic_msg, "child panicked") + } + SupervisionEvent::ProcessGroupChanged(group) => tracing::info!(group = ?group, "PG membership changed"), + _ => {} + } + Ok(()) + } + + async fn post_stop( + &self, + _myself: ActorRef, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + tracing::info!(storage_name = %state.storage_name, "GitNodeActor stopped"); + Ok(()) + } +} + +fn register_repo(myself: &ActorRef, state: &mut GitNodeState, relative_path: String) { + let category = extract_category(&relative_path); + pg::join_scoped(state.storage_name.clone(), category.to_string(), vec![myself.get_cell()]); + state.registered_repos.insert(relative_path.clone()); + tracing::info!( + storage_name = %state.storage_name, + category = %category, + relative_path = %relative_path, + actor_name = %state.actor_name, + "repository route registered" + ); +} + +fn extract_category(relative_path: &str) -> &str { + relative_path.split('/').next().unwrap_or("root") +} + +pub async fn start_node_actor( + service: GitksService, + storage_name: String, + grpc_addr: String, +) -> Result<(ActorRef, tokio::task::JoinHandle<()>), ractor::SpawnErr> { + let actor = GitNodeActor::init(service); + let (actor_ref, handle) = Actor::spawn( + Some(format!("git_node_{storage_name}")), + actor, + GitNodeArgs { storage_name, grpc_addr }, + ).await?; + actor_ref.cast(GitNodeMessage::ScanAndRegister).ok(); + Ok((actor_ref, handle)) +} + +pub fn get_cluster_nodes(storage_name: &str) -> Vec { + pg::get_scoped_members(&storage_name.to_string(), &"node".to_string()) +} + +pub fn get_category_members(storage_name: &str, category: &str) -> Vec { + pg::get_scoped_members(&storage_name.to_string(), &category.to_string()) +} + +pub fn route_group_for(header: &RepositoryHeader) -> String { + extract_category(&header.relative_path).to_string() +} + +pub fn list_all_groups() -> Vec { + pg::which_groups() +} diff --git a/actor/message.rs b/actor/message.rs new file mode 100644 index 0000000..c778925 --- /dev/null +++ b/actor/message.rs @@ -0,0 +1,126 @@ +use ractor::RpcReplyPort; +use ractor_cluster::BytesConvertable; +use ractor_cluster::RactorClusterMessage; +use crate::pb::RepositoryHeader; + +impl BytesConvertable for RepositoryHeader { + fn into_bytes(self) -> Vec { + prost::Message::encode_to_vec(&self) + } + + fn from_bytes(bytes: Vec) -> Self { + prost::Message::decode(bytes.as_slice()).unwrap_or_default() + } +} + +#[derive(Debug, Clone)] +pub struct RouteDecision { + pub found: bool, + pub storage_name: String, + pub relative_path: String, + pub actor_name: String, + pub grpc_addr: String, +} + +impl BytesConvertable for RouteDecision { + fn into_bytes(self) -> Vec { + encode_strings(&[ + if self.found { "1" } else { "0" }.to_string(), + self.storage_name, + self.relative_path, + self.actor_name, + self.grpc_addr, + ]) + } + + fn from_bytes(bytes: Vec) -> Self { + let values = decode_strings(bytes); + Self { + found: values.first().is_some_and(|v| v == "1"), + storage_name: values.get(1).cloned().unwrap_or_default(), + relative_path: values.get(2).cloned().unwrap_or_default(), + actor_name: values.get(3).cloned().unwrap_or_default(), + grpc_addr: values.get(4).cloned().unwrap_or_default(), + } + } +} + +#[derive(Debug, Clone)] +pub struct NodeHealth { + pub storage_name: String, + pub repo_count: u64, + pub healthy: bool, + pub version: String, +} + +impl BytesConvertable for NodeHealth { + fn into_bytes(self) -> Vec { + encode_strings(&[ + self.storage_name, + self.repo_count.to_string(), + if self.healthy { "1" } else { "0" }.to_string(), + self.version, + ]) + } + + fn from_bytes(bytes: Vec) -> Self { + let values = decode_strings(bytes); + Self { + storage_name: values.first().cloned().unwrap_or_default(), + repo_count: values.get(1).and_then(|v| v.parse().ok()).unwrap_or_default(), + healthy: values.get(2).is_some_and(|v| v == "1"), + version: values.get(3).cloned().unwrap_or_default(), + } + } +} + +#[derive(RactorClusterMessage)] +pub enum GitNodeMessage { + ScanAndRegister, + + RegisterRepository(RepositoryHeader), + + RemoveRepository(RepositoryHeader), + + #[rpc] + RouteRepository(RepositoryHeader, RpcReplyPort), + + #[rpc] + ListRepositoryPaths(RpcReplyPort), + + #[rpc] + RepositoryExists(RepositoryHeader, RpcReplyPort), + + #[rpc] + GetNodeHealth(RpcReplyPort), +} + +#[derive(ractor_cluster::RactorMessage)] +pub enum RepoActorMessage { + UpdateMetadata(RepositoryHeader), +} + +fn encode_strings(values: &[String]) -> Vec { + let mut buf = Vec::new(); + for value in values { + let bytes = value.as_bytes(); + buf.extend((bytes.len() as u64).to_be_bytes()); + buf.extend(bytes); + } + buf +} + +fn decode_strings(bytes: Vec) -> Vec { + let mut values = Vec::new(); + let mut offset = 0; + while offset + 8 <= bytes.len() { + let len = u64::from_be_bytes(bytes[offset..offset + 8].try_into().unwrap()) as usize; + offset += 8; + if offset + len > bytes.len() { + break; + } + values.push(String::from_utf8_lossy(&bytes[offset..offset + len]).into_owned()); + offset += len; + } + values +} diff --git a/actor/mod.rs b/actor/mod.rs new file mode 100644 index 0000000..7f214ab --- /dev/null +++ b/actor/mod.rs @@ -0,0 +1,7 @@ +pub mod message; +pub mod handler; +pub mod server; + +pub use handler::{GitNodeActor, GitNodeArgs, start_node_actor, get_cluster_nodes, get_category_members, route_group_for, list_all_groups}; +pub use server::init_actor_cluster; +pub use message::{GitNodeMessage, NodeHealth, RepoActorMessage, RouteDecision}; diff --git a/actor/server.rs b/actor/server.rs new file mode 100644 index 0000000..009f9c8 --- /dev/null +++ b/actor/server.rs @@ -0,0 +1,15 @@ +use ractor::ActorRef; +use crate::actor::handler::start_node_actor; +use crate::actor::message::GitNodeMessage; +use crate::server::GitksService; + +pub async fn init_actor_cluster( + service: GitksService, + storage_name: String, + grpc_addr: String, +) -> Result<(ActorRef, tokio::task::JoinHandle<()>), ractor::SpawnErr> { + tracing::info!(storage_name = %storage_name, grpc_addr = %grpc_addr, "initializing actor cluster"); + let result = start_node_actor(service, storage_name.clone(), grpc_addr).await?; + tracing::info!(storage_name = %storage_name, "actor cluster ready"); + Ok(result) +} diff --git a/bare.rs b/bare.rs index e474110..b24f5d8 100644 --- a/bare.rs +++ b/bare.rs @@ -13,8 +13,6 @@ impl GitBare { Self { bare_dir } } - /// Open the gix repository. Callers should open once per logical operation - /// and reuse the handle for all gix lookups within that operation. pub fn gix_repo(&self) -> GitResult { tracing::debug!(repo = %self.bare_dir.display(), "opening gix repository"); gix::open(&self.bare_dir).map_err(|e| { @@ -39,7 +37,6 @@ impl GitBare { } PathBuf::from(p) } else if !relative_path.is_empty() { - // relative_path alone is rejected unless absolute return Err(GitError::InvalidArgument( "relative_path requires storage_path to be set".into(), )); @@ -47,14 +44,11 @@ impl GitBare { return Err(GitError::InvalidArgument("empty repository path".into())); }; - // Join relative_path if provided let bare_dir = if !relative_path.is_empty() && !storage_path.is_empty() { let candidate = base.join(relative_path); - // Canonicalize to resolve any `..` / symlinks, then check still under base let canonical = candidate .canonicalize() .unwrap_or_else(|_| candidate.clone()); - // Path traversal check: canonical resolved dir must start with base let base_canon = base.canonicalize().unwrap_or_else(|_| base.clone()); if !canonical.starts_with(&base_canon) { tracing::warn!( @@ -73,7 +67,6 @@ impl GitBare { return Err(GitError::InvalidArgument("empty repository path".into())); }; - // Validate bare_dir exists, is a directory, and is readable if !bare_dir.exists() { tracing::warn!(path = %bare_dir.display(), "repository not found"); return Err(GitError::RepoNotFound); @@ -85,10 +78,8 @@ impl GitBare { ))); } - // Accept either bare repos (HEAD file) or non-bare (HEAD + .git) let head_path = bare_dir.join("HEAD"); if !head_path.exists() { - // Maybe it's a non-bare repo let git_dir = bare_dir.join(".git"); if git_dir.is_dir() && git_dir.join("HEAD").exists() { tracing::debug!(path = %git_dir.display(), "resolved non-bare repo via .git subdir"); @@ -100,7 +91,6 @@ impl GitBare { Ok(Self { bare_dir }) } - /// Detect the repository's object format (SHA-1 or SHA-256). pub fn object_format(&self) -> crate::pb::ObjectFormat { let repo = self.gix_repo().ok(); let kind = repo diff --git a/lib.rs b/lib.rs index fc1a004..bc8c433 100644 --- a/lib.rs +++ b/lib.rs @@ -16,3 +16,4 @@ pub mod refs; pub mod server; pub mod tag; pub mod tree; +pub mod actor; \ No newline at end of file diff --git a/main.rs b/main.rs index 21bfa08..77a9430 100644 --- a/main.rs +++ b/main.rs @@ -1,14 +1,16 @@ use std::path::PathBuf; -use gitks::server::serve; +use gitks::actor::init_actor_cluster; +use gitks::server::{serve, GitksService}; const DEFAULT_HOST: &str = "0.0.0.0"; const DEFAULT_PORT: &str = "50051"; +const DEFAULT_STORAGE_NAME: &str = "default"; #[tokio::main] async fn main() -> Result<(), Box> { dotenvy::dotenv().ok(); - tracing_subscriber::fmt::init(); + tracing_subscriber::fmt().init(); tracing::info!( version = env!("CARGO_PKG_VERSION"), @@ -17,6 +19,9 @@ async fn main() -> Result<(), Box> { let host = std::env::var("GITKS_HOST").unwrap_or_else(|_| DEFAULT_HOST.into()); let port = std::env::var("GITKS_PORT").unwrap_or_else(|_| DEFAULT_PORT.into()); + let storage_name = std::env::var("STORAGE_NAME").unwrap_or_else(|_| DEFAULT_STORAGE_NAME.into()); + let grpc_addr = std::env::var("GITKS_ADVERTISE_ADDR") + .unwrap_or_else(|_| format!("http://{host}:{port}")); let repo_prefix = std::env::var("REPO_PREFIX_PATH") .map_err(|_| "REPO_PREFIX_PATH environment variable is required (e.g. /data/repos)")?; @@ -25,21 +30,28 @@ async fn main() -> Result<(), Box> { return Err("REPO_PREFIX_PATH must be an absolute path".into()); } if !repo_prefix.exists() { - tracing::info!( - path = %repo_prefix.display(), - "creating repo prefix directory" - ); + tracing::info!(path = %repo_prefix.display(), "creating repo prefix directory"); std::fs::create_dir_all(&repo_prefix)?; } let addr: std::net::SocketAddr = format!("{host}:{port}").parse()?; + let actor_svc = GitksService::new(repo_prefix.clone()); + let (node_actor, node_handle) = init_actor_cluster( + actor_svc, + storage_name.clone(), + grpc_addr.clone(), + ).await?; + let svc = GitksService::new(repo_prefix.clone()).with_actor(node_actor.clone()); tracing::info!( - "starting gitks gRPC server on {addr}, repo prefix: {}", + "starting gitks gRPC server on {addr}, repo prefix: {}, storage: {storage_name}, advertise: {grpc_addr}", repo_prefix.display() ); - serve(addr, repo_prefix).await?; + serve(addr, svc).await?; + + node_actor.stop(None); + node_handle.await?; tracing::info!("gitks shut down"); Ok(()) diff --git a/server/archive.rs b/server/archive.rs index 53401da..3738a36 100644 --- a/server/archive.rs +++ b/server/archive.rs @@ -1,7 +1,27 @@ use crate::pb::*; +use crate::pb::archive_service_client::ArchiveServiceClient; use super::{GitksService, cache, into_status}; +async fn remote_archive_client( + svc: &GitksService, + header: Option<&RepositoryHeader>, +) -> Result>, tonic::Status> { + let header = match header { + Some(h) => h, + None => return Ok(None), + }; + let Some(route) = svc.route_repository(header).await? else { + return Ok(None); + }; + tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding archive rpc"); + let endpoint = super::remote_endpoint(&route.grpc_addr).await?; + let client = ArchiveServiceClient::connect(endpoint) + .await + .map_err(|e| tonic::Status::unavailable(e.to_string()))?; + Ok(Some(client)) +} + #[tonic::async_trait] impl archive_service_server::ArchiveService for GitksService { type GetArchiveStream = @@ -15,7 +35,18 @@ impl archive_service_server::ArchiveService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("archive.get_archive", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_archive_client(self, inner.repository.as_ref()).await? { + let resp = client.get_archive(inner).await?; + let stream = super::bridge_server_stream(resp.into_inner()); + return Ok(tonic::Response::new(stream)); + } + return Err(err); + } + Err(err) => return Err(err), + }; let stream = gb.get_archive_stream(inner)?; tracing::info!(%repo, "archive streaming started"); Ok(tonic::Response::new(stream)) @@ -29,7 +60,16 @@ impl archive_service_server::ArchiveService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("archive.list_archive_entries", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_archive_client(self, inner.repository.as_ref()).await? { + return client.list_archive_entries(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selector_is_oid(&inner.treeish) { cache::cached_response("archive.list_archive_entries", &inner, || { gb.list_archive_entries(inner.clone()).map_err(into_status) diff --git a/server/blame.rs b/server/blame.rs index 40b1a84..537502b 100644 --- a/server/blame.rs +++ b/server/blame.rs @@ -1,7 +1,27 @@ use crate::pb::*; +use crate::pb::blame_service_client::BlameServiceClient; use super::{GitksService, cache, into_status, into_stream}; +async fn remote_blame_client( + svc: &GitksService, + header: Option<&RepositoryHeader>, +) -> Result>, tonic::Status> { + let header = match header { + Some(h) => h, + None => return Ok(None), + }; + let Some(route) = svc.route_repository(header).await? else { + return Ok(None); + }; + tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding blame rpc"); + let endpoint = super::remote_endpoint(&route.grpc_addr).await?; + let client = BlameServiceClient::connect(endpoint) + .await + .map_err(|e| tonic::Status::unavailable(e.to_string()))?; + Ok(Some(client)) +} + #[tonic::async_trait] impl blame_service_server::BlameService for GitksService { type StreamBlameStream = @@ -16,7 +36,16 @@ impl blame_service_server::BlameService for GitksService { let path = inner.path.clone(); let span = tracing::info_span!("blame.blame", %repo, %path); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_blame_client(self, inner.repository.as_ref()).await? { + return client.blame(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selector_is_oid(&inner.revision) { cache::cached_response("blame.blame", &inner, || { gb.blame(inner.clone()).map_err(into_status) @@ -37,7 +66,18 @@ impl blame_service_server::BlameService for GitksService { let path = inner.path.clone(); let span = tracing::info_span!("blame.stream_blame", %repo, %path); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_blame_client(self, inner.repository.as_ref()).await? { + let resp = client.stream_blame(inner).await?; + let stream = super::bridge_server_stream(resp.into_inner()); + return Ok(tonic::Response::new(stream)); + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selector_is_oid(&inner.revision) { cache::cached_response("blame.blame", &inner, || { gb.blame(inner.clone()).map_err(into_status) diff --git a/server/branch.rs b/server/branch.rs index fedb8bf..616d60f 100644 --- a/server/branch.rs +++ b/server/branch.rs @@ -1,7 +1,27 @@ use crate::pb::*; +use crate::pb::branch_service_client::BranchServiceClient; use super::{GitksService, into_status}; +async fn remote_branch_client( + svc: &GitksService, + header: Option<&RepositoryHeader>, +) -> Result>, tonic::Status> { + let header = match header { + Some(h) => h, + None => return Ok(None), + }; + let Some(route) = svc.route_repository(header).await? else { + return Ok(None); + }; + tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding branch rpc"); + let endpoint = super::remote_endpoint(&route.grpc_addr).await?; + let client = BranchServiceClient::connect(endpoint) + .await + .map_err(|e| tonic::Status::unavailable(e.to_string()))?; + Ok(Some(client)) +} + #[tonic::async_trait] impl branch_service_server::BranchService for GitksService { async fn list_branches( @@ -12,7 +32,16 @@ impl branch_service_server::BranchService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("branch.list_branches", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + return client.list_branches(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.list_branches(inner).map_err(into_status)?; tracing::info!(%repo, count = resp.branches.len(), "list_branches done"); Ok(tonic::Response::new(resp)) @@ -27,7 +56,16 @@ impl branch_service_server::BranchService for GitksService { let name = inner.name.clone(); let span = tracing::info_span!("branch.get_branch", %repo, %name); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + return client.get_branch(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.get_branch(inner).map_err(into_status)?; Ok(tonic::Response::new(resp)) } @@ -41,7 +79,16 @@ impl branch_service_server::BranchService for GitksService { let name = inner.name.clone(); let span = tracing::info_span!("branch.create_branch", %repo, %name); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + return client.create_branch(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.create_branch(inner).map_err(into_status)?; tracing::info!(%repo, %name, "branch created"); Ok(tonic::Response::new(resp)) @@ -56,7 +103,16 @@ impl branch_service_server::BranchService for GitksService { let name = inner.name.clone(); let span = tracing::info_span!("branch.delete_branch", %repo, %name); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + return client.delete_branch(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; gb.delete_branch(inner).map_err(into_status)?; tracing::info!(%repo, %name, "branch deleted"); Ok(tonic::Response::new(())) @@ -72,7 +128,16 @@ impl branch_service_server::BranchService for GitksService { let new = inner.new_name.clone(); let span = tracing::info_span!("branch.rename_branch", %repo, %old, %new); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + return client.rename_branch(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.rename_branch(inner).map_err(into_status)?; tracing::info!(%repo, old = %old, new = %new, "branch renamed"); Ok(tonic::Response::new(resp)) @@ -87,7 +152,16 @@ impl branch_service_server::BranchService for GitksService { let name = inner.name.clone(); let span = tracing::info_span!("branch.update_branch_target", %repo, %name); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + return client.update_branch_target(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.update_branch_target(inner).map_err(into_status)?; tracing::info!(%repo, %name, "branch target updated"); Ok(tonic::Response::new(resp)) @@ -102,7 +176,16 @@ impl branch_service_server::BranchService for GitksService { let name = inner.name.clone(); let span = tracing::info_span!("branch.set_branch_upstream", %repo, %name); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + return client.set_branch_upstream(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.set_branch_upstream(inner).map_err(into_status)?; tracing::info!(%repo, %name, "branch upstream set"); Ok(tonic::Response::new(resp)) @@ -118,7 +201,16 @@ impl branch_service_server::BranchService for GitksService { let target = inner.target_branch.clone(); let span = tracing::info_span!("branch.compare_branch", %repo, %source, %target); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + return client.compare_branch(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.compare_branch(inner).map_err(into_status)?; tracing::info!(%repo, %source, %target, ahead = resp.ahead_by, behind = resp.behind_by, "branch compared"); Ok(tonic::Response::new(resp)) diff --git a/server/commit.rs b/server/commit.rs index a028c5f..0644143 100644 --- a/server/commit.rs +++ b/server/commit.rs @@ -1,7 +1,27 @@ use crate::pb::*; +use crate::pb::commit_service_client::CommitServiceClient; use super::{GitksService, cache, into_status}; +async fn remote_commit_client( + svc: &GitksService, + header: Option<&RepositoryHeader>, +) -> Result>, tonic::Status> { + let header = match header { + Some(h) => h, + None => return Ok(None), + }; + let Some(route) = svc.route_repository(header).await? else { + return Ok(None); + }; + tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding commit rpc"); + let endpoint = super::remote_endpoint(&route.grpc_addr).await?; + let client = CommitServiceClient::connect(endpoint) + .await + .map_err(|e| tonic::Status::unavailable(e.to_string()))?; + Ok(Some(client)) +} + #[tonic::async_trait] impl commit_service_server::CommitService for GitksService { async fn list_commits( @@ -12,7 +32,16 @@ impl commit_service_server::CommitService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("commit.list_commits", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + return client.list_commits(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if !inner.all && cache::selector_is_oid(&inner.revision) { cache::cached_response("commit.list_commits", &inner, || { gb.list_commits(inner.clone()).map_err(into_status) @@ -32,7 +61,16 @@ impl commit_service_server::CommitService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("commit.get_commit", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + return client.get_commit(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selector_is_oid(&inner.revision) { cache::cached_response("commit.get_commit", &inner, || { gb.get_commit(inner.clone()).map_err(into_status) @@ -51,7 +89,16 @@ impl commit_service_server::CommitService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("commit.get_commit_ancestors", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + return client.get_commit_ancestors(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selector_is_oid(&inner.revision) { cache::cached_response("commit.get_commit_ancestors", &inner, || { gb.get_commit_ancestors(inner.clone()).map_err(into_status) @@ -72,7 +119,16 @@ impl commit_service_server::CommitService for GitksService { let branch = inner.branch.clone(); let span = tracing::info_span!("commit.create_commit", %repo, %branch); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + return client.create_commit(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.create_commit(inner).map_err(into_status)?; let commit_hex = resp.commit.as_ref() .and_then(|c| c.oid.as_ref().map(|o| o.hex.as_str()).or(Some("?"))) @@ -90,7 +146,16 @@ impl commit_service_server::CommitService for GitksService { let branch = inner.branch.clone(); let span = tracing::info_span!("commit.revert_commit", %repo, %branch); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + return client.revert_commit(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.revert_commit(inner).map_err(into_status)?; tracing::info!(%repo, %branch, "commit reverted"); Ok(tonic::Response::new(resp)) @@ -105,7 +170,16 @@ impl commit_service_server::CommitService for GitksService { let branch = inner.branch.clone(); let span = tracing::info_span!("commit.cherry_pick_commit", %repo, %branch); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + return client.cherry_pick_commit(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.cherry_pick_commit(inner).map_err(into_status)?; tracing::info!(%repo, %branch, "commit cherry-picked"); Ok(tonic::Response::new(resp)) @@ -119,7 +193,16 @@ impl commit_service_server::CommitService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("commit.compare_commits", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + return client.compare_commits(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selectors_are_oid(&inner.base, &inner.head) { cache::cached_response("commit.compare_commits", &inner, || { gb.compare_commits(inner.clone()).map_err(into_status) diff --git a/server/diff.rs b/server/diff.rs index cd2b492..f002e20 100644 --- a/server/diff.rs +++ b/server/diff.rs @@ -1,7 +1,27 @@ use crate::pb::*; +use crate::pb::diff_service_client::DiffServiceClient; use super::{GitksService, cache, into_status, into_stream}; +async fn remote_diff_client( + svc: &GitksService, + header: Option<&RepositoryHeader>, +) -> Result>, tonic::Status> { + let header = match header { + Some(h) => h, + None => return Ok(None), + }; + let Some(route) = svc.route_repository(header).await? else { + return Ok(None); + }; + tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding diff rpc"); + let endpoint = super::remote_endpoint(&route.grpc_addr).await?; + let client = DiffServiceClient::connect(endpoint) + .await + .map_err(|e| tonic::Status::unavailable(e.to_string()))?; + Ok(Some(client)) +} + #[tonic::async_trait] impl diff_service_server::DiffService for GitksService { type GetPatchStream = @@ -15,7 +35,16 @@ impl diff_service_server::DiffService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("diff.get_diff", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_diff_client(self, inner.repository.as_ref()).await? { + return client.get_diff(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selectors_are_oid(&inner.base, &inner.head) { cache::cached_response("diff.get_diff", &inner, || { gb.get_diff(inner.clone()).map_err(into_status) @@ -35,7 +64,16 @@ impl diff_service_server::DiffService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("diff.get_commit_diff", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_diff_client(self, inner.repository.as_ref()).await? { + return client.get_commit_diff(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selector_is_oid(&inner.commit) { cache::cached_response("diff.get_commit_diff", &inner, || { gb.get_commit_diff(inner.clone()).map_err(into_status) @@ -55,7 +93,18 @@ impl diff_service_server::DiffService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("diff.get_patch", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_diff_client(self, inner.repository.as_ref()).await? { + let resp = client.get_patch(inner).await?; + let stream = super::bridge_server_stream(resp.into_inner()); + return Ok(tonic::Response::new(stream)); + } + return Err(err); + } + Err(err) => return Err(err), + }; let items = if cache::selectors_are_oid(&inner.base, &inner.head) { cache::cached_vec_response("diff.get_patch", &inner, || { gb.get_patch(inner.clone()).map_err(into_status) @@ -74,7 +123,16 @@ impl diff_service_server::DiffService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("diff.get_diff_stats", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_diff_client(self, inner.repository.as_ref()).await? { + return client.get_diff_stats(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selectors_are_oid(&inner.base, &inner.head) { cache::cached_response("diff.get_diff_stats", &inner, || { gb.get_diff_stats(inner.clone()).map_err(into_status) diff --git a/server/merge.rs b/server/merge.rs index 2d974fd..dc44cde 100644 --- a/server/merge.rs +++ b/server/merge.rs @@ -1,7 +1,27 @@ use crate::pb::*; +use crate::pb::merge_service_client::MergeServiceClient; use super::{GitksService, into_status}; +async fn remote_merge_client( + svc: &GitksService, + header: Option<&RepositoryHeader>, +) -> Result>, tonic::Status> { + let header = match header { + Some(h) => h, + None => return Ok(None), + }; + let Some(route) = svc.route_repository(header).await? else { + return Ok(None); + }; + tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding merge rpc"); + let endpoint = super::remote_endpoint(&route.grpc_addr).await?; + let client = MergeServiceClient::connect(endpoint) + .await + .map_err(|e| tonic::Status::unavailable(e.to_string()))?; + Ok(Some(client)) +} + #[tonic::async_trait] impl merge_service_server::MergeService for GitksService { async fn check_merge( @@ -12,7 +32,16 @@ impl merge_service_server::MergeService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("merge.check_merge", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref()).await? { + return client.check_merge(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.check_merge(inner).map_err(into_status)?; tracing::info!(%repo, status = resp.status, "check_merge done"); Ok(tonic::Response::new(resp)) @@ -27,7 +56,16 @@ impl merge_service_server::MergeService for GitksService { let target = inner.target_branch.clone(); let span = tracing::info_span!("merge.merge", %repo, %target); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref()).await? { + return client.merge(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.merge(inner).map_err(into_status)?; tracing::info!(%repo, %target, status = resp.status, "merge done"); Ok(tonic::Response::new(resp)) @@ -41,7 +79,16 @@ impl merge_service_server::MergeService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("merge.list_merge_conflicts", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref()).await? { + return client.list_merge_conflicts(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.list_merge_conflicts(inner).map_err(into_status)?; tracing::info!(%repo, conflicts = resp.conflicts.len(), "list_merge_conflicts done"); Ok(tonic::Response::new(resp)) @@ -56,7 +103,16 @@ impl merge_service_server::MergeService for GitksService { let target = inner.target_branch.clone(); let span = tracing::info_span!("merge.resolve_merge_conflicts", %repo, %target); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref()).await? { + return client.resolve_merge_conflicts(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.resolve_merge_conflicts(inner).map_err(into_status)?; tracing::info!(%repo, %target, status = resp.status, "merge conflicts resolved"); Ok(tonic::Response::new(resp)) @@ -71,7 +127,16 @@ impl merge_service_server::MergeService for GitksService { let branch = inner.branch.clone(); let span = tracing::info_span!("merge.rebase", %repo, %branch); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref()).await? { + return client.rebase(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.rebase(inner).map_err(into_status)?; tracing::info!(%repo, %branch, status = resp.status, "rebase done"); Ok(tonic::Response::new(resp)) diff --git a/server/mod.rs b/server/mod.rs index e289ce5..fe94b3f 100644 --- a/server/mod.rs +++ b/server/mod.rs @@ -11,11 +11,14 @@ mod repository_maint; mod tag; mod tree; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; +use gix::discover::is_git; +use ractor::{ActorCell, ActorRef}; use tokio_stream::wrappers::ReceiverStream; +use crate::actor::message::{GitNodeMessage, RouteDecision}; use crate::bare::GitBare; -use crate::error::GitError; +use crate::error::{GitError, GitResult}; use crate::pb::{ archive_service_server, blame_service_server, branch_service_server, commit_service_server, diff_service_server, merge_service_server, pack_service_server, repository_service_server, @@ -24,11 +27,52 @@ use crate::pb::{ #[derive(Clone)] pub struct GitksService { - /// Root prefix path for all repositories pub repo_prefix: PathBuf, + pub node_actor: Option>, } impl GitksService { + pub fn new(repo_prefix: PathBuf) -> Self { + Self { repo_prefix, node_actor: None } + } + + pub fn with_actor(mut self, node_actor: ActorRef) -> Self { + self.node_actor = Some(node_actor); + self + } + + pub fn scan_all_repo(&self) -> GitResult> { + let root = self.repo_prefix.as_ref(); + let mut repos = Vec::new(); + if is_bare_git_repo(root) { + repos.push(root.to_path_buf()); + } else { + scan_bare_repos_recursively(root, &mut repos)?; + } + Ok(repos + .into_iter() + .filter_map(|path| path.to_str().map(str::to_owned)) + .collect()) + } + pub async fn route_repository( + &self, + header: &crate::pb::RepositoryHeader, + ) -> Result, tonic::Status> { + let members = ractor::pg::get_members(&"gitks_nodes".to_string()); + let local = self.node_actor.as_ref().map(|actor| actor.get_cell()); + for member in members { + if local.as_ref().is_some_and(|actor| actor == &member) { + continue; + } + if let Some(decision) = query_route(member, header.clone()).await? { + if decision.found && !decision.grpc_addr.is_empty() { + return Ok(Some(decision)); + } + } + } + Ok(None) + } + fn repo_label(&self, header: Option<&crate::pb::RepositoryHeader>) -> String { header .and_then(|h| { @@ -93,6 +137,70 @@ impl GitksService { } } + + +pub(super) async fn remote_endpoint(addr: &str) -> Result { + let uri: tonic::codegen::http::Uri = addr + .parse() + .map_err(|e| tonic::Status::invalid_argument(format!("invalid URI: {e}")))?; + tonic::transport::Endpoint::new(uri) + .map_err(|e| tonic::Status::internal(e.to_string())) +} + +pub(super) fn bridge_server_stream( + mut remote: tonic::Streaming, +) -> tokio_stream::wrappers::ReceiverStream> { + let (tx, rx) = tokio::sync::mpsc::channel(16); + tokio::spawn(async move { + use tokio_stream::StreamExt; + while let Some(item) = remote.next().await { + if tx.send(item).await.is_err() { + break; + } + } + }); + tokio_stream::wrappers::ReceiverStream::new(rx) +} + +async fn query_route( + member: ActorCell, + header: crate::pb::RepositoryHeader, +) -> Result, tonic::Status> { + let actor_ref: ActorRef = member.into(); + match ractor::call_t!(actor_ref, GitNodeMessage::RouteRepository, 500, header) { + Ok(decision) => Ok(Some(decision)), + Err(err) => { + tracing::warn!(error = %err, "repository route query failed"); + Ok(None) + } + } +} + +fn scan_bare_repos_recursively(dir: &Path, repos: &mut Vec) -> GitResult<()> { + for entry in std::fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + + if is_bare_git_repo(&path) { + repos.push(path); + continue; + } + + if path.is_dir() { + scan_bare_repos_recursively(&path, repos)?; + } + } + + Ok(()) +} + +fn is_bare_git_repo(path: &Path) -> bool { + match is_git(path) { + Ok(repo) => repo.is_bare(), + Err(_) => false, + } +} + pub(crate) fn into_status(e: GitError) -> tonic::Status { match &e { GitError::NotFound(_) @@ -160,11 +268,10 @@ pub(crate) fn git_cmd(gb: &GitBare, args: &[&str]) -> Result Result<(), tonic::transport::Error> { let span = tracing::info_span!("gitks.server", %addr); let _enter = span.enter(); - let svc = GitksService { repo_prefix }; tracing::info!("registering gRPC services"); let server = tonic::transport::Server::builder() .add_service(repository_service_server::RepositoryServiceServer::new( diff --git a/server/pack.rs b/server/pack.rs index 4481195..ae74f75 100644 --- a/server/pack.rs +++ b/server/pack.rs @@ -2,9 +2,29 @@ use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; use crate::pb::*; +use crate::pb::pack_service_client::PackServiceClient; use super::{GitksService, into_status}; +async fn remote_pack_client( + svc: &GitksService, + header: Option<&RepositoryHeader>, +) -> Result>, tonic::Status> { + let header = match header { + Some(h) => h, + None => return Ok(None), + }; + let Some(route) = svc.route_repository(header).await? else { + return Ok(None); + }; + tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding pack rpc"); + let endpoint = super::remote_endpoint(&route.grpc_addr).await?; + let client = PackServiceClient::connect(endpoint) + .await + .map_err(|e| tonic::Status::unavailable(e.to_string()))?; + Ok(Some(client)) +} + #[tonic::async_trait] impl pack_service_server::PackService for GitksService { type UploadPackStream = ReceiverStream>; @@ -19,7 +39,16 @@ impl pack_service_server::PackService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("pack.advertise_refs", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref()).await? { + return client.advertise_refs(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.advertise_refs(inner).map_err(into_status)?; tracing::info!(%repo, refs = resp.references.len(), "advertise_refs done"); Ok(tonic::Response::new(resp)) @@ -37,8 +66,30 @@ impl pack_service_server::PackService for GitksService { let repo = self.repo_label(first.repository.as_ref()); let span = tracing::info_span!("pack.upload_pack", %repo); let _enter = span.enter(); + let gb = match self.resolve(first.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_pack_client(self, first.repository.as_ref()).await? { + let (tx, rx) = tokio::sync::mpsc::channel(16); + let _ = tx.send(first).await; + tokio::spawn(async move { + use tokio_stream::StreamExt; + while let Some(msg) = stream.next().await { + match msg { + Ok(m) => { if tx.send(m).await.is_err() { break; } } + Err(_) => break, + } + } + }); + let resp = client.upload_pack(tokio_stream::wrappers::ReceiverStream::new(rx)).await?; + let out = super::bridge_server_stream(resp.into_inner()); + return Ok(tonic::Response::new(out)); + } + return Err(err); + } + Err(err) => return Err(err), + }; tracing::info!(%repo, "upload-pack streaming started"); - let gb = self.resolve(first.repository.as_ref())?; let (tx, rx) = tokio::sync::mpsc::channel(16); tx.send(Ok(first)) @@ -68,8 +119,30 @@ impl pack_service_server::PackService for GitksService { let repo = self.repo_label(first.repository.as_ref()); let span = tracing::info_span!("pack.receive_pack", %repo); let _enter = span.enter(); + let gb = match self.resolve(first.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_pack_client(self, first.repository.as_ref()).await? { + let (tx, rx) = tokio::sync::mpsc::channel(16); + let _ = tx.send(first).await; + tokio::spawn(async move { + use tokio_stream::StreamExt; + while let Some(msg) = stream.next().await { + match msg { + Ok(m) => { if tx.send(m).await.is_err() { break; } } + Err(_) => break, + } + } + }); + let resp = client.receive_pack(tokio_stream::wrappers::ReceiverStream::new(rx)).await?; + let out = super::bridge_server_stream(resp.into_inner()); + return Ok(tonic::Response::new(out)); + } + return Err(err); + } + Err(err) => return Err(err), + }; tracing::info!(%repo, "receive-pack streaming started"); - let gb = self.resolve(first.repository.as_ref())?; let (tx, rx) = tokio::sync::mpsc::channel(16); tx.send(Ok(first)) @@ -95,7 +168,18 @@ impl pack_service_server::PackService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("pack.pack_objects", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref()).await? { + let resp = client.pack_objects(inner).await?; + let stream = super::bridge_server_stream(resp.into_inner()); + return Ok(tonic::Response::new(stream)); + } + return Err(err); + } + Err(err) => return Err(err), + }; let stream = gb.pack_objects(inner).await?; tracing::info!(%repo, "pack-objects streaming started"); Ok(tonic::Response::new(stream)) @@ -113,7 +197,16 @@ impl pack_service_server::PackService for GitksService { let repo = self.repo_label(inputs.first().and_then(|r| r.repository.as_ref())); let span = tracing::info_span!("pack.index_pack", %repo); let _enter = span.enter(); - let gb = self.resolve(inputs.first().and_then(|r| r.repository.as_ref()))?; + let gb = match self.resolve(inputs.first().and_then(|r| r.repository.as_ref())) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_pack_client(self, inputs.first().and_then(|r| r.repository.as_ref())).await? { + return client.index_pack(tokio_stream::iter(inputs)).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.index_pack(inputs).map_err(into_status)?; tracing::info!(%repo, objects = resp.object_count, "index_pack done"); Ok(tonic::Response::new(resp)) @@ -127,7 +220,16 @@ impl pack_service_server::PackService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("pack.list_packfiles", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref()).await? { + return client.list_packfiles(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.list_packfiles(inner).map_err(into_status)?; tracing::info!(%repo, count = resp.packfiles.len(), "list_packfiles done"); Ok(tonic::Response::new(resp)) @@ -141,7 +243,16 @@ impl pack_service_server::PackService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("pack.fsck", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref()).await? { + return client.fsck(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.fsck(inner).map_err(into_status)?; tracing::info!(%repo, ok = resp.ok, errors = resp.errors.len(), warnings = resp.warnings.len(), "fsck done"); Ok(tonic::Response::new(resp)) diff --git a/server/repository.rs b/server/repository.rs index ad96cd4..b6b478c 100644 --- a/server/repository.rs +++ b/server/repository.rs @@ -1,6 +1,26 @@ use crate::pb::*; +use crate::pb::repository_service_client::RepositoryServiceClient; -use super::{GitksService, git_cmd, into_status, repository_maint}; +use super::{GitksService, git_cmd, into_status, repository_maint, remote_endpoint}; + +async fn remote_repository_client( + svc: &GitksService, + header: Option<&RepositoryHeader>, +) -> Result>, tonic::Status> { + let header = match header { + Some(h) => h, + None => return Ok(None), + }; + let Some(route) = svc.route_repository(header).await? else { + return Ok(None); + }; + tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding repository rpc"); + let endpoint = remote_endpoint(&route.grpc_addr).await?; + let client = RepositoryServiceClient::connect(endpoint) + .await + .map_err(|e| tonic::Status::unavailable(e.to_string()))?; + Ok(Some(client)) +} fn default_branch_name(gb: &crate::bare::GitBare) -> String { git_cmd(gb, &["symbolic-ref", "HEAD"]) @@ -24,7 +44,16 @@ impl repository_service_server::RepositoryService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("repo.get_repository", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + return client.get_repository(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let bare = gb.bare_dir.join("HEAD").exists(); let object_format = gb.object_format(); Ok(tonic::Response::new(Repository { @@ -64,6 +93,11 @@ impl repository_service_server::RepositoryService for GitksService { let span = tracing::info_span!("repo.delete_repository", %repo); let _enter = span.enter(); let bare_dir = self.resolve_for_init(inner.repository.as_ref())?; + if !bare_dir.exists() { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + return client.delete_repository(inner).await; + } + } tracing::warn!(%repo, path = %bare_dir.display(), "deleting repository"); std::fs::remove_dir_all(&bare_dir).map_err(|e| tonic::Status::internal(e.to_string()))?; tracing::info!(%repo, "repository deleted"); @@ -80,6 +114,11 @@ impl repository_service_server::RepositoryService for GitksService { let _enter = span.enter(); let bare_dir = self.resolve_for_init(inner.repository.as_ref())?; let exists = bare_dir.exists() && bare_dir.is_dir() && bare_dir.join("HEAD").exists(); + if !exists { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + return client.repository_exists(inner).await; + } + } Ok(tonic::Response::new(RepositoryExistsResponse { exists })) } @@ -91,7 +130,16 @@ impl repository_service_server::RepositoryService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("repo.get_object_format", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + return client.get_object_format(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; Ok(tonic::Response::new(RepositoryObjectFormatResponse { object_format: gb.object_format() as i32, })) @@ -105,7 +153,16 @@ impl repository_service_server::RepositoryService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("repo.get_default_branch", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + return client.get_default_branch(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; Ok(tonic::Response::new(GetDefaultBranchResponse { name: default_branch_name(&gb), })) @@ -120,7 +177,16 @@ impl repository_service_server::RepositoryService for GitksService { let name = inner.name.clone(); let span = tracing::info_span!("repo.set_default_branch", %repo, %name); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + return client.set_default_branch(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let refname = format!("refs/heads/{}", inner.name); let out = git_cmd(&gb, &["symbolic-ref", "HEAD", &refname])?; if !out.status.success() { @@ -140,7 +206,16 @@ impl repository_service_server::RepositoryService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("repo.get_repository_config", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + return client.get_repository_config(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let mut entries = Vec::new(); if inner.keys.is_empty() { let out = git_cmd(&gb, &["config", "--list"])?; @@ -188,7 +263,16 @@ impl repository_service_server::RepositoryService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("repo.set_repository_config", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + return client.set_repository_config(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; for entry in &inner.entries { if entry.values.is_empty() { git_cmd(&gb, &["config", "--unset-all", &entry.key])?; @@ -213,7 +297,16 @@ impl repository_service_server::RepositoryService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("repo.get_repository_statistics", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + return client.get_repository_statistics(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; Ok(tonic::Response::new(repository_maint::get_statistics(&gb))) } @@ -225,7 +318,16 @@ impl repository_service_server::RepositoryService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("repo.check_repository_health", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + return client.check_repository_health(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = repository_maint::check_health(&gb, inner.connectivity_only)?; tracing::info!(%repo, ok = resp.ok, errors = resp.errors.len(), warnings = resp.warnings.len(), "health check done"); Ok(tonic::Response::new(resp)) @@ -239,7 +341,16 @@ impl repository_service_server::RepositoryService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("repo.garbage_collect", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + return client.garbage_collect(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = repository_maint::run_gc(&gb, inner.prune, inner.aggressive)?; tracing::info!(%repo, ok = resp.ok, "gc done"); Ok(tonic::Response::new(resp)) @@ -253,7 +364,16 @@ impl repository_service_server::RepositoryService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("repo.repack", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + return client.repack(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = repository_maint::run_repack( &gb, inner.full, @@ -272,7 +392,16 @@ impl repository_service_server::RepositoryService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("repo.write_commit_graph", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + return client.write_commit_graph(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = repository_maint::run_commit_graph_write(&gb, inner.split, inner.replace)?; tracing::info!(%repo, ok = resp.ok, "commit-graph write done"); Ok(tonic::Response::new(resp)) diff --git a/server/tag.rs b/server/tag.rs index d2e3721..8a60fe3 100644 --- a/server/tag.rs +++ b/server/tag.rs @@ -1,7 +1,27 @@ use crate::pb::*; +use crate::pb::tag_service_client::TagServiceClient; use super::{GitksService, into_status}; +async fn remote_tag_client( + svc: &GitksService, + header: Option<&RepositoryHeader>, +) -> Result>, tonic::Status> { + let header = match header { + Some(h) => h, + None => return Ok(None), + }; + let Some(route) = svc.route_repository(header).await? else { + return Ok(None); + }; + tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding tag rpc"); + let endpoint = super::remote_endpoint(&route.grpc_addr).await?; + let client = TagServiceClient::connect(endpoint) + .await + .map_err(|e| tonic::Status::unavailable(e.to_string()))?; + Ok(Some(client)) +} + #[tonic::async_trait] impl tag_service_server::TagService for GitksService { async fn list_tags( @@ -12,7 +32,16 @@ impl tag_service_server::TagService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("tag.list_tags", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref()).await? { + return client.list_tags(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.list_tags(inner).map_err(into_status)?; tracing::info!(%repo, count = resp.tags.len(), "list_tags done"); Ok(tonic::Response::new(resp)) @@ -27,7 +56,16 @@ impl tag_service_server::TagService for GitksService { let name = inner.name.clone(); let span = tracing::info_span!("tag.get_tag", %repo, %name); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref()).await? { + return client.get_tag(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.get_tag(inner).map_err(into_status)?; Ok(tonic::Response::new(resp)) } @@ -41,7 +79,16 @@ impl tag_service_server::TagService for GitksService { let name = inner.name.clone(); let span = tracing::info_span!("tag.create_tag", %repo, %name); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref()).await? { + return client.create_tag(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.create_tag(inner).map_err(into_status)?; tracing::info!(%repo, %name, "tag created"); Ok(tonic::Response::new(resp)) @@ -56,7 +103,16 @@ impl tag_service_server::TagService for GitksService { let name = inner.name.clone(); let span = tracing::info_span!("tag.delete_tag", %repo, %name); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref()).await? { + return client.delete_tag(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; gb.delete_tag(inner).map_err(into_status)?; tracing::info!(%repo, %name, "tag deleted"); Ok(tonic::Response::new(())) @@ -71,7 +127,16 @@ impl tag_service_server::TagService for GitksService { let name = inner.name.clone(); let span = tracing::info_span!("tag.verify_tag", %repo, %name); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref()).await? { + return client.verify_tag(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = gb.verify_tag(inner).map_err(into_status)?; tracing::info!(%repo, %name, verified = resp.verified, "tag verified"); Ok(tonic::Response::new(resp)) diff --git a/server/tree.rs b/server/tree.rs index 5c4b95a..1f21300 100644 --- a/server/tree.rs +++ b/server/tree.rs @@ -1,7 +1,27 @@ use crate::pb::*; +use crate::pb::tree_service_client::TreeServiceClient; use super::{GitksService, cache, into_status, into_stream}; +async fn remote_tree_client( + svc: &GitksService, + header: Option<&RepositoryHeader>, +) -> Result>, tonic::Status> { + let header = match header { + Some(h) => h, + None => return Ok(None), + }; + let Some(route) = svc.route_repository(header).await? else { + return Ok(None); + }; + tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding tree rpc"); + let endpoint = super::remote_endpoint(&route.grpc_addr).await?; + let client = TreeServiceClient::connect(endpoint) + .await + .map_err(|e| tonic::Status::unavailable(e.to_string()))?; + Ok(Some(client)) +} + #[tonic::async_trait] impl tree_service_server::TreeService for GitksService { type GetRawBlobStream = @@ -15,7 +35,16 @@ impl tree_service_server::TreeService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("tree.list_tree", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref()).await? { + return client.list_tree(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selector_is_oid(&inner.revision) { cache::cached_response("tree.list_tree", &inner, || { gb.list_tree(inner.clone()).map_err(into_status) @@ -35,7 +64,16 @@ impl tree_service_server::TreeService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("tree.get_tree", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref()).await? { + return client.get_tree(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selector_is_oid(&inner.revision) { cache::cached_response("tree.get_tree", &inner, || { gb.get_tree(inner.clone()).map_err(into_status) @@ -55,7 +93,16 @@ impl tree_service_server::TreeService for GitksService { let path = inner.path.clone(); let span = tracing::info_span!("tree.get_blob", %repo, %path); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref()).await? { + return client.get_blob(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selector_is_oid(&inner.revision) { cache::cached_response("tree.get_blob", &inner, || { gb.get_blob(inner.clone()).map_err(into_status) @@ -74,7 +121,18 @@ impl tree_service_server::TreeService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("tree.get_raw_blob", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref()).await? { + let resp = client.get_raw_blob(inner).await?; + let stream = super::bridge_server_stream(resp.into_inner()); + return Ok(tonic::Response::new(stream)); + } + return Err(err); + } + Err(err) => return Err(err), + }; let items = if inner.oid.is_some() { cache::cached_vec_response("tree.get_raw_blob", &inner, || { gb.get_raw_blob(inner.clone()).map_err(into_status) @@ -97,7 +155,16 @@ impl tree_service_server::TreeService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("tree.get_file_metadata", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref()).await? { + return client.get_file_metadata(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selector_is_oid(&inner.revision) { cache::cached_response("tree.get_file_metadata", &inner, || { gb.get_file_metadata(inner.clone()).map_err(into_status) @@ -116,7 +183,16 @@ impl tree_service_server::TreeService for GitksService { let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("tree.find_files", %repo); let _enter = span.enter(); - let gb = self.resolve(inner.repository.as_ref())?; + let gb = match self.resolve(inner.repository.as_ref()) { + Ok(gb) => gb, + Err(err) if err.code() == tonic::Code::NotFound => { + if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref()).await? { + return client.find_files(inner).await; + } + return Err(err); + } + Err(err) => return Err(err), + }; let resp = if cache::selector_is_oid(&inner.revision) { cache::cached_response("tree.find_files", &inner, || { gb.find_files(inner.clone()).map_err(into_status) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 8fcff40..3fdd699 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,11 +1,8 @@ use gitks::bare::GitBare; use gitks::server::GitksService; -/// Create a GitksService with a temp directory as repo_prefix pub fn setup_service(dir: &std::path::Path) -> GitksService { - GitksService { - repo_prefix: dir.to_path_buf(), - } + GitksService::new(dir.to_path_buf()) } pub fn run_git(work_dir: &std::path::Path, args: &[&str]) -> duct::Expression {