From 7d16879d6300fe9a6f1d74450d65b4d542ae7485 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Sun, 4 Feb 2018 18:38:38 -0500 Subject: [PATCH 1/7] Add prometheus to ofborg --- nix/ofborg-carnix.nix | 127 +++++++++++++++++++++++++++++++++++++++++- ofborg/Cargo.lock | 39 +++++++++++++ ofborg/Cargo.toml | 1 + 3 files changed, 165 insertions(+), 2 deletions(-) diff --git a/nix/ofborg-carnix.nix b/nix/ofborg-carnix.nix index de062a2..a27a016 100644 --- a/nix/ofborg-carnix.nix +++ b/nix/ofborg-carnix.nix @@ -210,6 +210,14 @@ rec { sha256 = "1xxbzd8cjlpzsb9fsih7mdnndhzrvykj0w77yg90qc85az1xwy5z"; inherit dependencies buildDependencies features; }; + fnv_1_0_6_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { + crateName = "fnv"; + version = "1.0.6"; + authors = [ "Alex Crichton " ]; + sha256 = "128mlh23y3gg6ag5h8iiqlcbl59smisdzraqy88ldrf75kbw27ip"; + libPath = "lib.rs"; + inherit dependencies buildDependencies features; + }; foreign_types_0_3_2_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { crateName = "foreign-types"; version = "0.3.2"; @@ -444,6 +452,28 @@ rec { sha256 = "06k8fxgrsrxj8mjpjcq1n7mn2p1shpxif4zg9y5h09c7vy20s146"; inherit dependencies buildDependencies features; }; + prometheus_0_3_11_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { + crateName = "prometheus"; + version = "0.3.11"; + authors = [ "overvenus@gmail.com" "siddontang@gmail.com" ]; + sha256 = "1c5vcy771cwpd14adgknf7pmf603p8nf4q90ik7ry083vxrvbmbg"; + inherit dependencies buildDependencies features; + }; + protobuf_1_4_3_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { + crateName = "protobuf"; + version = "1.4.3"; + authors = [ "Stepan Koltsov " ]; + sha256 = "093fczpx523lm6d7xr5d4mqs88891ay6wk951yck3cavsz35z00b"; + crateBin = [ { name = "protoc-gen-rust"; path = "protoc-gen-rust.rs"; } { name = "protobuf-bin-gen-rust-do-not-use"; path = "protobuf-bin-gen-rust-do-not-use.rs"; } ]; + inherit dependencies buildDependencies features; + }; + quick_error_0_2_2_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { + crateName = "quick-error"; + version = "0.2.2"; + authors = [ "Paul Colomiets " "Colin Kiegel " ]; + sha256 = "0r1f4ps998y779qwvnmmxhjq00qh5wxg3m5inswfawg0vr2732db"; + inherit dependencies buildDependencies features; + }; quote_0_3_15_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { crateName = "quote"; version = "0.3.15"; @@ -559,6 +589,13 @@ rec { sha256 = "1pcclssyndz54cncsizkqrblmqqr1p2g6xhkpwldbk6qc95m4yw3"; inherit dependencies buildDependencies features; }; + spin_0_4_6_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { + crateName = "spin"; + version = "0.4.6"; + authors = [ "Mathijs van de Nes " "John Ericson " ]; + sha256 = "1mp30r3pxb38m6mszcgn6136d1r162fwcidg3y4d9rym21hmialj"; + inherit dependencies buildDependencies features; + }; syn_0_11_11_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { crateName = "syn"; version = "0.11.11"; @@ -1025,6 +1062,10 @@ rec { (f.error_chain_0_10_0.default or false) || (error_chain_0_10_0.default or false); }) [ backtrace_0_3_5_features ]; + fnv_1_0_6 = { features?(fnv_1_0_6_features {}) }: fnv_1_0_6_ {}; + fnv_1_0_6_features = f: updateFeatures f (rec { + fnv_1_0_6.default = (f.fnv_1_0_6.default or true); + }) []; foreign_types_0_3_2 = { features?(foreign_types_0_3_2_features {}) }: foreign_types_0_3_2_ { dependencies = mapFeatures features ([ foreign_types_shared_0_1_1 ]); }; @@ -1324,7 +1365,7 @@ rec { num_cpus_1_8_0.default = (f.num_cpus_1_8_0.default or true); }) [ libc_0_2_36_features ]; ofborg_0_1_1 = { features?(ofborg_0_1_1_features {}) }: ofborg_0_1_1_ { - dependencies = mapFeatures features ([ amqp_0_1_0 env_logger_0_4_3 fs2_0_4_3 hubcaps_0_3_16 hyper_0_10_13 hyper_native_tls_0_2_4 log_0_3_8 lru_cache_0_1_1 md5_0_3_6 serde_1_0_27 serde_derive_1_0_27 serde_json_1_0_9 tempfile_2_2_0 uuid_0_4_0 ]); + dependencies = mapFeatures features ([ amqp_0_1_0 env_logger_0_4_3 fs2_0_4_3 hubcaps_0_3_16 hyper_0_10_13 hyper_native_tls_0_2_4 log_0_3_8 lru_cache_0_1_1 md5_0_3_6 prometheus_0_3_11 serde_1_0_27 serde_derive_1_0_27 serde_json_1_0_9 tempfile_2_2_0 uuid_0_4_0 ]); }; ofborg_0_1_1_features = f: updateFeatures f (rec { amqp_0_1_0.default = true; @@ -1337,13 +1378,14 @@ rec { lru_cache_0_1_1.default = true; md5_0_3_6.default = true; ofborg_0_1_1.default = (f.ofborg_0_1_1.default or true); + prometheus_0_3_11.default = true; serde_1_0_27.default = true; serde_derive_1_0_27.default = true; serde_json_1_0_9.default = true; tempfile_2_2_0.default = true; uuid_0_4_0.default = true; uuid_0_4_0.v4 = true; - }) [ amqp_0_1_0_features env_logger_0_4_3_features fs2_0_4_3_features hubcaps_0_3_16_features hyper_0_10_13_features hyper_native_tls_0_2_4_features log_0_3_8_features lru_cache_0_1_1_features md5_0_3_6_features serde_1_0_27_features serde_derive_1_0_27_features serde_json_1_0_9_features tempfile_2_2_0_features uuid_0_4_0_features ]; + }) [ amqp_0_1_0_features env_logger_0_4_3_features fs2_0_4_3_features hubcaps_0_3_16_features hyper_0_10_13_features hyper_native_tls_0_2_4_features log_0_3_8_features lru_cache_0_1_1_features md5_0_3_6_features prometheus_0_3_11_features serde_1_0_27_features serde_derive_1_0_27_features serde_json_1_0_9_features tempfile_2_2_0_features uuid_0_4_0_features ]; openssl_0_9_23 = { features?(openssl_0_9_23_features {}) }: openssl_0_9_23_ { dependencies = mapFeatures features ([ bitflags_0_9_1 foreign_types_0_3_2 lazy_static_1_0_0 libc_0_2_36 openssl_sys_0_9_24 ]); features = mkFeatures (features.openssl_0_9_23 or {}); @@ -1375,6 +1417,59 @@ rec { pkg_config_0_3_9_features = f: updateFeatures f (rec { pkg_config_0_3_9.default = (f.pkg_config_0_3_9.default or true); }) []; + prometheus_0_3_11 = { features?(prometheus_0_3_11_features {}) }: prometheus_0_3_11_ { + dependencies = mapFeatures features ([ cfg_if_0_1_2 fnv_1_0_6 lazy_static_0_2_11 protobuf_1_4_3 quick_error_0_2_2 spin_0_4_6 ]) + ++ (if kernel == "linux" then mapFeatures features ([]) else []); + features = mkFeatures (features.prometheus_0_3_11 or {}); + }; + prometheus_0_3_11_features = f: updateFeatures f (rec { + cfg_if_0_1_2.default = true; + fnv_1_0_6.default = true; + lazy_static_0_2_11.default = true; + prometheus_0_3_11.clippy = + (f.prometheus_0_3_11.clippy or false) || + (f.prometheus_0_3_11.dev or false) || + (prometheus_0_3_11.dev or false); + prometheus_0_3_11.default = (f.prometheus_0_3_11.default or true); + prometheus_0_3_11.hyper = + (f.prometheus_0_3_11.hyper or false) || + (f.prometheus_0_3_11.push or false) || + (prometheus_0_3_11.push or false); + prometheus_0_3_11.libc = + (f.prometheus_0_3_11.libc or false) || + (f.prometheus_0_3_11.nightly or false) || + (prometheus_0_3_11.nightly or false) || + (f.prometheus_0_3_11.process or false) || + (prometheus_0_3_11.process or false) || + (f.prometheus_0_3_11.push or false) || + (prometheus_0_3_11.push or false); + prometheus_0_3_11.procinfo = + (f.prometheus_0_3_11.procinfo or false) || + (f.prometheus_0_3_11.process or false) || + (prometheus_0_3_11.process or false); + protobuf_1_4_3.default = true; + quick_error_0_2_2.default = true; + spin_0_4_6.default = (f.spin_0_4_6.default or false); + spin_0_4_6.unstable = + (f.spin_0_4_6.unstable or false) || + (prometheus_0_3_11.nightly or false) || + (f.prometheus_0_3_11.nightly or false); + }) [ cfg_if_0_1_2_features fnv_1_0_6_features lazy_static_0_2_11_features protobuf_1_4_3_features quick_error_0_2_2_features spin_0_4_6_features ]; + protobuf_1_4_3 = { features?(protobuf_1_4_3_features {}) }: protobuf_1_4_3_ { + dependencies = mapFeatures features ([]); + features = mkFeatures (features.protobuf_1_4_3 or {}); + }; + protobuf_1_4_3_features = f: updateFeatures f (rec { + protobuf_1_4_3.bytes = + (f.protobuf_1_4_3.bytes or false) || + (f.protobuf_1_4_3.with-bytes or false) || + (protobuf_1_4_3.with-bytes or false); + protobuf_1_4_3.default = (f.protobuf_1_4_3.default or true); + }) []; + quick_error_0_2_2 = { features?(quick_error_0_2_2_features {}) }: quick_error_0_2_2_ {}; + quick_error_0_2_2_features = f: updateFeatures f (rec { + quick_error_0_2_2.default = (f.quick_error_0_2_2.default or true); + }) []; quote_0_3_15 = { features?(quote_0_3_15_features {}) }: quote_0_3_15_ {}; quote_0_3_15_features = f: updateFeatures f (rec { quote_0_3_15.default = (f.quote_0_3_15.default or true); @@ -1588,6 +1683,34 @@ rec { (f.serde_json_1_0_9.preserve_order or false) || (serde_json_1_0_9.preserve_order or false); }) [ dtoa_0_4_2_features itoa_0_3_4_features num_traits_0_1_41_features serde_1_0_27_features ]; + spin_0_4_6 = { features?(spin_0_4_6_features {}) }: spin_0_4_6_ { + features = mkFeatures (features.spin_0_4_6 or {}); + }; + spin_0_4_6_features = f: updateFeatures f (rec { + spin_0_4_6.asm = + (f.spin_0_4_6.asm or false) || + (f.spin_0_4_6.unstable or false) || + (spin_0_4_6.unstable or false); + spin_0_4_6.const_fn = + (f.spin_0_4_6.const_fn or false) || + (f.spin_0_4_6.once or false) || + (spin_0_4_6.once or false) || + (f.spin_0_4_6.unstable or false) || + (spin_0_4_6.unstable or false); + spin_0_4_6.core_intrinsics = + (f.spin_0_4_6.core_intrinsics or false) || + (f.spin_0_4_6.unstable or false) || + (spin_0_4_6.unstable or false); + spin_0_4_6.default = (f.spin_0_4_6.default or true); + spin_0_4_6.once = + (f.spin_0_4_6.once or false) || + (f.spin_0_4_6.unstable or false) || + (spin_0_4_6.unstable or false); + spin_0_4_6.unstable = + (f.spin_0_4_6.unstable or false) || + (f.spin_0_4_6.default or false) || + (spin_0_4_6.default or false); + }) []; syn_0_11_11 = { features?(syn_0_11_11_features {}) }: syn_0_11_11_ { dependencies = mapFeatures features ([ ] ++ (if features.syn_0_11_11.quote or false then [ quote_0_3_15 ] else []) diff --git a/ofborg/Cargo.lock b/ofborg/Cargo.lock index e766847..092422c 100644 --- a/ofborg/Cargo.lock +++ b/ofborg/Cargo.lock @@ -166,6 +166,11 @@ dependencies = [ "backtrace 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "fnv" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "foreign-types" version = "0.3.2" @@ -385,6 +390,7 @@ dependencies = [ "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "md5 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "prometheus 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -425,6 +431,29 @@ name = "pkg-config" version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "prometheus" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "protobuf 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "quick-error 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "spin 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "protobuf" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "quick-error" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "quote" version = "0.3.15" @@ -552,6 +581,11 @@ dependencies = [ "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "spin" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "syn" version = "0.11.11" @@ -765,6 +799,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f" "checksum env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3ddf21e73e016298f5cb37d6ef8e8da8e39f91f9ec8b0df44b7deb16a9f8cd5b" "checksum error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8" +"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" "checksum foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" "checksum fs2 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" @@ -796,6 +831,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum openssl-sys 0.9.24 (registry+https://github.com/rust-lang/crates.io-index)" = "14ba54ac7d5a4eabd1d5f2c1fdeb7e7c14debfa669d94b983d01b465e767ba9e" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" "checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903" +"checksum prometheus 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "71a963c5a59b4459a8133e60f8170df6fd29b67c0e6de5d45521d3056465bbfc" +"checksum protobuf 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "bec26e67194b7d991908145fdf21b7cae8b08423d96dcb9e860cd31f854b9506" +"checksum quick-error 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7ac990ab4e038dd8481a5e3fd00641067fcfc674ad663f3222752ed5284e05d4" "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" "checksum rand 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)" = "512870020642bb8c221bf68baa1b2573da814f6ccfe5c9699b1c303047abe9b1" "checksum redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)" = "0d92eecebad22b767915e4d529f89f28ee96dbbf5a4810d2b844373f136417fd" @@ -812,6 +850,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)" = "f4ba7591cfe93755e89eeecdbcc668885624829b020050e6aec99c2a03bd3fd0" "checksum serde_derive_internals 0.19.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6e03f1c9530c3fb0a0a5c9b826bdd9246a5921ae995d75f512ac917fc4dd55b5" "checksum serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c9db7266c7d63a4c4b7fe8719656ccdd51acf1bed6124b174f933b009fb10bcb" +"checksum spin 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7e4deb3c2455c73779e6d3eebceae9599fc70957e54c69fe88f93aa48e62f432" "checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad" "checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" "checksum tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "87974a6f5c1dfb344d733055601650059a3363de2a6104819293baff662132d6" diff --git a/ofborg/Cargo.toml b/ofborg/Cargo.toml index 5e4aade..2966039 100644 --- a/ofborg/Cargo.toml +++ b/ofborg/Cargo.toml @@ -23,6 +23,7 @@ hubcaps = { git = "https://github.com/grahamc/hubcaps.git" } hyper = "0.10.*" hyper-native-tls = "0.2.4" lru-cache = "0.1.1" +prometheus = "0.3.11" #[patch.crates-io] From fd54190f00e8b6d19ba1edd4026fa2c99d45aa05 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 8 Feb 2018 10:04:40 -0500 Subject: [PATCH 2/7] Initial stats work --- ofborg/Cargo.lock | 4 +- ofborg/Cargo.toml | 3 +- ofborg/src/bin/mass-rebuilder.rs | 5 +- ofborg/src/bin/stats.rs | 83 ++++++++++ ofborg/src/lib.rs | 4 + ofborg/src/stats.rs | 35 ++++- ofborg/src/tasks/massrebuilder.rs | 13 +- ofborg/src/tasks/mod.rs | 1 + ofborg/src/tasks/statscollector.rs | 241 +++++++++++++++++++++++++++++ ofborg/src/worker.rs | 4 +- 10 files changed, 374 insertions(+), 19 deletions(-) create mode 100644 ofborg/src/bin/stats.rs create mode 100644 ofborg/src/tasks/statscollector.rs diff --git a/ofborg/Cargo.lock b/ofborg/Cargo.lock index 092422c..56a1a46 100644 --- a/ofborg/Cargo.lock +++ b/ofborg/Cargo.lock @@ -390,7 +390,7 @@ dependencies = [ "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "md5 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "prometheus 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)", + "prometheus 0.3.11", "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -434,7 +434,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "prometheus" version = "0.3.11" -source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -831,7 +830,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum openssl-sys 0.9.24 (registry+https://github.com/rust-lang/crates.io-index)" = "14ba54ac7d5a4eabd1d5f2c1fdeb7e7c14debfa669d94b983d01b465e767ba9e" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" "checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903" -"checksum prometheus 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "71a963c5a59b4459a8133e60f8170df6fd29b67c0e6de5d45521d3056465bbfc" "checksum protobuf 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "bec26e67194b7d991908145fdf21b7cae8b08423d96dcb9e860cd31f854b9506" "checksum quick-error 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7ac990ab4e038dd8481a5e3fd00641067fcfc674ad663f3222752ed5284e05d4" "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" diff --git a/ofborg/Cargo.toml b/ofborg/Cargo.toml index 2966039..9fa8429 100644 --- a/ofborg/Cargo.toml +++ b/ofborg/Cargo.toml @@ -23,7 +23,8 @@ hubcaps = { git = "https://github.com/grahamc/hubcaps.git" } hyper = "0.10.*" hyper-native-tls = "0.2.4" lru-cache = "0.1.1" -prometheus = "0.3.11" +# prometheus = "0.3.11" +prometheus = { path = "../rust-prometheus/" } # for testing patches #[patch.crates-io] diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index 8f0a5a5..d98d870 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -30,7 +30,10 @@ fn main() { let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root)); let nix = cfg.nix(); - let events = stats::RabbitMQ::new(session.open_channel(3).unwrap()); + let events = stats::RabbitMQ::new( + &format!("{}-{}", cfg.runner.identity.clone(), cfg.nix.system.clone()), + session.open_channel(3).unwrap() + ); let mrw = tasks::massrebuilder::MassRebuildWorker::new( cloner, diff --git a/ofborg/src/bin/stats.rs b/ofborg/src/bin/stats.rs new file mode 100644 index 0000000..257e819 --- /dev/null +++ b/ofborg/src/bin/stats.rs @@ -0,0 +1,83 @@ +extern crate hyper; +extern crate prometheus; +extern crate amqp; +extern crate ofborg; + +use std::env; +use ofborg::{easyamqp, tasks, worker, config, stats}; + +use amqp::Basic; +use ofborg::easyamqp::TypedWrappers; +use hyper::header::ContentType; +use hyper::mime::Mime; +use hyper::server::{Request, Response, Server}; +use prometheus::{Counter, Encoder, Gauge, HistogramVec, TextEncoder}; + +use std::thread; +use std::time::Duration; + +fn main() { + let cfg = config::load(env::args().nth(1).unwrap().as_ref()); + ofborg::setup_log(); + + println!("Hello, world!"); + + + let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap(); + println!("Connected to rabbitmq"); + + let events = stats::RabbitMQ::new( + &format!("{}-{}", cfg.runner.identity.clone(), cfg.nix.system.clone()), + session.open_channel(3).unwrap() + ); + + let collector = tasks::statscollector::StatCollectorWorker::new( + events + ); + + let mut channel = session.open_channel(1).unwrap(); + + channel.basic_prefetch(1).unwrap(); + channel + .consume( + worker::new(collector), + easyamqp::ConsumeConfig { + queue: "sample-stats-events".to_owned(), + consumer_tag: format!("{}-prometheus-stats-collector", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + arguments: None, + }, + ) + .unwrap(); + + + thread::spawn(||{ + let encoder = TextEncoder::new(); + let addr = "127.0.0.1:9898"; + println!("listening addr {:?}", addr); + Server::http(addr) + .unwrap() + .handle(move |_: Request, mut res: Response| { + let metric_familys = prometheus::gather(); + let mut buffer = vec![]; + encoder.encode(&metric_familys, &mut buffer).unwrap(); + res.headers_mut() + .set(ContentType(encoder.format_type().parse::().unwrap())); + res.send(&buffer).unwrap(); + }) + .unwrap(); + }); + + + channel.start_consuming(); + + println!("Finished consuming?"); + + channel.close(200, "Bye").unwrap(); + println!("Closed the channel"); + session.close(200, "Good Bye"); + println!("Closed the session... EOF"); +} diff --git a/ofborg/src/lib.rs b/ofborg/src/lib.rs index bda70db..12f3430 100644 --- a/ofborg/src/lib.rs +++ b/ofborg/src/lib.rs @@ -1,3 +1,7 @@ +#[macro_use] +extern crate prometheus; + + #[macro_use] extern crate serde_derive; extern crate serde; diff --git a/ofborg/src/stats.rs b/ofborg/src/stats.rs index 87c8dc0..3c779a4 100644 --- a/ofborg/src/stats.rs +++ b/ofborg/src/stats.rs @@ -1,23 +1,43 @@ +use serde_json; use amqp::Channel; use amqp::protocol::basic::BasicProperties; use amqp::Basic; -pub trait SysEvents { - fn tick(&mut self, name: &str); +pub trait SysEvents: Send { + fn notify(&mut self, event: Event); +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all="kebab-case")] +pub enum Event { + StatCollectorLegacyEvent, + StatCollectorBogusEvent, + JobReceived, + JobDecodeSuccess, + JobDecodeFailure, + IssueAlreadyClosed, + IssueFetchFailed, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct EventMessage { + pub sender: String, + pub events: Vec, } pub struct RabbitMQ { + identity: String, channel: Channel, } impl RabbitMQ { - pub fn new(channel: Channel) -> RabbitMQ { - RabbitMQ { channel: channel } + pub fn new(identity: &str, channel: Channel) -> RabbitMQ { + RabbitMQ { identity: identity.to_owned(), channel: channel } } } impl SysEvents for RabbitMQ { - fn tick(&mut self, name: &str) { + fn notify(&mut self, event: Event) { let props = BasicProperties { ..Default::default() }; self.channel .basic_publish( @@ -26,7 +46,10 @@ impl SysEvents for RabbitMQ { false, false, props, - String::from(name).into_bytes(), + serde_json::to_string(&EventMessage { + sender: self.identity.clone(), + events: vec![event], + }).unwrap().into_bytes(), ) .unwrap(); } diff --git a/ofborg/src/tasks/massrebuilder.rs b/ofborg/src/tasks/massrebuilder.rs index 155143b..0266bdf 100644 --- a/ofborg/src/tasks/massrebuilder.rs +++ b/ofborg/src/tasks/massrebuilder.rs @@ -14,6 +14,7 @@ use ofborg::nix::Nix; use ofborg::acl::ACL; use ofborg::stats; +use ofborg::stats::Event; use ofborg::worker; use ofborg::tagger::{StdenvTagger, RebuildTagger, PathsTagger, PkgsAddedRemovedTagger}; use ofborg::outpathdiff::{OutPaths, OutPathDiff}; @@ -73,7 +74,7 @@ impl MassRebuildWorker { } } -impl worker::SimpleWorker for MassRebuildWorker { +impl worker::SimpleWorker for MassRebuildWorker { type J = massrebuildjob::MassRebuildJob; fn msg_to_job( @@ -82,14 +83,14 @@ impl worker::SimpleWorker for MassRebuildWorker { _: &BasicProperties, body: &Vec, ) -> Result { - self.events.tick("job-received"); + self.events.notify(Event::JobReceived); return match massrebuildjob::from(body) { Ok(e) => { - self.events.tick("job-decode-success"); + self.events.notify(Event::JobDecodeSuccess); Ok(e) } Err(e) => { - self.events.tick("job-decode-failure"); + self.events.notify(Event::JobDecodeFailure); error!( "Failed to decode message: {:?}, Err: {:?}", String::from_utf8(body.clone()), @@ -113,7 +114,7 @@ impl worker::SimpleWorker for MassRebuildWorker { match issue.get() { Ok(iss) => { if iss.state == "closed" { - self.events.tick("issue-already-closed"); + self.events.notify(Event::IssueAlreadyClosed); info!("Skipping {} because it is closed", job.pr.number); return self.actions().skip(&job); } @@ -128,7 +129,7 @@ impl worker::SimpleWorker for MassRebuildWorker { } } Err(e) => { - self.events.tick("issue-fetch-failed"); + self.events.notify(Event::IssueFetchFailed); info!("Error fetching {}!", job.pr.number); info!("E: {:?}", e); return self.actions().skip(&job); diff --git a/ofborg/src/tasks/mod.rs b/ofborg/src/tasks/mod.rs index d735e60..663bab6 100644 --- a/ofborg/src/tasks/mod.rs +++ b/ofborg/src/tasks/mod.rs @@ -3,4 +3,5 @@ pub mod build; pub mod massrebuilder; pub mod githubcommentfilter; pub mod githubcommentposter; +pub mod statscollector; pub mod log_message_collector; diff --git a/ofborg/src/tasks/statscollector.rs b/ofborg/src/tasks/statscollector.rs new file mode 100644 index 0000000..94b988c --- /dev/null +++ b/ofborg/src/tasks/statscollector.rs @@ -0,0 +1,241 @@ +extern crate prometheus; +extern crate amqp; +extern crate env_logger; + +use serde_json; +use std::str::FromStr; +use ofborg::worker; +use ofborg::stats; +use amqp::protocol::basic::{Deliver, BasicProperties}; +use std::collections::HashMap; +use std::mem; +use std::thread; +use std::time::Duration; +use std::sync::Arc; +use std::sync::Mutex; + +pub struct StatCollectorWorker { + events: E, + counter_collectors: HashMap, +} + +impl StatCollectorWorker { + pub fn new(events: E) -> StatCollectorWorker { + let mut worker = StatCollectorWorker { + events: events, + counter_collectors: HashMap::new(), + }; + + let initial_events: Vec = vec![ + stats::Event::StatCollectorLegacyEvent, + stats::Event::StatCollectorBogusEvent, + stats::Event::JobReceived, + stats::Event::JobDecodeSuccess, + stats::Event::JobDecodeFailure, + stats::Event::IssueAlreadyClosed, + stats::Event::IssueFetchFailed, + ]; + for initial_event in initial_events { + match initial_event { + // WARNING + // BEFORE YOU ADD A NEW VARIANT HERE, ADD IT + // TO THE LIST ABOVE! + // + // EACH VARIANT MUST BE INITIALIZED PRIOR + // TO REPORTING STATS + stats::Event::StatCollectorLegacyEvent => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "stats_collector".to_owned(), + name: "legacy_event".to_owned(), + help: "Number of received legacy events".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + ); + }, + stats::Event::StatCollectorBogusEvent => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "stats_collector".to_owned(), + name: "bogus_event".to_owned(), + help: "Number of received unparseable events".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + ); + }, + stats::Event::JobReceived => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "generic_worker".to_owned(), + name: "job_received".to_owned(), + help: "Number of received worker jobs".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + ); + }, + stats::Event::JobDecodeSuccess => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "generic_worker".to_owned(), + name: "job_decode_successful".to_owned(), + help: "Number of successfully decoded jobs".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + ); + }, + stats::Event::JobDecodeFailure => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "generic_worker".to_owned(), + name: "job_decode_failure".to_owned(), + help: "Number of jobs which failed to parse".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + ); + }, + stats::Event::IssueAlreadyClosed => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "github".to_owned(), + name: "issue_closed".to_owned(), + help: "Number of jobs for issues which are already closed".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + + ); + }, + stats::Event::IssueFetchFailed => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "github".to_owned(), + name: "issue_fetch_fail".to_owned(), + help: "Number of failed fetches for GitHub issues".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + ); + }, + }; + } + + return worker; + } + + pub fn counter(&self, event: &stats::Event) -> prometheus::CounterVec { + let disc = format!("{:?}", mem::discriminant(event)); + self.counter_collectors.get(&disc).unwrap().clone() + } + + pub fn register_counter( + &mut self, + event: &stats::Event, + opts: prometheus::Opts, + ) { + let disc = format!("{:?}", mem::discriminant(event)); + let orig_labels = opts.variable_labels.clone(); + let labels: Vec<&str> = orig_labels + .iter() + .map(|v| v.as_ref()) + .collect(); + + let counter = register_counter_vec!( + opts, labels.as_ref() + ).unwrap(); + counter.with_label_values(&[""]).inc_by(0.0); + + self.counter_collectors.insert( + disc, + counter + ); + } +} + +impl worker::SimpleWorker for StatCollectorWorker { + type J = stats::EventMessage; + + fn msg_to_job( + &mut self, + _: &Deliver, + _: &BasicProperties, + body: &Vec, + ) -> Result { + return match serde_json::from_slice(body) { + Ok(e) => Ok(e), + Err(_) => { + let mut modified_body: Vec = vec!["\"".as_bytes()[0]]; + modified_body.append(&mut body.clone()); + modified_body.push("\"".as_bytes()[0]); + + match serde_json::from_slice(&modified_body) { + Ok(e) => { + self.events.notify(stats::Event::StatCollectorLegacyEvent); + Ok(stats::EventMessage { + sender: "".to_owned(), + events: vec![e], + }) + }, + Err(e) => { + self.events.notify(stats::Event::StatCollectorBogusEvent); + error!( + "Failed to decode message: {:?}, Err: {:?}", + String::from_utf8(body.clone()), + e + ); + Err("Failed to decode message".to_owned()) + } + } + } + }; + } + + fn consumer(&mut self, job: &stats::EventMessage) -> worker::Actions { + let sender = job.sender.clone(); + for event in job.events.iter() { + match *event { + stats::Event::StatCollectorLegacyEvent => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + stats::Event::StatCollectorBogusEvent => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + stats::Event::JobReceived => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + stats::Event::JobDecodeSuccess => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + stats::Event::JobDecodeFailure => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + stats::Event::IssueAlreadyClosed => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + stats::Event::IssueFetchFailed => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + } + } + + return vec![worker::Action::Ack]; + } +} diff --git a/ofborg/src/worker.rs b/ofborg/src/worker.rs index 0b18e2c..62922ae 100644 --- a/ofborg/src/worker.rs +++ b/ofborg/src/worker.rs @@ -54,8 +54,8 @@ where }); } -pub trait SimpleWorker { - type J; +pub trait SimpleWorker: Send + 'static { + type J: Send; fn consumer(&mut self, job: &Self::J) -> Actions; From d749fe5f4dc1f1af2c3d4935abd940a918939bcc Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Mon, 12 Feb 2018 08:20:43 -0500 Subject: [PATCH 3/7] Implement a nicer stats API but with gross code gen --- nix/ofborg-carnix.nix | 130 +------ ofborg/Cargo.lock | 37 -- ofborg/Cargo.toml | 7 +- ofborg/build.rs | 597 +++++++++++++++++++++++++++++ ofborg/src/bin/stats.rs | 19 +- ofborg/src/lib.rs | 4 - ofborg/src/stats.rs | 19 +- ofborg/src/tasks/statscollector.rs | 185 +-------- 8 files changed, 624 insertions(+), 374 deletions(-) create mode 100644 ofborg/build.rs diff --git a/nix/ofborg-carnix.nix b/nix/ofborg-carnix.nix index a27a016..ae804a3 100644 --- a/nix/ofborg-carnix.nix +++ b/nix/ofborg-carnix.nix @@ -210,14 +210,6 @@ rec { sha256 = "1xxbzd8cjlpzsb9fsih7mdnndhzrvykj0w77yg90qc85az1xwy5z"; inherit dependencies buildDependencies features; }; - fnv_1_0_6_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { - crateName = "fnv"; - version = "1.0.6"; - authors = [ "Alex Crichton " ]; - sha256 = "128mlh23y3gg6ag5h8iiqlcbl59smisdzraqy88ldrf75kbw27ip"; - libPath = "lib.rs"; - inherit dependencies buildDependencies features; - }; foreign_types_0_3_2_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { crateName = "foreign-types"; version = "0.3.2"; @@ -418,7 +410,8 @@ rec { crateName = "ofborg"; version = "0.1.1"; authors = [ "Graham Christensen " ]; - src = include [ "Cargo.toml" "Cargo.lock" "src" "test-srcs" ] ./../ofborg; + src = include [ "Cargo.toml" "Cargo.lock" "src" "test-srcs" "build.rs" ] ./../ofborg; + build = "build.rs"; inherit dependencies buildDependencies features; }; openssl_0_9_23_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { @@ -452,28 +445,6 @@ rec { sha256 = "06k8fxgrsrxj8mjpjcq1n7mn2p1shpxif4zg9y5h09c7vy20s146"; inherit dependencies buildDependencies features; }; - prometheus_0_3_11_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { - crateName = "prometheus"; - version = "0.3.11"; - authors = [ "overvenus@gmail.com" "siddontang@gmail.com" ]; - sha256 = "1c5vcy771cwpd14adgknf7pmf603p8nf4q90ik7ry083vxrvbmbg"; - inherit dependencies buildDependencies features; - }; - protobuf_1_4_3_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { - crateName = "protobuf"; - version = "1.4.3"; - authors = [ "Stepan Koltsov " ]; - sha256 = "093fczpx523lm6d7xr5d4mqs88891ay6wk951yck3cavsz35z00b"; - crateBin = [ { name = "protoc-gen-rust"; path = "protoc-gen-rust.rs"; } { name = "protobuf-bin-gen-rust-do-not-use"; path = "protobuf-bin-gen-rust-do-not-use.rs"; } ]; - inherit dependencies buildDependencies features; - }; - quick_error_0_2_2_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { - crateName = "quick-error"; - version = "0.2.2"; - authors = [ "Paul Colomiets " "Colin Kiegel " ]; - sha256 = "0r1f4ps998y779qwvnmmxhjq00qh5wxg3m5inswfawg0vr2732db"; - inherit dependencies buildDependencies features; - }; quote_0_3_15_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { crateName = "quote"; version = "0.3.15"; @@ -589,13 +560,6 @@ rec { sha256 = "1pcclssyndz54cncsizkqrblmqqr1p2g6xhkpwldbk6qc95m4yw3"; inherit dependencies buildDependencies features; }; - spin_0_4_6_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { - crateName = "spin"; - version = "0.4.6"; - authors = [ "Mathijs van de Nes " "John Ericson " ]; - sha256 = "1mp30r3pxb38m6mszcgn6136d1r162fwcidg3y4d9rym21hmialj"; - inherit dependencies buildDependencies features; - }; syn_0_11_11_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { crateName = "syn"; version = "0.11.11"; @@ -1062,10 +1026,6 @@ rec { (f.error_chain_0_10_0.default or false) || (error_chain_0_10_0.default or false); }) [ backtrace_0_3_5_features ]; - fnv_1_0_6 = { features?(fnv_1_0_6_features {}) }: fnv_1_0_6_ {}; - fnv_1_0_6_features = f: updateFeatures f (rec { - fnv_1_0_6.default = (f.fnv_1_0_6.default or true); - }) []; foreign_types_0_3_2 = { features?(foreign_types_0_3_2_features {}) }: foreign_types_0_3_2_ { dependencies = mapFeatures features ([ foreign_types_shared_0_1_1 ]); }; @@ -1365,7 +1325,7 @@ rec { num_cpus_1_8_0.default = (f.num_cpus_1_8_0.default or true); }) [ libc_0_2_36_features ]; ofborg_0_1_1 = { features?(ofborg_0_1_1_features {}) }: ofborg_0_1_1_ { - dependencies = mapFeatures features ([ amqp_0_1_0 env_logger_0_4_3 fs2_0_4_3 hubcaps_0_3_16 hyper_0_10_13 hyper_native_tls_0_2_4 log_0_3_8 lru_cache_0_1_1 md5_0_3_6 prometheus_0_3_11 serde_1_0_27 serde_derive_1_0_27 serde_json_1_0_9 tempfile_2_2_0 uuid_0_4_0 ]); + dependencies = mapFeatures features ([ amqp_0_1_0 env_logger_0_4_3 fs2_0_4_3 hubcaps_0_3_16 hyper_0_10_13 hyper_native_tls_0_2_4 log_0_3_8 lru_cache_0_1_1 md5_0_3_6 serde_1_0_27 serde_derive_1_0_27 serde_json_1_0_9 tempfile_2_2_0 uuid_0_4_0 ]); }; ofborg_0_1_1_features = f: updateFeatures f (rec { amqp_0_1_0.default = true; @@ -1378,14 +1338,13 @@ rec { lru_cache_0_1_1.default = true; md5_0_3_6.default = true; ofborg_0_1_1.default = (f.ofborg_0_1_1.default or true); - prometheus_0_3_11.default = true; serde_1_0_27.default = true; serde_derive_1_0_27.default = true; serde_json_1_0_9.default = true; tempfile_2_2_0.default = true; uuid_0_4_0.default = true; uuid_0_4_0.v4 = true; - }) [ amqp_0_1_0_features env_logger_0_4_3_features fs2_0_4_3_features hubcaps_0_3_16_features hyper_0_10_13_features hyper_native_tls_0_2_4_features log_0_3_8_features lru_cache_0_1_1_features md5_0_3_6_features prometheus_0_3_11_features serde_1_0_27_features serde_derive_1_0_27_features serde_json_1_0_9_features tempfile_2_2_0_features uuid_0_4_0_features ]; + }) [ amqp_0_1_0_features env_logger_0_4_3_features fs2_0_4_3_features hubcaps_0_3_16_features hyper_0_10_13_features hyper_native_tls_0_2_4_features log_0_3_8_features lru_cache_0_1_1_features md5_0_3_6_features serde_1_0_27_features serde_derive_1_0_27_features serde_json_1_0_9_features tempfile_2_2_0_features uuid_0_4_0_features ]; openssl_0_9_23 = { features?(openssl_0_9_23_features {}) }: openssl_0_9_23_ { dependencies = mapFeatures features ([ bitflags_0_9_1 foreign_types_0_3_2 lazy_static_1_0_0 libc_0_2_36 openssl_sys_0_9_24 ]); features = mkFeatures (features.openssl_0_9_23 or {}); @@ -1417,59 +1376,6 @@ rec { pkg_config_0_3_9_features = f: updateFeatures f (rec { pkg_config_0_3_9.default = (f.pkg_config_0_3_9.default or true); }) []; - prometheus_0_3_11 = { features?(prometheus_0_3_11_features {}) }: prometheus_0_3_11_ { - dependencies = mapFeatures features ([ cfg_if_0_1_2 fnv_1_0_6 lazy_static_0_2_11 protobuf_1_4_3 quick_error_0_2_2 spin_0_4_6 ]) - ++ (if kernel == "linux" then mapFeatures features ([]) else []); - features = mkFeatures (features.prometheus_0_3_11 or {}); - }; - prometheus_0_3_11_features = f: updateFeatures f (rec { - cfg_if_0_1_2.default = true; - fnv_1_0_6.default = true; - lazy_static_0_2_11.default = true; - prometheus_0_3_11.clippy = - (f.prometheus_0_3_11.clippy or false) || - (f.prometheus_0_3_11.dev or false) || - (prometheus_0_3_11.dev or false); - prometheus_0_3_11.default = (f.prometheus_0_3_11.default or true); - prometheus_0_3_11.hyper = - (f.prometheus_0_3_11.hyper or false) || - (f.prometheus_0_3_11.push or false) || - (prometheus_0_3_11.push or false); - prometheus_0_3_11.libc = - (f.prometheus_0_3_11.libc or false) || - (f.prometheus_0_3_11.nightly or false) || - (prometheus_0_3_11.nightly or false) || - (f.prometheus_0_3_11.process or false) || - (prometheus_0_3_11.process or false) || - (f.prometheus_0_3_11.push or false) || - (prometheus_0_3_11.push or false); - prometheus_0_3_11.procinfo = - (f.prometheus_0_3_11.procinfo or false) || - (f.prometheus_0_3_11.process or false) || - (prometheus_0_3_11.process or false); - protobuf_1_4_3.default = true; - quick_error_0_2_2.default = true; - spin_0_4_6.default = (f.spin_0_4_6.default or false); - spin_0_4_6.unstable = - (f.spin_0_4_6.unstable or false) || - (prometheus_0_3_11.nightly or false) || - (f.prometheus_0_3_11.nightly or false); - }) [ cfg_if_0_1_2_features fnv_1_0_6_features lazy_static_0_2_11_features protobuf_1_4_3_features quick_error_0_2_2_features spin_0_4_6_features ]; - protobuf_1_4_3 = { features?(protobuf_1_4_3_features {}) }: protobuf_1_4_3_ { - dependencies = mapFeatures features ([]); - features = mkFeatures (features.protobuf_1_4_3 or {}); - }; - protobuf_1_4_3_features = f: updateFeatures f (rec { - protobuf_1_4_3.bytes = - (f.protobuf_1_4_3.bytes or false) || - (f.protobuf_1_4_3.with-bytes or false) || - (protobuf_1_4_3.with-bytes or false); - protobuf_1_4_3.default = (f.protobuf_1_4_3.default or true); - }) []; - quick_error_0_2_2 = { features?(quick_error_0_2_2_features {}) }: quick_error_0_2_2_ {}; - quick_error_0_2_2_features = f: updateFeatures f (rec { - quick_error_0_2_2.default = (f.quick_error_0_2_2.default or true); - }) []; quote_0_3_15 = { features?(quote_0_3_15_features {}) }: quote_0_3_15_ {}; quote_0_3_15_features = f: updateFeatures f (rec { quote_0_3_15.default = (f.quote_0_3_15.default or true); @@ -1683,34 +1589,6 @@ rec { (f.serde_json_1_0_9.preserve_order or false) || (serde_json_1_0_9.preserve_order or false); }) [ dtoa_0_4_2_features itoa_0_3_4_features num_traits_0_1_41_features serde_1_0_27_features ]; - spin_0_4_6 = { features?(spin_0_4_6_features {}) }: spin_0_4_6_ { - features = mkFeatures (features.spin_0_4_6 or {}); - }; - spin_0_4_6_features = f: updateFeatures f (rec { - spin_0_4_6.asm = - (f.spin_0_4_6.asm or false) || - (f.spin_0_4_6.unstable or false) || - (spin_0_4_6.unstable or false); - spin_0_4_6.const_fn = - (f.spin_0_4_6.const_fn or false) || - (f.spin_0_4_6.once or false) || - (spin_0_4_6.once or false) || - (f.spin_0_4_6.unstable or false) || - (spin_0_4_6.unstable or false); - spin_0_4_6.core_intrinsics = - (f.spin_0_4_6.core_intrinsics or false) || - (f.spin_0_4_6.unstable or false) || - (spin_0_4_6.unstable or false); - spin_0_4_6.default = (f.spin_0_4_6.default or true); - spin_0_4_6.once = - (f.spin_0_4_6.once or false) || - (f.spin_0_4_6.unstable or false) || - (spin_0_4_6.unstable or false); - spin_0_4_6.unstable = - (f.spin_0_4_6.unstable or false) || - (f.spin_0_4_6.default or false) || - (spin_0_4_6.default or false); - }) []; syn_0_11_11 = { features?(syn_0_11_11_features {}) }: syn_0_11_11_ { dependencies = mapFeatures features ([ ] ++ (if features.syn_0_11_11.quote or false then [ quote_0_3_15 ] else []) diff --git a/ofborg/Cargo.lock b/ofborg/Cargo.lock index 56a1a46..e766847 100644 --- a/ofborg/Cargo.lock +++ b/ofborg/Cargo.lock @@ -166,11 +166,6 @@ dependencies = [ "backtrace 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "fnv" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "foreign-types" version = "0.3.2" @@ -390,7 +385,6 @@ dependencies = [ "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "md5 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "prometheus 0.3.11", "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -431,28 +425,6 @@ name = "pkg-config" version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "prometheus" -version = "0.3.11" -dependencies = [ - "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", - "protobuf 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)", - "quick-error 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "spin 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", -] - -[[package]] -name = "protobuf" -version = "1.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] -name = "quick-error" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "quote" version = "0.3.15" @@ -580,11 +552,6 @@ dependencies = [ "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "spin" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "syn" version = "0.11.11" @@ -798,7 +765,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f" "checksum env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3ddf21e73e016298f5cb37d6ef8e8da8e39f91f9ec8b0df44b7deb16a9f8cd5b" "checksum error-chain 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9435d864e017c3c6afeac1654189b06cdb491cf2ff73dbf0d73b0f292f42ff8" -"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" "checksum foreign-types-shared 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" "checksum fs2 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" @@ -830,8 +796,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum openssl-sys 0.9.24 (registry+https://github.com/rust-lang/crates.io-index)" = "14ba54ac7d5a4eabd1d5f2c1fdeb7e7c14debfa669d94b983d01b465e767ba9e" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" "checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903" -"checksum protobuf 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "bec26e67194b7d991908145fdf21b7cae8b08423d96dcb9e860cd31f854b9506" -"checksum quick-error 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7ac990ab4e038dd8481a5e3fd00641067fcfc674ad663f3222752ed5284e05d4" "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" "checksum rand 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)" = "512870020642bb8c221bf68baa1b2573da814f6ccfe5c9699b1c303047abe9b1" "checksum redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)" = "0d92eecebad22b767915e4d529f89f28ee96dbbf5a4810d2b844373f136417fd" @@ -848,7 +812,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)" = "f4ba7591cfe93755e89eeecdbcc668885624829b020050e6aec99c2a03bd3fd0" "checksum serde_derive_internals 0.19.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6e03f1c9530c3fb0a0a5c9b826bdd9246a5921ae995d75f512ac917fc4dd55b5" "checksum serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c9db7266c7d63a4c4b7fe8719656ccdd51acf1bed6124b174f933b009fb10bcb" -"checksum spin 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7e4deb3c2455c73779e6d3eebceae9599fc70957e54c69fe88f93aa48e62f432" "checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad" "checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" "checksum tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "87974a6f5c1dfb344d733055601650059a3363de2a6104819293baff662132d6" diff --git a/ofborg/Cargo.toml b/ofborg/Cargo.toml index 9fa8429..9607cc4 100644 --- a/ofborg/Cargo.toml +++ b/ofborg/Cargo.toml @@ -2,8 +2,8 @@ name = "ofborg" version = "0.1.1" authors = ["Graham Christensen "] -include = ["Cargo.toml", "Cargo.lock", "src", "test-srcs"] - +include = ["Cargo.toml", "Cargo.lock", "src", "test-srcs", "build.rs"] +build = "build.rs" [dependencies] @@ -23,9 +23,6 @@ hubcaps = { git = "https://github.com/grahamc/hubcaps.git" } hyper = "0.10.*" hyper-native-tls = "0.2.4" lru-cache = "0.1.1" -# prometheus = "0.3.11" -prometheus = { path = "../rust-prometheus/" } # for testing patches - #[patch.crates-io] #amq-proto = { path = "rust-amq-proto" } \ No newline at end of file diff --git a/ofborg/build.rs b/ofborg/build.rs new file mode 100644 index 0000000..e458dcf --- /dev/null +++ b/ofborg/build.rs @@ -0,0 +1,597 @@ + +use std::env; +use std::fs::File; +use std::io::Write; +use std::path::Path; + +enum MetricType { + Ticker(Metric), + Counter(Metric), +} + +impl MetricType { + fn collector_type(&self) -> String { + match self { + &MetricType::Ticker(_) => { + String::from("u64") + } + &MetricType::Counter(_) => { + String::from("u64") + } + } + } + + fn enum_matcher_types(&self) -> String { + let fields = self.enum_field_types(); + + if fields.len() > 0 { + format!("{}({})", self.variant(), fields.join(", ")) + } else { + format!("{}", self.variant()) + } + } + + fn variant(&self) -> String { + match self { + &MetricType::Ticker(ref event) => { + event.variant.clone() + } + &MetricType::Counter(ref event) => { + event.variant.clone() + } + } + } + + fn metric_type(&self) -> String { + match self { + &MetricType::Ticker(_) => { + String::from("counter") + } + &MetricType::Counter(_) => { + String::from("counter") + } + } + } + + fn metric_name(&self) -> String { + match self { + &MetricType::Ticker(ref event) => { + event.metric_name.clone() + } + &MetricType::Counter(ref event) => { + event.metric_name.clone() + } + } + } + + fn description(&self) -> String { + match self { + &MetricType::Ticker(ref event) => { + event.description.clone() + } + &MetricType::Counter(ref event) => { + event.description.clone() + } + } + } + + fn enum_index_types(&self) -> Vec { + let event: &Metric; + + match self { + &MetricType::Ticker(ref i_event) => { + event = i_event; + } + &MetricType::Counter(ref i_event) => { + event = i_event; + } + } + + let fields: Vec = event.fields + .iter() + .map(|&(ref _fieldname, ref fieldtype)| fieldtype.clone()) + .collect(); + + return fields + } + + fn enum_field_types(&self) -> Vec { + let mut extra_fields: Vec = vec![]; + + match self { + &MetricType::Ticker(_) => {} + &MetricType::Counter(_) => { + extra_fields = vec![self.collector_type()]; + } + } + + let mut fields: Vec = self.enum_index_types(); + fields.append(&mut extra_fields); + + return fields + } + + fn enum_index_names(&self) -> Vec { + let event: &Metric; + + match self { + &MetricType::Ticker(ref i_event) => { + event = i_event; + } + &MetricType::Counter(ref i_event) => { + event = i_event; + } + } + + let fields: Vec = event.fields + .iter() + .map(|&(ref fieldname, ref _fieldtype)| fieldname.clone()) + .collect(); + + return fields + } + + fn enum_field_names(&self) -> Vec { + let mut extra_fields: Vec = vec![]; + + match self { + &MetricType::Ticker(_) => {} + &MetricType::Counter(_) => { + extra_fields = vec!["value".to_owned()]; + } + } + + let mut fields: Vec = self.enum_index_names(); + fields.append(&mut extra_fields); + + return fields + } + + fn record_value(&self) -> String { + match self { + &MetricType::Ticker(_) => { + String::from("1") + } + &MetricType::Counter(_) => { + String::from("value") + } + } + } +} + +struct Metric { + variant: String, + fields: Vec<(String,String)>, // Vec because it is sorted + metric_name: String, + description: String, +} + + +fn name_to_parts(name: &str) -> Vec { + let mut parts: Vec = vec![]; + let mut buf = String::from(""); + for c in name.chars() { + if char::is_uppercase(c) && buf.len() > 0 { + parts.push(buf.to_owned()); + buf = String::from(""); + } + buf.push_str(&c.to_string()); + } + if buf.len() > 0 { + parts.push(buf.to_owned()); + std::mem::drop(buf); + } + + + return parts; +} + +impl Metric { + pub fn ticker(name: &str, desc: &str, fields: Option>) -> MetricType { + let parts = name_to_parts(name); + + MetricType::Ticker(Metric { + variant: parts + .iter() + .map(|f| f.clone().to_owned()) + .collect(), + fields: fields + .unwrap_or(vec![]) + .iter() + .map(|&(ref fieldname, ref fieldtype)| (fieldname.clone().to_owned(), fieldtype.clone().to_owned())) + .collect(), + metric_name: parts.join("_").to_lowercase(), + description: desc.to_owned(), + }) + } + + pub fn counter(name: &str, desc: &str, fields: Option>) -> MetricType { + let parts = name_to_parts(name); + + MetricType::Counter(Metric { + variant: parts + .iter() + .map(|f| f.clone().to_owned()) + .collect(), + fields: fields + .unwrap_or(vec![]) + .iter() + .map(|&(ref fieldname, ref fieldtype)| (fieldname.clone().to_owned(), fieldtype.clone().to_owned())) + .collect(), + metric_name: parts.join("_").to_lowercase(), + description: desc.to_owned(), + }) + } +} + +fn events() -> Vec { + return vec![ + Metric::ticker( + "StatCollectorLegacyEvent", + "Number of received legacy events", + Some(vec![("event", "String")]), + ), + Metric::ticker( + "StatCollectorBogusEvent", + "Number of received unparseable events", + None, + ), + Metric::ticker( + "JobReceived", + "Number of received worker jobs", + None, + ), + Metric::counter( + "EvaluationDuration", + "Amount of time spent running evaluations", + None + ), + Metric::ticker( + "JobDecodeSuccess", + "Number of successfully decoded jobs", + None, + ), + Metric::ticker( + "JobDecodeFailure", + "Number of jobs which failed to parse", + None, + ), + Metric::ticker( + "IssueAlreadyClosed", + "Number of jobs for issues which are already closed", + None, + ), + Metric::ticker( + "IssueFetchFailed", + "Number of failed fetches for GitHub issues", + None, + ), + /* + Metric::counter( + "TimeElapsed", + "", + None + ), + Metric::counter( + "EnvironmentsAllocatedCount", + "", + None + ), + Metric::counter( + "EnvironmentsAllocatedBytes", + "", + None + ), + Metric::counter( + "ListElementsCount", + "", + None + ), + Metric::counter( + "ListElementsBytes", + "", + None + ), + Metric::counter( + "ListConcatenations", + "", + None + ), + Metric::counter( + "ValuesAllocatedCount", + "", + None + ), + Metric::counter( + "ValuesAllocatedBytes", + "", + None + ), + Metric::counter( + "SetsAllocatedCount", + "", + None + ), + Metric::counter( + "SetsAllocatedBytes", + "", + None + ), + Metric::counter( + "RightBiasedUnions", + "", + None + ), + Metric::counter( + "ValuesCopiedInRightBiasedUnions", + "", + None + ), + Metric::counter( + "SymbolsInSymbolTable", + "", + None + ), + Metric::counter( + "SizeOfSymbolTable", + "", + None + ), + Metric::counter( + "NumberOfThunks", + "", + None + ), + Metric::counter( + "NumberOfThunksAvoided", + "", + None + ), + Metric::counter( + "NumberOfAttrLookups", + "", + None + ), + Metric::counter( + "NumberOfPrimopCalls", + "", + None + ), + Metric::counter( + "NumberOfFunctionCalls", + "", + None + ), + Metric::counter( + "TotalAllocations", + "", + None + ), + Metric::counter( + "CurrentBoehmHeapSizeBytes", + "", + None + ), + Metric::counter( + "TotalBoehmHeapAllocationsBytes", + "", + None + ), + */ + ]; +} + +fn main() { + let out_dir = env::var("OUT_DIR").unwrap(); + let dest_path = Path::new(&out_dir).join("events.rs"); + let mut f = File::create(&dest_path).unwrap(); + + println!("cargo:rerun-if-changed=build.rs"); + + // Write the Event enum, which contains all possible event types + f.write_all(b" +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::Mutex; +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all=\"kebab-case\")] +pub enum Event { +").unwrap(); + + let variants: Vec = events() + .iter() + .map(|mtype| format!(" {}", mtype.enum_matcher_types()) ) + .collect(); + + + f.write_all(variants.join(",\n").as_bytes()).unwrap(); + f.write_all("\n}\n\n".as_bytes()).unwrap(); + + f.write_all(b"pub fn event_metric_name(event: &Event) -> String { + match event { +").unwrap(); + + let variants: Vec = events() + .iter() + .map(|mtype| { + let fields: Vec = mtype.enum_field_names() + .iter() + .map(|_| String::from("_")) + .collect(); + + let variant_match: String; + if fields.len() > 0 { + variant_match = format!( + "{}({})", + &mtype.variant(), + fields + .join(", ")); + } else { + variant_match = format!("{}", &mtype.variant()); + } + + + format!(" &Event::{} => String::from(\"{}\")", + &variant_match, + &mtype.metric_name(), + ) + }).collect(); + + + f.write_all(variants.join(",\n").as_bytes()).unwrap(); + f.write_all("}\n }".as_bytes()).unwrap(); + + // Create a struct to hold all the possible metrics + f.write_all(b" +#[derive(Debug, Clone)] +pub struct MetricCollector { +").unwrap(); + + let variants: Vec = events() + .iter() + .map(|mtype| { + let mut fields: Vec = mtype.enum_index_types(); + fields.push("String".to_owned()); // Instance + + format!(" {}: Arc>>", + mtype.metric_name(), + fields.join(", "), + mtype.collector_type(), + ) + }).collect(); + + + f.write_all(variants.join(",\n").as_bytes()).unwrap(); + f.write_all("\n}\n\n".as_bytes()).unwrap(); + + // Create a struct to hold all the possible metrics + f.write_all(b" +impl MetricCollector { + pub fn new() -> MetricCollector { + MetricCollector { +").unwrap(); + + let variants: Vec = events() + .iter() + .map(|mtype| { + let mut fields: Vec = mtype.enum_field_types(); + fields.push("String".to_owned()); // Instance + + format!(" {}: Arc::new(Mutex::new(HashMap::new()))", + &mtype.metric_name(), + ) + }).collect(); + + + f.write_all(variants.join(",\n").as_bytes()).unwrap(); + f.write_all("\n }\n".as_bytes()).unwrap(); + f.write_all("\n }\n".as_bytes()).unwrap(); + + f.write_all(b" + pub fn record(&self, instance: String, event: Event) { + match event { +").unwrap(); + + let variants: Vec = events() + .iter() + .map(|mtype| { + let fields: Vec = mtype.enum_field_names(); + + let variant_match: String; + if fields.len() > 0 { + variant_match = format!("{}({})", &mtype.variant(), fields.join(", ")); + } else { + variant_match = format!("{}", &mtype.variant()); + } + + let mut index_fields: Vec = mtype.enum_index_names(); + index_fields.push("instance".to_owned()); + + + format!(" + Event::{} => {{ + let mut accum_table = self.{} + .lock() + .expect(\"Failed to unwrap metric mutex for {}\"); + let accum = accum_table + .entry(({})) + .or_insert(0); + *accum += {}; + }} + ", + variant_match, + &mtype.metric_name(), + &mtype.metric_name(), + index_fields.join(", "), + &mtype.record_value(), + ) + }).collect(); + + + f.write_all(variants.join(",\n").as_bytes()).unwrap(); + f.write_all("\n }\n".as_bytes()).unwrap(); + f.write_all("\n }\n".as_bytes()).unwrap(); + + + f.write_all(b"pub fn prometheus_output(&self) -> String { + let mut output = String::new(); +").unwrap(); + + let variants: Vec = events() + .iter() + .map(|mtype| { + let mut index_fields: Vec = mtype.enum_index_names(); + index_fields.push("instance".to_owned()); + let ref_index_fields: Vec = index_fields + .iter() + .map(|m| format!("ref {}", m)) + .collect(); + + let for_matcher: String; + if index_fields.len() > 1 { + for_matcher = format!("({})", + ref_index_fields.join(", ")); + } else { + for_matcher = ref_index_fields.join(", "); + } + + let key_value_pairs: Vec = index_fields + .iter() + .map(|name| format!(" format!(\"{}=\\\"{{}}\\\"\", {})", &name, &name)) + .collect(); + format!(" + output.push_str(\"# HELP ofborg_{} {}\n\"); + output.push_str(\"# TYPE ofborg_{} {}\n\"); + + let table = self.{}.lock() + .expect(\"Failed to unwrap metric mutex for {}\"); + let values: Vec = (*table) + .iter() + .map(|(&{}, value)| {{ + let kvs: Vec = vec![ +{} + ]; + format!(\"ofborg_{}{{{{{{}}}}}} {{}}\", kvs.join(\",\"), value) + }}) + .collect(); + output.push_str(&values.join(\"\n\")); + output.push_str(\"\n\"); + ", + &mtype.metric_name(), + &mtype.description(), + &mtype.metric_name(), + &mtype.metric_type(), + &mtype.metric_name(), + &mtype.metric_name(), + for_matcher, + &key_value_pairs.join(",\n"), + &mtype.metric_name(), + ) + }).collect(); + + + f.write_all(variants.join("\n").as_bytes()).unwrap(); + f.write_all("return output;\n }".as_bytes()).unwrap(); + f.write_all("\n}".as_bytes()).unwrap(); + +} diff --git a/ofborg/src/bin/stats.rs b/ofborg/src/bin/stats.rs index 257e819..9d7ce78 100644 --- a/ofborg/src/bin/stats.rs +++ b/ofborg/src/bin/stats.rs @@ -1,5 +1,4 @@ extern crate hyper; -extern crate prometheus; extern crate amqp; extern crate ofborg; @@ -8,10 +7,7 @@ use ofborg::{easyamqp, tasks, worker, config, stats}; use amqp::Basic; use ofborg::easyamqp::TypedWrappers; -use hyper::header::ContentType; -use hyper::mime::Mime; use hyper::server::{Request, Response, Server}; -use prometheus::{Counter, Encoder, Gauge, HistogramVec, TextEncoder}; use std::thread; use std::time::Duration; @@ -31,8 +27,11 @@ fn main() { session.open_channel(3).unwrap() ); + let metrics = stats::MetricCollector::new(); + let collector = tasks::statscollector::StatCollectorWorker::new( - events + events, + metrics.clone(), ); let mut channel = session.open_channel(1).unwrap(); @@ -55,18 +54,12 @@ fn main() { thread::spawn(||{ - let encoder = TextEncoder::new(); let addr = "127.0.0.1:9898"; println!("listening addr {:?}", addr); Server::http(addr) .unwrap() - .handle(move |_: Request, mut res: Response| { - let metric_familys = prometheus::gather(); - let mut buffer = vec![]; - encoder.encode(&metric_familys, &mut buffer).unwrap(); - res.headers_mut() - .set(ContentType(encoder.format_type().parse::().unwrap())); - res.send(&buffer).unwrap(); + .handle(move |_: Request, res: Response| { + res.send(metrics.prometheus_output().as_bytes()).unwrap(); }) .unwrap(); }); diff --git a/ofborg/src/lib.rs b/ofborg/src/lib.rs index 12f3430..bda70db 100644 --- a/ofborg/src/lib.rs +++ b/ofborg/src/lib.rs @@ -1,7 +1,3 @@ -#[macro_use] -extern crate prometheus; - - #[macro_use] extern crate serde_derive; extern crate serde; diff --git a/ofborg/src/stats.rs b/ofborg/src/stats.rs index 3c779a4..f30e1d3 100644 --- a/ofborg/src/stats.rs +++ b/ofborg/src/stats.rs @@ -3,21 +3,18 @@ use amqp::Channel; use amqp::protocol::basic::BasicProperties; use amqp::Basic; +include!(concat!(env!("OUT_DIR"), "/events.rs")); + +#[macro_use] +mod macros { + #[macro_export] + macro_rules! my_macro(() => (FooBar)); +} + pub trait SysEvents: Send { fn notify(&mut self, event: Event); } -#[derive(Serialize, Deserialize, Debug)] -#[serde(rename_all="kebab-case")] -pub enum Event { - StatCollectorLegacyEvent, - StatCollectorBogusEvent, - JobReceived, - JobDecodeSuccess, - JobDecodeFailure, - IssueAlreadyClosed, - IssueFetchFailed, -} #[derive(Serialize, Deserialize, Debug)] pub struct EventMessage { diff --git a/ofborg/src/tasks/statscollector.rs b/ofborg/src/tasks/statscollector.rs index 94b988c..35f2d36 100644 --- a/ofborg/src/tasks/statscollector.rs +++ b/ofborg/src/tasks/statscollector.rs @@ -1,172 +1,22 @@ -extern crate prometheus; extern crate amqp; extern crate env_logger; use serde_json; -use std::str::FromStr; use ofborg::worker; use ofborg::stats; use amqp::protocol::basic::{Deliver, BasicProperties}; -use std::collections::HashMap; -use std::mem; -use std::thread; -use std::time::Duration; -use std::sync::Arc; -use std::sync::Mutex; pub struct StatCollectorWorker { events: E, - counter_collectors: HashMap, + collector: stats::MetricCollector, } impl StatCollectorWorker { - pub fn new(events: E) -> StatCollectorWorker { - let mut worker = StatCollectorWorker { + pub fn new(events: E, collector: stats::MetricCollector) -> StatCollectorWorker { + StatCollectorWorker { events: events, - counter_collectors: HashMap::new(), - }; - - let initial_events: Vec = vec![ - stats::Event::StatCollectorLegacyEvent, - stats::Event::StatCollectorBogusEvent, - stats::Event::JobReceived, - stats::Event::JobDecodeSuccess, - stats::Event::JobDecodeFailure, - stats::Event::IssueAlreadyClosed, - stats::Event::IssueFetchFailed, - ]; - for initial_event in initial_events { - match initial_event { - // WARNING - // BEFORE YOU ADD A NEW VARIANT HERE, ADD IT - // TO THE LIST ABOVE! - // - // EACH VARIANT MUST BE INITIALIZED PRIOR - // TO REPORTING STATS - stats::Event::StatCollectorLegacyEvent => { - worker.register_counter( - &initial_event, - prometheus::Opts { - namespace: "ofborg".to_owned(), - subsystem: "stats_collector".to_owned(), - name: "legacy_event".to_owned(), - help: "Number of received legacy events".to_owned(), - const_labels: HashMap::new(), - variable_labels: vec!["instance".to_owned()], - } - ); - }, - stats::Event::StatCollectorBogusEvent => { - worker.register_counter( - &initial_event, - prometheus::Opts { - namespace: "ofborg".to_owned(), - subsystem: "stats_collector".to_owned(), - name: "bogus_event".to_owned(), - help: "Number of received unparseable events".to_owned(), - const_labels: HashMap::new(), - variable_labels: vec!["instance".to_owned()], - } - ); - }, - stats::Event::JobReceived => { - worker.register_counter( - &initial_event, - prometheus::Opts { - namespace: "ofborg".to_owned(), - subsystem: "generic_worker".to_owned(), - name: "job_received".to_owned(), - help: "Number of received worker jobs".to_owned(), - const_labels: HashMap::new(), - variable_labels: vec!["instance".to_owned()], - } - ); - }, - stats::Event::JobDecodeSuccess => { - worker.register_counter( - &initial_event, - prometheus::Opts { - namespace: "ofborg".to_owned(), - subsystem: "generic_worker".to_owned(), - name: "job_decode_successful".to_owned(), - help: "Number of successfully decoded jobs".to_owned(), - const_labels: HashMap::new(), - variable_labels: vec!["instance".to_owned()], - } - ); - }, - stats::Event::JobDecodeFailure => { - worker.register_counter( - &initial_event, - prometheus::Opts { - namespace: "ofborg".to_owned(), - subsystem: "generic_worker".to_owned(), - name: "job_decode_failure".to_owned(), - help: "Number of jobs which failed to parse".to_owned(), - const_labels: HashMap::new(), - variable_labels: vec!["instance".to_owned()], - } - ); - }, - stats::Event::IssueAlreadyClosed => { - worker.register_counter( - &initial_event, - prometheus::Opts { - namespace: "ofborg".to_owned(), - subsystem: "github".to_owned(), - name: "issue_closed".to_owned(), - help: "Number of jobs for issues which are already closed".to_owned(), - const_labels: HashMap::new(), - variable_labels: vec!["instance".to_owned()], - } - - ); - }, - stats::Event::IssueFetchFailed => { - worker.register_counter( - &initial_event, - prometheus::Opts { - namespace: "ofborg".to_owned(), - subsystem: "github".to_owned(), - name: "issue_fetch_fail".to_owned(), - help: "Number of failed fetches for GitHub issues".to_owned(), - const_labels: HashMap::new(), - variable_labels: vec!["instance".to_owned()], - } - ); - }, - }; + collector: collector, } - - return worker; - } - - pub fn counter(&self, event: &stats::Event) -> prometheus::CounterVec { - let disc = format!("{:?}", mem::discriminant(event)); - self.counter_collectors.get(&disc).unwrap().clone() - } - - pub fn register_counter( - &mut self, - event: &stats::Event, - opts: prometheus::Opts, - ) { - let disc = format!("{:?}", mem::discriminant(event)); - let orig_labels = opts.variable_labels.clone(); - let labels: Vec<&str> = orig_labels - .iter() - .map(|v| v.as_ref()) - .collect(); - - let counter = register_counter_vec!( - opts, labels.as_ref() - ).unwrap(); - counter.with_label_values(&[""]).inc_by(0.0); - - self.counter_collectors.insert( - disc, - counter - ); } } @@ -188,7 +38,7 @@ impl worker::SimpleWorker for StatCollectorWorker match serde_json::from_slice(&modified_body) { Ok(e) => { - self.events.notify(stats::Event::StatCollectorLegacyEvent); + self.events.notify(stats::Event::StatCollectorLegacyEvent(stats::event_metric_name(&e))); Ok(stats::EventMessage { sender: "".to_owned(), events: vec![e], @@ -209,31 +59,10 @@ impl worker::SimpleWorker for StatCollectorWorker } fn consumer(&mut self, job: &stats::EventMessage) -> worker::Actions { + let sender = job.sender.clone(); for event in job.events.iter() { - match *event { - stats::Event::StatCollectorLegacyEvent => { - self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); - }, - stats::Event::StatCollectorBogusEvent => { - self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); - }, - stats::Event::JobReceived => { - self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); - }, - stats::Event::JobDecodeSuccess => { - self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); - }, - stats::Event::JobDecodeFailure => { - self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); - }, - stats::Event::IssueAlreadyClosed => { - self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); - }, - stats::Event::IssueFetchFailed => { - self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); - }, - } + self.collector.record(sender.clone(), event.clone()); } return vec![worker::Action::Ack]; From cdb88316b946986a225436ad1ef9b7435c24623f Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Mon, 12 Feb 2018 08:21:14 -0500 Subject: [PATCH 4/7] Tick m evaluation completes in the mass rebuildere --- ofborg/build.rs | 5 +++++ ofborg/src/tasks/massrebuilder.rs | 2 ++ 2 files changed, 7 insertions(+) diff --git a/ofborg/build.rs b/ofborg/build.rs index e458dcf..9c282d6 100644 --- a/ofborg/build.rs +++ b/ofborg/build.rs @@ -266,6 +266,11 @@ fn events() -> Vec { "Number of failed fetches for GitHub issues", None, ), + Metric::ticker( + "TaskEvaluationCheckComplete", + "Number of completed evaluation tasks", + None, + ), /* Metric::counter( "TimeElapsed", diff --git a/ofborg/src/tasks/massrebuilder.rs b/ofborg/src/tasks/massrebuilder.rs index 0266bdf..b5fb03e 100644 --- a/ofborg/src/tasks/massrebuilder.rs +++ b/ofborg/src/tasks/massrebuilder.rs @@ -510,6 +510,8 @@ impl worker::SimpleWorker for MassRebuildWorker Date: Mon, 12 Feb 2018 08:35:52 -0500 Subject: [PATCH 5/7] Report stats on evaluation duration per-branch --- ofborg/build.rs | 4 +++- ofborg/src/tasks/massrebuilder.rs | 10 +++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/ofborg/build.rs b/ofborg/build.rs index 9c282d6..e510f9c 100644 --- a/ofborg/build.rs +++ b/ofborg/build.rs @@ -244,7 +244,9 @@ fn events() -> Vec { Metric::counter( "EvaluationDuration", "Amount of time spent running evaluations", - None + Some(vec![ + ("branch", "String"), + ]), ), Metric::ticker( "JobDecodeSuccess", diff --git a/ofborg/src/tasks/massrebuilder.rs b/ofborg/src/tasks/massrebuilder.rs index b5fb03e..ed35114 100644 --- a/ofborg/src/tasks/massrebuilder.rs +++ b/ofborg/src/tasks/massrebuilder.rs @@ -11,7 +11,7 @@ use std::path::PathBuf; use ofborg::checkout; use ofborg::message::{massrebuildjob, buildjob}; use ofborg::nix::Nix; - +use std::time::Instant; use ofborg::acl::ACL; use ofborg::stats; use ofborg::stats::Event; @@ -186,6 +186,8 @@ impl worker::SimpleWorker for MassRebuildWorker worker::SimpleWorker for MassRebuildWorker Date: Mon, 12 Feb 2018 08:36:25 -0500 Subject: [PATCH 6/7] Track how often target branches evaluate, to setup a potential alert for it. --- ofborg/build.rs | 7 +++++++ ofborg/src/tasks/massrebuilder.rs | 1 + 2 files changed, 8 insertions(+) diff --git a/ofborg/build.rs b/ofborg/build.rs index e510f9c..1c30cbd 100644 --- a/ofborg/build.rs +++ b/ofborg/build.rs @@ -248,6 +248,13 @@ fn events() -> Vec { ("branch", "String"), ]), ), + Metric::ticker( + "TargetBranchFailsEvaluation", + "Number of PR evaluations which failed because the target branch failed", + Some(vec![ + ("branch", "String"), + ]), + ), Metric::ticker( "JobDecodeSuccess", "Number of successfully decoded jobs", diff --git a/ofborg/src/tasks/massrebuilder.rs b/ofborg/src/tasks/massrebuilder.rs index ed35114..88abdb0 100644 --- a/ofborg/src/tasks/massrebuilder.rs +++ b/ofborg/src/tasks/massrebuilder.rs @@ -196,6 +196,7 @@ impl worker::SimpleWorker for MassRebuildWorker Date: Mon, 12 Feb 2018 20:16:59 -0500 Subject: [PATCH 7/7] Emit a counter for each timed duration --- ofborg/build.rs | 7 +++++++ ofborg/src/tasks/massrebuilder.rs | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/ofborg/build.rs b/ofborg/build.rs index 1c30cbd..4881c36 100644 --- a/ofborg/build.rs +++ b/ofborg/build.rs @@ -248,6 +248,13 @@ fn events() -> Vec { ("branch", "String"), ]), ), + Metric::ticker( + "EvaluationDurationCount", + "Number of timed evaluations performed", + Some(vec![ + ("branch", "String"), + ]), + ), Metric::ticker( "TargetBranchFailsEvaluation", "Number of PR evaluations which failed because the target branch failed", diff --git a/ofborg/src/tasks/massrebuilder.rs b/ofborg/src/tasks/massrebuilder.rs index 88abdb0..136f2f8 100644 --- a/ofborg/src/tasks/massrebuilder.rs +++ b/ofborg/src/tasks/massrebuilder.rs @@ -210,6 +210,11 @@ impl worker::SimpleWorker for MassRebuildWorker