[−][src]Struct tower::util::CallAll
This is a Stream of responses resulting from calling the wrapped Service for each
request received on the wrapped Stream.
use futures::future::{ready, Ready}; use futures::StreamExt; use futures::channel::mpsc; use tower_service::Service; use tower::util::ServiceExt; // First, we need to have a Service to process our requests. #[derive(Debug, Eq, PartialEq)] struct FirstLetter; impl Service<&'static str> for FirstLetter { type Response = &'static str; type Error = Box<dyn Error + Send + Sync>; type Future = Ready<Result<Self::Response, Self::Error>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { Poll::Ready(Ok(())) } fn call(&mut self, req: &'static str) -> Self::Future { ready(Ok(&req[..1])) } } #[tokio::main] async fn main() { // Next, we need a Stream of requests. let (mut reqs, rx) = mpsc::unbounded(); // Note that we have to help Rust out here by telling it what error type to use. // Specifically, it has to be From<Service::Error> + From<Stream::Error>. let mut rsps = FirstLetter.call_all(rx); // Now, let's send a few requests and then check that we get the corresponding responses. reqs.unbounded_send("one").unwrap(); reqs.unbounded_send("two").unwrap(); reqs.unbounded_send("three").unwrap(); drop(reqs); // We then loop over the response Strem that we get back from call_all. let mut i = 0usize; while let Some(rsp) = rsps.next().await { // Each response is a Result (we could also have used TryStream::try_next) match (i + 1, rsp.unwrap()) { (1, "o") | (2, "t") | (3, "t") => {} (n, i) => { unreachable!("{}. response was '{}'", n, i); } } i += 1; } // And at the end, we can get the Service back when there are no more requests. assert_eq!(rsps.into_inner(), FirstLetter); }
Implementations
impl<Svc, S> CallAll<Svc, S> where
Svc: Service<S::Item>,
Svc::Error: Into<BoxError>,
S: Stream, [src]
Svc: Service<S::Item>,
Svc::Error: Into<BoxError>,
S: Stream,
pub fn new(service: Svc, stream: S) -> CallAll<Svc, S>[src]
Create a new CallAll combinator.
Each request yielded by stream is passed to service, and the resulting responses are
yielded in the same order by the implementation of Stream for CallAll.
pub fn into_inner(self) -> Svc[src]
pub fn take_service(self: Pin<&mut Self>) -> Svc[src]
Extract the wrapped Service.
This CallAll can no longer be used after this function has been called.
Panics
Panics if take_service was already called.
pub fn unordered(self) -> CallAllUnordered<Svc, S>[src]
Trait Implementations
impl<Svc: Debug, S: Debug> Debug for CallAll<Svc, S> where
Svc: Service<S::Item>,
S: Stream,
Svc::Future: Debug, [src]
Svc: Service<S::Item>,
S: Stream,
Svc::Future: Debug,
impl<Svc, S> Stream for CallAll<Svc, S> where
Svc: Service<S::Item>,
Svc::Error: Into<BoxError>,
S: Stream, [src]
Svc: Service<S::Item>,
Svc::Error: Into<BoxError>,
S: Stream,
type Item = Result<Svc::Response, BoxError>
Values yielded by the stream.
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>[src]
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>
fn size_hint(&self) -> (usize, Option<usize>)[src]
impl<'pin, Svc, S> Unpin for CallAll<Svc, S> where
Svc: Service<S::Item>,
S: Stream,
__CallAll<'pin, Svc, S>: Unpin, [src]
Svc: Service<S::Item>,
S: Stream,
__CallAll<'pin, Svc, S>: Unpin,
Auto Trait Implementations
impl<Svc, S> !RefUnwindSafe for CallAll<Svc, S>
impl<Svc, S> Send for CallAll<Svc, S> where
S: Send,
Svc: Send,
<Svc as Service<<S as Stream>::Item>>::Error: Send,
<Svc as Service<<S as Stream>::Item>>::Future: Send,
<Svc as Service<<S as Stream>::Item>>::Response: Send,
S: Send,
Svc: Send,
<Svc as Service<<S as Stream>::Item>>::Error: Send,
<Svc as Service<<S as Stream>::Item>>::Future: Send,
<Svc as Service<<S as Stream>::Item>>::Response: Send,
impl<Svc, S> Sync for CallAll<Svc, S> where
S: Sync,
Svc: Sync,
<Svc as Service<<S as Stream>::Item>>::Error: Sync,
<Svc as Service<<S as Stream>::Item>>::Future: Sync,
<Svc as Service<<S as Stream>::Item>>::Response: Sync,
S: Sync,
Svc: Sync,
<Svc as Service<<S as Stream>::Item>>::Error: Sync,
<Svc as Service<<S as Stream>::Item>>::Future: Sync,
<Svc as Service<<S as Stream>::Item>>::Response: Sync,
impl<Svc, S> !UnwindSafe for CallAll<Svc, S>
Blanket Implementations
impl<T> Any for T where
T: 'static + ?Sized, [src]
T: 'static + ?Sized,
impl<T> Borrow<T> for T where
T: ?Sized, [src]
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized, [src]
T: ?Sized,
pub fn borrow_mut(&mut self) -> &mut T[src]
impl<K, S, E, D> Discover for D where
D: TryStream<Ok = Change<K, S>, Error = E> + ?Sized,
K: Eq, [src]
D: TryStream<Ok = Change<K, S>, Error = E> + ?Sized,
K: Eq,
type Key = K
A unique identifier for each active service. Read more
type Service = S
type Error = E
Error produced during discovery
pub fn poll_discover(
Pin<&mut D>,
&mut Context<'_>
) -> Poll<Option<Result<<D as TryStream>::Ok, <D as TryStream>::Error>>>[src]
Pin<&mut D>,
&mut Context<'_>
) -> Poll<Option<Result<<D as TryStream>::Ok, <D as TryStream>::Error>>>
impl<T> From<T> for T[src]
impl<T> Instrument for T[src]
fn instrument(self, span: Span) -> Instrumented<Self>[src]
fn in_current_span(self) -> Instrumented<Self>[src]
impl<T, U> Into<U> for T where
U: From<T>, [src]
U: From<T>,
impl<T> StreamExt for T where
T: Stream + ?Sized, [src]
T: Stream + ?Sized,
fn next(&mut self) -> Next<'_, Self> where
Self: Unpin, [src]
Self: Unpin,
fn into_future(self) -> StreamFuture<Self> where
Self: Unpin, [src]
Self: Unpin,
fn map<T, F>(self, f: F) -> Map<Self, F> where
F: FnMut(Self::Item) -> T, [src]
F: FnMut(Self::Item) -> T,
fn enumerate(self) -> Enumerate<Self>[src]
fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> where
F: FnMut(&Self::Item) -> Fut,
Fut: Future<Output = bool>, [src]
F: FnMut(&Self::Item) -> Fut,
Fut: Future<Output = bool>,
fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> where
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = Option<T>>, [src]
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = Option<T>>,
fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where
F: FnMut(Self::Item) -> Fut,
Fut: Future, [src]
F: FnMut(Self::Item) -> Fut,
Fut: Future,
fn collect<C>(self) -> Collect<Self, C> where
C: Default + Extend<Self::Item>, [src]
C: Default + Extend<Self::Item>,
fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> where
FromA: Default + Extend<A>,
FromB: Default + Extend<B>,
Self: Stream<Item = (A, B)>, [src]
FromA: Default + Extend<A>,
FromB: Default + Extend<B>,
Self: Stream<Item = (A, B)>,
fn concat(self) -> Concat<Self> where
Self::Item: Extend<<Self::Item as IntoIterator>::Item>,
Self::Item: IntoIterator,
Self::Item: Default, [src]
Self::Item: Extend<<Self::Item as IntoIterator>::Item>,
Self::Item: IntoIterator,
Self::Item: Default,
fn cycle(self) -> Cycle<Self> where
Self: Clone, [src]
Self: Clone,
fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> where
F: FnMut(T, Self::Item) -> Fut,
Fut: Future<Output = T>, [src]
F: FnMut(T, Self::Item) -> Fut,
Fut: Future<Output = T>,
fn flatten(self) -> Flatten<Self> where
Self::Item: Stream, [src]
Self::Item: Stream,
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where
F: FnMut(Self::Item) -> U,
U: Stream, [src]
F: FnMut(Self::Item) -> U,
U: Stream,
fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> where
F: FnMut(&mut S, Self::Item) -> Fut,
Fut: Future<Output = Option<B>>, [src]
F: FnMut(&mut S, Self::Item) -> Fut,
Fut: Future<Output = Option<B>>,
fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> where
F: FnMut(&Self::Item) -> Fut,
Fut: Future<Output = bool>, [src]
F: FnMut(&Self::Item) -> Fut,
Fut: Future<Output = bool>,
fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> where
F: FnMut(&Self::Item) -> Fut,
Fut: Future<Output = bool>, [src]
F: FnMut(&Self::Item) -> Fut,
Fut: Future<Output = bool>,
fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> where
Fut: Future, [src]
Fut: Future,
fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = ()>, [src]
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = ()>,
fn for_each_concurrent<Fut, F>(
self,
limit: impl Into<Option<usize>>,
f: F
) -> ForEachConcurrent<Self, Fut, F> where
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = ()>, [src]
self,
limit: impl Into<Option<usize>>,
f: F
) -> ForEachConcurrent<Self, Fut, F> where
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = ()>,
fn take(self, n: usize) -> Take<Self>[src]
fn skip(self, n: usize) -> Skip<Self>[src]
fn fuse(self) -> Fuse<Self>[src]
fn by_ref(&mut self) -> &mut Self[src]
fn catch_unwind(self) -> CatchUnwind<Self> where
Self: UnwindSafe, [src]
Self: UnwindSafe,
fn boxed<'a>(
self
) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a + Send, Global>> where
Self: Send + 'a, [src]
self
) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a + Send, Global>> where
Self: Send + 'a,
fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a, Global>> where
Self: 'a, [src]
Self: 'a,
fn buffered(self, n: usize) -> Buffered<Self> where
Self::Item: Future, [src]
Self::Item: Future,
fn buffer_unordered(self, n: usize) -> BufferUnordered<Self> where
Self::Item: Future, [src]
Self::Item: Future,
fn zip<St>(self, other: St) -> Zip<Self, St> where
St: Stream, [src]
St: Stream,
fn chain<St>(self, other: St) -> Chain<Self, St> where
St: Stream<Item = Self::Item>, [src]
St: Stream<Item = Self::Item>,
fn peekable(self) -> Peekable<Self>[src]
fn chunks(self, capacity: usize) -> Chunks<Self>[src]
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>[src]
fn forward<S>(self, sink: S) -> Forward<Self, S> where
S: Sink<Self::Ok, Error = Self::Error>,
Self: TryStream, [src]
S: Sink<Self::Ok, Error = Self::Error>,
Self: TryStream,
fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>) where
Self: Sink<Item>, [src]
Self: Sink<Item>,
fn inspect<F>(self, f: F) -> Inspect<Self, F> where
F: FnMut(&Self::Item), [src]
F: FnMut(&Self::Item),
fn left_stream<B>(self) -> Either<Self, B> where
B: Stream<Item = Self::Item>, [src]
B: Stream<Item = Self::Item>,
fn right_stream<B>(self) -> Either<B, Self> where
B: Stream<Item = Self::Item>, [src]
B: Stream<Item = Self::Item>,
fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where
Self: Unpin, [src]
Self: Unpin,
fn select_next_some(&mut self) -> SelectNextSome<'_, Self> where
Self: Unpin + FusedStream, [src]
Self: Unpin + FusedStream,
impl<St> StreamExt for St where
St: Stream + ?Sized, [src]
St: Stream + ?Sized,
fn next(&mut self) -> Next<'_, Self> where
Self: Unpin, [src]
Self: Unpin,
fn try_next<T, E>(&mut self) -> TryNext<'_, Self> where
Self: Stream<Item = Result<T, E>> + Unpin, [src]
Self: Stream<Item = Result<T, E>> + Unpin,
fn map<T, F>(self, f: F) -> Map<Self, F> where
F: FnMut(Self::Item) -> T, [src]
F: FnMut(Self::Item) -> T,
fn merge<U>(self, other: U) -> Merge<Self, U> where
U: Stream<Item = Self::Item>, [src]
U: Stream<Item = Self::Item>,
fn filter<F>(self, f: F) -> Filter<Self, F> where
F: FnMut(&Self::Item) -> bool, [src]
F: FnMut(&Self::Item) -> bool,
fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where
F: FnMut(Self::Item) -> Option<T>, [src]
F: FnMut(Self::Item) -> Option<T>,
fn fuse(self) -> Fuse<Self>[src]
fn take(self, n: usize) -> Take<Self>[src]
fn take_while<F>(self, f: F) -> TakeWhile<Self, F> where
F: FnMut(&Self::Item) -> bool, [src]
F: FnMut(&Self::Item) -> bool,
fn skip(self, n: usize) -> Skip<Self>[src]
fn skip_while<F>(self, f: F) -> SkipWhile<Self, F> where
F: FnMut(&Self::Item) -> bool, [src]
F: FnMut(&Self::Item) -> bool,
fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F> where
F: FnMut(Self::Item) -> bool,
Self: Unpin, [src]
F: FnMut(Self::Item) -> bool,
Self: Unpin,
fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F> where
F: FnMut(Self::Item) -> bool,
Self: Unpin, [src]
F: FnMut(Self::Item) -> bool,
Self: Unpin,
fn chain<U>(self, other: U) -> Chain<Self, U> where
U: Stream<Item = Self::Item>, [src]
U: Stream<Item = Self::Item>,
fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F> where
F: FnMut(B, Self::Item) -> B, [src]
F: FnMut(B, Self::Item) -> B,
fn collect<T>(self) -> Collect<Self, T> where
T: FromStream<Self::Item>, [src]
T: FromStream<Self::Item>,
fn timeout(self, duration: Duration) -> Timeout<Self>[src]
fn throttle(self, duration: Duration) -> Throttle<Self>[src]
impl<T, U> TryFrom<U> for T where
U: Into<T>, [src]
U: Into<T>,
type Error = Infallible
The type returned in the event of a conversion error.
pub fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>[src]
impl<T, U> TryInto<U> for T where
U: TryFrom<T>, [src]
U: TryFrom<T>,
type Error = <U as TryFrom<T>>::Error
The type returned in the event of a conversion error.
pub fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>[src]
impl<S, T, E> TryStream for S where
S: Stream<Item = Result<T, E>> + ?Sized, [src]
S: Stream<Item = Result<T, E>> + ?Sized,
type Ok = T
The type of successful values yielded by this stream
type Error = E
The type of failures yielded by this stream
pub fn try_poll_next(
self: Pin<&mut S>,
cx: &mut Context<'_>
) -> Poll<Option<Result<<S as TryStream>::Ok, <S as TryStream>::Error>>>[src]
self: Pin<&mut S>,
cx: &mut Context<'_>
) -> Poll<Option<Result<<S as TryStream>::Ok, <S as TryStream>::Error>>>
impl<S> TryStreamExt for S where
S: TryStream + ?Sized, [src]
S: TryStream + ?Sized,
fn err_into<E>(self) -> ErrInto<Self, E> where
Self::Error: Into<E>, [src]
Self::Error: Into<E>,
fn map_ok<T, F>(self, f: F) -> MapOk<Self, F> where
F: FnMut(Self::Ok) -> T, [src]
F: FnMut(Self::Ok) -> T,
fn map_err<E, F>(self, f: F) -> MapErr<Self, F> where
F: FnMut(Self::Error) -> E, [src]
F: FnMut(Self::Error) -> E,
fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> where
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Error = Self::Error>, [src]
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Error = Self::Error>,
fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> where
F: FnMut(Self::Error) -> Fut,
Fut: TryFuture<Ok = Self::Ok>, [src]
F: FnMut(Self::Error) -> Fut,
Fut: TryFuture<Ok = Self::Ok>,
fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F> where
F: FnMut(&Self::Ok), [src]
F: FnMut(&Self::Ok),
fn inspect_err<F>(self, f: F) -> InspectErr<Self, F> where
F: FnMut(&Self::Error), [src]
F: FnMut(&Self::Error),
fn into_stream(self) -> IntoStream<Self>[src]
fn try_next(&mut self) -> TryNext<'_, Self> where
Self: Unpin, [src]
Self: Unpin,
fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F> where
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = Self::Error>, [src]
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = Self::Error>,
fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F> where
F: FnMut(&Self::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = Self::Error>, [src]
F: FnMut(&Self::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = Self::Error>,
fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> where
F: FnMut(&Self::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = Self::Error>, [src]
F: FnMut(&Self::Ok) -> Fut,
Fut: TryFuture<Ok = bool, Error = Self::Error>,
fn try_for_each_concurrent<Fut, F>(
self,
limit: impl Into<Option<usize>>,
f: F
) -> TryForEachConcurrent<Self, Fut, F> where
F: FnMut(Self::Ok) -> Fut,
Fut: Future<Output = Result<(), Self::Error>>, [src]
self,
limit: impl Into<Option<usize>>,
f: F
) -> TryForEachConcurrent<Self, Fut, F> where
F: FnMut(Self::Ok) -> Fut,
Fut: Future<Output = Result<(), Self::Error>>,
fn try_collect<C>(self) -> TryCollect<Self, C> where
C: Default + Extend<Self::Ok>, [src]
C: Default + Extend<Self::Ok>,
fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F> where
F: FnMut(&Self::Ok) -> Fut,
Fut: Future<Output = bool>, [src]
F: FnMut(&Self::Ok) -> Fut,
Fut: Future<Output = bool>,
fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F> where
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Ok = Option<T>, Error = Self::Error>, [src]
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
fn try_flatten(self) -> TryFlatten<Self> where
Self::Ok: TryStream,
<Self::Ok as TryStream>::Error: From<Self::Error>, [src]
Self::Ok: TryStream,
<Self::Ok as TryStream>::Error: From<Self::Error>,
fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F> where
F: FnMut(T, Self::Ok) -> Fut,
Fut: TryFuture<Ok = T, Error = Self::Error>, [src]
F: FnMut(T, Self::Ok) -> Fut,
Fut: TryFuture<Ok = T, Error = Self::Error>,
fn try_concat(self) -> TryConcat<Self> where
Self::Ok: Extend<<Self::Ok as IntoIterator>::Item>,
Self::Ok: IntoIterator,
Self::Ok: Default, [src]
Self::Ok: Extend<<Self::Ok as IntoIterator>::Item>,
Self::Ok: IntoIterator,
Self::Ok: Default,
fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self> where
Self::Ok: TryFuture,
<Self::Ok as TryFuture>::Error == Self::Error, [src]
Self::Ok: TryFuture,
<Self::Ok as TryFuture>::Error == Self::Error,
fn try_buffered(self, n: usize) -> TryBuffered<Self> where
Self::Ok: TryFuture,
<Self::Ok as TryFuture>::Error == Self::Error, [src]
Self::Ok: TryFuture,
<Self::Ok as TryFuture>::Error == Self::Error,
fn try_poll_next_unpin(
&mut self,
cx: &mut Context<'_>
) -> Poll<Option<Result<Self::Ok, Self::Error>>> where
Self: Unpin, [src]
&mut self,
cx: &mut Context<'_>
) -> Poll<Option<Result<Self::Ok, Self::Error>>> where
Self: Unpin,
fn into_async_read(self) -> IntoAsyncRead<Self> where
Self: TryStreamExt<Error = Error> + Unpin,
Self::Ok: AsRef<[u8]>, [src]
Self: TryStreamExt<Error = Error> + Unpin,
Self::Ok: AsRef<[u8]>,
impl<V, T> VZip<V> for T where
V: MultiLane<T>, [src]
V: MultiLane<T>,