mas_tower/metrics/
in_flight.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7use opentelemetry::{KeyValue, metrics::UpDownCounter};
8use pin_project_lite::pin_project;
9use tower::{Layer, Service};
10
11use crate::{METER, MetricsAttributes};
12
13/// A [`Layer`] that records the number of in-flight requests.
14///
15/// # Generic Parameters
16///
17/// * `OnRequest`: A type that can extract attributes from a request.
18#[derive(Clone, Debug)]
19pub struct InFlightCounterLayer<OnRequest = ()> {
20    counter: UpDownCounter<i64>,
21    on_request: OnRequest,
22}
23
24impl InFlightCounterLayer {
25    /// Create a new [`InFlightCounterLayer`].
26    #[must_use]
27    pub fn new(name: &'static str) -> Self {
28        let counter = METER
29            .i64_up_down_counter(name)
30            .with_unit("{request}")
31            .with_description("The number of in-flight requests")
32            .build();
33
34        Self {
35            counter,
36            on_request: (),
37        }
38    }
39}
40
41impl<F> InFlightCounterLayer<F> {
42    /// Set the [`MetricsAttributes`] to use.
43    #[must_use]
44    pub fn on_request<OnRequest>(self, on_request: OnRequest) -> InFlightCounterLayer<OnRequest> {
45        InFlightCounterLayer {
46            counter: self.counter,
47            on_request,
48        }
49    }
50}
51
52impl<S, OnRequest> Layer<S> for InFlightCounterLayer<OnRequest>
53where
54    OnRequest: Clone,
55{
56    type Service = InFlightCounterService<S, OnRequest>;
57
58    fn layer(&self, inner: S) -> Self::Service {
59        InFlightCounterService {
60            inner,
61            counter: self.counter.clone(),
62            on_request: self.on_request.clone(),
63        }
64    }
65}
66
67/// A middleware that records the number of in-flight requests.
68///
69/// # Generic Parameters
70///
71/// * `S`: The type of the inner service.
72/// * `OnRequest`: A type that can extract attributes from a request.
73#[derive(Clone, Debug)]
74pub struct InFlightCounterService<S, OnRequest = ()> {
75    inner: S,
76    counter: UpDownCounter<i64>,
77    on_request: OnRequest,
78}
79
80/// A guard that decrements the in-flight request count when dropped.
81struct InFlightGuard {
82    counter: UpDownCounter<i64>,
83    attributes: Vec<KeyValue>,
84}
85
86impl InFlightGuard {
87    fn new(counter: UpDownCounter<i64>, attributes: Vec<KeyValue>) -> Self {
88        counter.add(1, &attributes);
89
90        Self {
91            counter,
92            attributes,
93        }
94    }
95}
96
97impl Drop for InFlightGuard {
98    fn drop(&mut self) {
99        self.counter.add(-1, &self.attributes);
100    }
101}
102
103pin_project! {
104    /// The future returned by [`InFlightCounterService`]
105    pub struct InFlightFuture<F> {
106        guard: InFlightGuard,
107
108        #[pin]
109        inner: F,
110    }
111}
112
113impl<F> Future for InFlightFuture<F>
114where
115    F: Future,
116{
117    type Output = F::Output;
118
119    fn poll(
120        self: std::pin::Pin<&mut Self>,
121        cx: &mut std::task::Context<'_>,
122    ) -> std::task::Poll<Self::Output> {
123        self.project().inner.poll(cx)
124    }
125}
126
127impl<R, S, OnRequest> Service<R> for InFlightCounterService<S, OnRequest>
128where
129    S: Service<R>,
130    OnRequest: MetricsAttributes<R>,
131{
132    type Response = S::Response;
133    type Error = S::Error;
134    type Future = InFlightFuture<S::Future>;
135
136    fn poll_ready(
137        &mut self,
138        cx: &mut std::task::Context<'_>,
139    ) -> std::task::Poll<Result<(), Self::Error>> {
140        self.inner.poll_ready(cx)
141    }
142
143    fn call(&mut self, req: R) -> Self::Future {
144        // Extract attributes from the request.
145        let attributes = self.on_request.attributes(&req).collect();
146
147        // Increment the in-flight request count.
148        let guard = InFlightGuard::new(self.counter.clone(), attributes);
149
150        // Call the inner service, and return a future that decrements the in-flight
151        // when dropped.
152        let inner = self.inner.call(req);
153        InFlightFuture { guard, inner }
154    }
155}