Created
January 20, 2018 22:41
-
-
Save drahnr/f455dbeabb4ebcea22f05c41795fb85a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #[macro_use] | |
| extern crate futures; | |
| extern crate tokio_core; | |
| extern crate tokio_io; | |
| use futures::{Future, Stream}; | |
| use tokio_io::{io, AsyncRead}; | |
| use tokio_core::net::TcpListener; | |
| use tokio_core::reactor::Core; | |
| use tokio_core::reactor::Timeout; | |
| use std::time::Duration; | |
| use futures::future; | |
| use futures::{Sink,AsyncSink}; | |
| use futures::{Async,Poll}; | |
| use futures::sync::mpsc::{UnboundedSender,UnboundedReceiver,unbounded}; | |
| use futures::stream::FuturesUnordered; | |
/// Sending half of an unbounded in-process channel carrying `u64` values.
type MqttWriter = UnboundedSender<u64>;
/// Receiving half of the same kind of unbounded `u64` channel.
type MqttReader = UnboundedReceiver<u64>;
/// Progress of a single send operation.
///
/// The value is a plain tag with no payload, so it is cheap to copy and can
/// be compared directly (`state == State::Init`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Nothing has been handed to the sink yet.
    Init,
    /// The value was accepted by the sink; waiting for the sink to report completion.
    Processing,
}
| // #[derive(Debug)] | |
| struct Sinker { | |
| state : State, | |
| writer : MqttWriter, | |
| value : u64, | |
| } | |
| impl Sinker { | |
| pub fn new(value : u64, writer : MqttWriter) -> Self { | |
| Self { state : State::Init, writer, value } | |
| } | |
| } | |
| impl Future for Sinker { | |
| type Item = (); | |
| type Error = (); | |
| fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | |
| loop { | |
| match self.state { | |
| State::Init => { | |
| println!("Init"); | |
| match self.writer.start_send(self.value) { | |
| Ok(AsyncSink::Ready) => { | |
| self.state = State::Processing; | |
| println!("Sink Ready"); | |
| }, | |
| Ok(AsyncSink::NotReady(_)) => { | |
| println!("Sink - Not ready"); | |
| return Ok(Async::NotReady) | |
| }, | |
| Err(e) => { | |
| println!("Sink - Err {:?}", e); | |
| return Err(()); | |
| } | |
| } | |
| }, | |
| State::Processing => { | |
| let x = self.writer.poll_complete().map_err(|_| () ); | |
| println!("Processing {:?}", x); | |
| if let Ok(x) = x { | |
| match x { | |
| Async::Ready(_) => return Ok(Async::Ready(())), | |
| Async::NotReady => return Ok(Async::NotReady), | |
| _ => unreachable!(), | |
| } | |
| } else { | |
| println!("Error on poll_complete"); | |
| return Err(()); | |
| } | |
| }, | |
| _ => unimplemented!() | |
| } | |
| } | |
| } | |
| } | |
| fn main() { | |
| // Create the event loop that will drive this server | |
| let mut core = Core::new().unwrap(); | |
| let handle = core.handle(); | |
| let x : Vec<u64> = vec![1,2,3,4,5,6,7,8,9]; | |
| let mut bunch = FuturesUnordered::<_>::new(); | |
| let (mut writer, mut reader) = futures::sync::mpsc::unbounded::<u64>(); | |
| // let r_fut = reader.for_each(|x| { println!("{:?}", x); future::ok(x) }).map(|_| ()).map_err(|_| ()); | |
| x.into_iter().for_each(|item| { | |
| let mut writer = writer.clone(); | |
| let mut item = item.clone(); | |
| println!("{:?}", item); | |
| let f = Timeout::new(Duration::new(1 + item,0), &handle).unwrap() | |
| .map_err(|_| ()) | |
| .and_then(move |_| { Sinker::new(item, writer ) } ) | |
| .and_then(|x| { println!("item is done"); Ok(()) } ); | |
| bunch.push(f); | |
| }); | |
| // bunch.push(r_fut.into().map_err(|_| ())); | |
| println!("futures {:?}", bunch.len()); | |
| core.run(bunch.into_future()); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment