mas_tower/metrics/
in_flight.rs1use opentelemetry::{KeyValue, metrics::UpDownCounter};
8use pin_project_lite::pin_project;
9use tower::{Layer, Service};
10
11use crate::{METER, MetricsAttributes};
12
13#[derive(Clone, Debug)]
19pub struct InFlightCounterLayer<OnRequest = ()> {
20 counter: UpDownCounter<i64>,
21 on_request: OnRequest,
22}
23
24impl InFlightCounterLayer {
25 #[must_use]
27 pub fn new(name: &'static str) -> Self {
28 let counter = METER
29 .i64_up_down_counter(name)
30 .with_unit("{request}")
31 .with_description("The number of in-flight requests")
32 .build();
33
34 Self {
35 counter,
36 on_request: (),
37 }
38 }
39}
40
41impl<F> InFlightCounterLayer<F> {
42 #[must_use]
44 pub fn on_request<OnRequest>(self, on_request: OnRequest) -> InFlightCounterLayer<OnRequest> {
45 InFlightCounterLayer {
46 counter: self.counter,
47 on_request,
48 }
49 }
50}
51
52impl<S, OnRequest> Layer<S> for InFlightCounterLayer<OnRequest>
53where
54 OnRequest: Clone,
55{
56 type Service = InFlightCounterService<S, OnRequest>;
57
58 fn layer(&self, inner: S) -> Self::Service {
59 InFlightCounterService {
60 inner,
61 counter: self.counter.clone(),
62 on_request: self.on_request.clone(),
63 }
64 }
65}
66
67#[derive(Clone, Debug)]
74pub struct InFlightCounterService<S, OnRequest = ()> {
75 inner: S,
76 counter: UpDownCounter<i64>,
77 on_request: OnRequest,
78}
79
80struct InFlightGuard {
82 counter: UpDownCounter<i64>,
83 attributes: Vec<KeyValue>,
84}
85
86impl InFlightGuard {
87 fn new(counter: UpDownCounter<i64>, attributes: Vec<KeyValue>) -> Self {
88 counter.add(1, &attributes);
89
90 Self {
91 counter,
92 attributes,
93 }
94 }
95}
96
97impl Drop for InFlightGuard {
98 fn drop(&mut self) {
99 self.counter.add(-1, &self.attributes);
100 }
101}
102
103pin_project! {
104 pub struct InFlightFuture<F> {
106 guard: InFlightGuard,
107
108 #[pin]
109 inner: F,
110 }
111}
112
113impl<F> Future for InFlightFuture<F>
114where
115 F: Future,
116{
117 type Output = F::Output;
118
119 fn poll(
120 self: std::pin::Pin<&mut Self>,
121 cx: &mut std::task::Context<'_>,
122 ) -> std::task::Poll<Self::Output> {
123 self.project().inner.poll(cx)
124 }
125}
126
127impl<R, S, OnRequest> Service<R> for InFlightCounterService<S, OnRequest>
128where
129 S: Service<R>,
130 OnRequest: MetricsAttributes<R>,
131{
132 type Response = S::Response;
133 type Error = S::Error;
134 type Future = InFlightFuture<S::Future>;
135
136 fn poll_ready(
137 &mut self,
138 cx: &mut std::task::Context<'_>,
139 ) -> std::task::Poll<Result<(), Self::Error>> {
140 self.inner.poll_ready(cx)
141 }
142
143 fn call(&mut self, req: R) -> Self::Future {
144 let attributes = self.on_request.attributes(&req).collect();
146
147 let guard = InFlightGuard::new(self.counter.clone(), attributes);
149
150 let inner = self.inner.call(req);
153 InFlightFuture { guard, inner }
154 }
155}