Skip to content

Instantly share code, notes, and snippets.

@flip111
Created March 22, 2020 18:19
Show Gist options
  • Select an option

  • Save flip111/697e6fa000dba286368c1cd95931de9d to your computer and use it in GitHub Desktop.

Select an option

Save flip111/697e6fa000dba286368c1cd95931de9d to your computer and use it in GitHub Desktop.

Revisions

  1. flip111 created this gist Mar 22, 2020.
    147 changes: 147 additions & 0 deletions main.rs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,147 @@
    // Combination of:
    // https://github.com/actix/examples/tree/master/juniper
    // https://github.com/actix/examples/tree/master/websocket

    mod schema;
    use crate::schema::{create_schema, Schema};

    use actix::prelude::*;
    use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer};
    use actix_web_actors::ws;
    use bytes::{BytesMut};
    use juniper::{Variables};
    use std::sync::Arc;
    use std::time::{Duration, Instant};


    /// How often heartbeat pings are sent
    const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
    /// How long before lack of client response causes a timeout
    const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

    /// do websocket handshake and start `MyWebSocket` actor
    async fn ws_index(r: HttpRequest, stream: web::Payload, graphql_schema: web::Data<Arc<Schema>>) -> Result<HttpResponse, Error> {
    println!("{:?}", r);
    let res = ws::start(MyWebSocket::new(graphql_schema), &r, stream);
    println!("{:?}", res);
    res
    }

    /// websocket connection is long running connection, it easier
    /// to handle with an actor
    struct MyWebSocket {
    /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
    /// otherwise we drop connection.
    hb: Instant,
    graphql_schema: web::Data<Arc<Schema>>
    }


    impl MyWebSocket {
    fn new(graphql_schema: web::Data<Arc<Schema>>) -> Self {
    Self {
    hb: Instant::now(),
    graphql_schema: graphql_schema
    }
    }

    /// helper method that sends ping to client every second.
    ///
    /// also this method checks heartbeats from client
    fn hb(&self, ctx: &mut <Self as Actor>::Context) {
    ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
    // check client heartbeats
    if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
    // heartbeat timed out
    println!("Websocket Client heartbeat failed, disconnecting!");

    // stop actor
    ctx.stop();

    // don't try to send a ping
    return;
    }

    ctx.ping(b"");
    });
    }
    }

    impl Actor for MyWebSocket {
    type Context = ws::WebsocketContext<Self>;

    /// Method is called on actor start. We start the heartbeat process here.
    fn started(&mut self, ctx: &mut Self::Context) {
    self.hb(ctx);
    }
    }

    /// Handler for `ws::Message`
    impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket {
    fn handle(
    &mut self,
    msg: Result<ws::Message, ws::ProtocolError>,
    ctx: &mut Self::Context,
    ) {
    // process websocket messages
    println!("WS: {:?}", msg);
    match msg {
    Ok(ws::Message::Ping(msg)) => {
    self.hb = Instant::now();
    ctx.pong(&msg);
    }
    Ok(ws::Message::Pong(_)) => {
    self.hb = Instant::now();
    }
    Ok(ws::Message::Text(text)) => {
    let (res, _errors) = juniper::execute(
    &text,
    None,
    &self.graphql_schema,
    &Variables::new(),
    &(),
    ).unwrap();

    let user = serde_json::to_string(&res);

    // match user
    ctx.text("text: ".to_string() + &user.unwrap_or("json decode error".to_string()))
    }
    Ok(ws::Message::Binary(bin)) => {
    let mut bytes = BytesMut::new();
    bytes.extend_from_slice(b"binary: ");
    bytes.extend_from_slice(&bin);
    ctx.binary(bytes)
    }
    Ok(ws::Message::Close(_)) => {
    ctx.stop();
    }
    _ => ctx.stop(),
    }
    }
    }

    #[actix_rt::main]
    async fn main() -> std::io::Result<()> {
    std::env::set_var("RUST_LOG", "actix_server=info,actix_web=info");
    env_logger::init();

    // Create Juniper schema
    let schema = std::sync::Arc::new(create_schema());

    HttpServer::new(move || {
    App::new()
    // ReaderT pattern for context ..
    .data(schema.clone())
    // enable logger
    .wrap(middleware::Logger::default())
    // websocket route
    .service(web::resource("/ws/").route(web::get().to(ws_index)))
    // static files
    // .service(fs::Files::new("/", "static/").index_file("index.html"))
    })
    // start http server on 127.0.0.1:8080
    .bind("127.0.0.1:8080")?
    .run()
    .await
    }