Created
March 22, 2020 18:19
-
-
Save flip111/697e6fa000dba286368c1cd95931de9d to your computer and use it in GitHub Desktop.
Revisions
-
flip111 created this gist
Mar 22, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,147 @@ // Combination of: // https://github.com/actix/examples/tree/master/juniper // https://github.com/actix/examples/tree/master/websocket mod schema; use crate::schema::{create_schema, Schema}; use actix::prelude::*; use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer}; use actix_web_actors::ws; use bytes::{BytesMut}; use juniper::{Variables}; use std::sync::Arc; use std::time::{Duration, Instant}; /// How often heartbeat pings are sent const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); /// How long before lack of client response causes a timeout const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); /// do websocket handshake and start `MyWebSocket` actor async fn ws_index(r: HttpRequest, stream: web::Payload, graphql_schema: web::Data<Arc<Schema>>) -> Result<HttpResponse, Error> { println!("{:?}", r); let res = ws::start(MyWebSocket::new(graphql_schema), &r, stream); println!("{:?}", res); res } /// websocket connection is long running connection, it easier /// to handle with an actor struct MyWebSocket { /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), /// otherwise we drop connection. hb: Instant, graphql_schema: web::Data<Arc<Schema>> } impl MyWebSocket { fn new(graphql_schema: web::Data<Arc<Schema>>) -> Self { Self { hb: Instant::now(), graphql_schema: graphql_schema } } /// helper method that sends ping to client every second. /// /// also this method checks heartbeats from client fn hb(&self, ctx: &mut <Self as Actor>::Context) { ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { // check client heartbeats if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { // heartbeat timed out println!("Websocket Client heartbeat failed, disconnecting!"); // stop actor ctx.stop(); // don't try to send a ping return; } ctx.ping(b""); }); } } impl Actor for MyWebSocket { type Context = ws::WebsocketContext<Self>; /// Method is called on actor start. We start the heartbeat process here. fn started(&mut self, ctx: &mut Self::Context) { self.hb(ctx); } } /// Handler for `ws::Message` impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket { fn handle( &mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context, ) { // process websocket messages println!("WS: {:?}", msg); match msg { Ok(ws::Message::Ping(msg)) => { self.hb = Instant::now(); ctx.pong(&msg); } Ok(ws::Message::Pong(_)) => { self.hb = Instant::now(); } Ok(ws::Message::Text(text)) => { let (res, _errors) = juniper::execute( &text, None, &self.graphql_schema, &Variables::new(), &(), ).unwrap(); let user = serde_json::to_string(&res); // match user ctx.text("text: ".to_string() + &user.unwrap_or("json decode error".to_string())) } Ok(ws::Message::Binary(bin)) => { let mut bytes = BytesMut::new(); bytes.extend_from_slice(b"binary: "); bytes.extend_from_slice(&bin); ctx.binary(bytes) } Ok(ws::Message::Close(_)) => { ctx.stop(); } _ => ctx.stop(), } } } #[actix_rt::main] async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "actix_server=info,actix_web=info"); env_logger::init(); // Create Juniper schema let schema = std::sync::Arc::new(create_schema()); HttpServer::new(move || { App::new() // ReaderT pattern for context .. .data(schema.clone()) // enable logger .wrap(middleware::Logger::default()) // websocket route .service(web::resource("/ws/").route(web::get().to(ws_index))) // static files // .service(fs::Files::new("/", "static/").index_file("index.html")) }) // start http server on 127.0.0.1:8080 .bind("127.0.0.1:8080")? .run() .await }