Skip to content

Instantly share code, notes, and snippets.

@drahnr
Created January 20, 2018 22:41
Show Gist options
  • Select an option

  • Save drahnr/f455dbeabb4ebcea22f05c41795fb85a to your computer and use it in GitHub Desktop.

Select an option

Save drahnr/f455dbeabb4ebcea22f05c41795fb85a to your computer and use it in GitHub Desktop.
#[macro_use]
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
use futures::{Future, Stream};
use tokio_io::{io, AsyncRead};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tokio_core::reactor::Timeout;
use std::time::Duration;
use futures::future;
use futures::{Sink,AsyncSink};
use futures::{Async,Poll};
use futures::sync::mpsc::{UnboundedSender,UnboundedReceiver,unbounded};
use futures::stream::FuturesUnordered;
type MqttWriter = UnboundedSender<u64>;
type MqttReader = UnboundedReceiver<u64>;
#[derive(Debug)]
enum State {
Init,
Processing,
}
// #[derive(Debug)]
struct Sinker {
state : State,
writer : MqttWriter,
value : u64,
}
impl Sinker {
pub fn new(value : u64, writer : MqttWriter) -> Self {
Self { state : State::Init, writer, value }
}
}
impl Future for Sinker {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match self.state {
State::Init => {
println!("Init");
match self.writer.start_send(self.value) {
Ok(AsyncSink::Ready) => {
self.state = State::Processing;
println!("Sink Ready");
},
Ok(AsyncSink::NotReady(_)) => {
println!("Sink - Not ready");
return Ok(Async::NotReady)
},
Err(e) => {
println!("Sink - Err {:?}", e);
return Err(());
}
}
},
State::Processing => {
let x = self.writer.poll_complete().map_err(|_| () );
println!("Processing {:?}", x);
if let Ok(x) = x {
match x {
Async::Ready(_) => return Ok(Async::Ready(())),
Async::NotReady => return Ok(Async::NotReady),
_ => unreachable!(),
}
} else {
println!("Error on poll_complete");
return Err(());
}
},
_ => unimplemented!()
}
}
}
}
fn main() {
// Create the event loop that will drive this server
let mut core = Core::new().unwrap();
let handle = core.handle();
let x : Vec<u64> = vec![1,2,3,4,5,6,7,8,9];
let mut bunch = FuturesUnordered::<_>::new();
let (mut writer, mut reader) = futures::sync::mpsc::unbounded::<u64>();
// let r_fut = reader.for_each(|x| { println!("{:?}", x); future::ok(x) }).map(|_| ()).map_err(|_| ());
x.into_iter().for_each(|item| {
let mut writer = writer.clone();
let mut item = item.clone();
println!("{:?}", item);
let f = Timeout::new(Duration::new(1 + item,0), &handle).unwrap()
.map_err(|_| ())
.and_then(move |_| { Sinker::new(item, writer ) } )
.and_then(|x| { println!("item is done"); Ok(()) } );
bunch.push(f);
});
// bunch.push(r_fut.into().map_err(|_| ()));
println!("futures {:?}", bunch.len());
core.run(bunch.into_future());
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment