Why doesn't BroadcastStream implement Stream?

I'm writing an HTTP server in the actix_web framework using the tokio runtime and the tokio-stream utilities. I would like to send some data (actix_web::web::Bytes) from a broadcast channel in a response. I also want to do some other work along the way, so I'd like to write a wrapper implementing the MessageBody trait that at some point forwards to the inner .poll_next() method. The problematic code looks like this:

use actix_web::web::Bytes;
use actix_web::body::{BodyStream, MessageBody};
use tokio_stream::wrappers::BroadcastStream;
use futures_core::stream::Stream;
// ...

struct Wrapper {
    // some other stuff
    broad_rx: BodyStream<BroadcastStream<broadcast::Receiver<Bytes>>>
}
impl MessageBody for Wrapper {
    type Error = BroadcastStreamRecvError;

    fn size(&self) -> actix_web::body::BodySize {
        BodySize::Stream
    }

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Result<Bytes, Self::Error>>> {
        /*
        do some other stuff
        */

        self.broad_rx.poll_next(cx) // XXX the problem
    }
}

This snippet doesn't compile and returns the following error:

error[E0599]: no method named `poll_next` found for struct `BodyStream` in the current scope
   --> crates/demo/src/server.rs:***:***
    |
*** |         self.broad_rx.poll_next(cx)
    |                       ^^^^^^^^^ method not found in `BodyStream<BroadcastStream<tokio::sync::broadcast::Receiver<actix_web::web::Bytes>>>`

I expect BodyStream to implement MessageBody, and thus the .poll_next() method, because of this impl and the following chain of implementations:

Bytes implements:

  1. Clone
  2. Send
  3. I'm not sure how to deduce whether it has a 'static lifetime in this context, as it's not a reference. If the lifetime isn't 'static I'd assume that is the problem, but I also don't know how to solve it. However, I'd also assume that the compiler would mention something about lifetimes if that is the case.

BroadcastStream<Bytes> implements:

  1. Stream, if Bytes is 'static + Clone + Send. Item = Result<Bytes, BroadcastStreamRecvError> in that case.

BroadcastStreamRecvError implements:

  1. Error and, via this blanket implementation, presumably also Into<Box<dyn Error + 'static, Global>>

BodyStream<Bytes> implements:

  1. MessageBody because the Stream bound is satisfied by BroadcastStream<Bytes> and the 'static + Into<Box<dyn Error + 'static, Global>> is satisfied by BroadcastStreamRecvError (except those pesky 'static requirements again)

Yet the error suggests that one of the implementations doesn't apply for whatever reason. In fact, even if I replace

broad_rx: BodyStream<BroadcastStream<broadcast::Receiver<Bytes>>>

with

broad_rx: BroadcastStream<broadcast::Receiver<Bytes>>

I still get an error saying that the .poll_next() method doesn't exist, despite the fact that this type should implement Stream.

How can I implement the MessageBody wrapper or further debug this?



Solution 1:[1]

The presented code has three problems that prevent it from compiling. The one behind the reported error was pointed out by @kmdreko and concerns pinning: poll_next takes self: Pin<&mut Self>, so it has to be called on a pinned value, whereas self.broad_rx is a plain field access. The full reason why Pin is needed is a bit complicated and can be found in the async Rust book. Put simply: if you want to call a .poll method on something, that thing needs to be pinned. One way to solve this is the pin-project crate, which simplifies the creation of projections (helper functions used to access the fields of pinned structs). Alternatively, one can write the projections manually, as in this article.
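To make the manual route concrete, here is a minimal std-only sketch of a hand-written projection. It is not the code from the question: `MiniStream`, `Countdown`, `NoopWake`, `project_inner` and `drain` are hypothetical names introduced for illustration, with `MiniStream` standing in for `futures_core::Stream` so that no external crates are needed.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for `futures_core::Stream` (same `poll_next` shape).
trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical inner stream: yields n-1, n-2, ..., 0 and then ends.
struct Countdown(u32);

impl MiniStream for Countdown {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.0 == 0 {
            return Poll::Ready(None);
        }
        self.0 -= 1;
        Poll::Ready(Some(self.0))
    }
}

struct Wrapper {
    inner: Countdown,
}

impl Wrapper {
    // Hand-written projection: Pin<&mut Wrapper> -> Pin<&mut Countdown>.
    fn project_inner(self: Pin<&mut Self>) -> Pin<&mut Countdown> {
        // SAFETY: `inner` is never moved out of a pinned `Wrapper`, and
        // `Wrapper` has no `Drop` impl and is not `repr(packed)`.
        unsafe { self.map_unchecked_mut(|w| &mut w.inner) }
    }
}

impl MiniStream for Wrapper {
    type Item = u32;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        // `self.inner.poll_next(cx)` would be rejected with E0599: a plain
        // field access yields `Countdown`, not `Pin<&mut Countdown>`.
        self.project_inner().poll_next(cx)
    }
}

// Waker that does nothing; enough to build a `Context` for manual polling.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls the wrapper until it reports the end of the stream.
fn drain(mut wrapper: Wrapper) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    // `Wrapper` is `Unpin` in this sketch, so `Pin::new` is sufficient.
    let mut pinned = Pin::new(&mut wrapper);
    while let Poll::Ready(Some(v)) = pinned.as_mut().poll_next(&mut cx) {
        out.push(v);
    }
    out
}
```

Calling `drain(Wrapper { inner: Countdown(3) })` returns `vec![2, 1, 0]`. The `unsafe` block is the part that pin-project generates and checks for you, which is the main reason to prefer the crate over hand-written projections.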

With pin-project, the declaration then looks like this:

#[pin_project]
struct Wrapper {
    #[pin]
    broad_rx: BroadcastStream<broadcast::Receiver<Bytes>>,
}

and the projection that presents the pinned field (see further below) can be obtained like so:

let this = self.project();

This still doesn't compile, but for two trivial reasons. First, BodyStream is not needed at all, because MessageBody is being implemented manually anyway. Second, the generic parameter of BroadcastStream is not the wrapped type (which is always broadcast::Receiver<_>), but the type of the items sent through the channel. The following code compiles:

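use std::pin::Pin;
use std::task::{Context, Poll};

use actix_web::body::{BodySize, MessageBody};
use actix_web::web::Bytes;
use futures_core::stream::Stream;
use pin_project::pin_project;
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};

#[pin_project]
struct Wrapper {
    #[pin]
    broad_rx: BroadcastStream<Bytes>,
}

impl MessageBody for Wrapper {
    type Error = BroadcastStreamRecvError;

    fn size(&self) -> BodySize {
        BodySize::Stream
    }

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Bytes, Self::Error>>> {
        // Project Pin<&mut Wrapper> to reach the pinned inner stream.
        let this = self.project();
        this.broad_rx.poll_next(cx)
    }
}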

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1