Skip to content

Instantly share code, notes, and snippets.

@WhiteGrouse
Last active August 4, 2020 19:43
Show Gist options
  • Select an option

  • Save WhiteGrouse/cbb5b6af8be6690762f45949b41025f7 to your computer and use it in GitHub Desktop.

Select an option

Save WhiteGrouse/cbb5b6af8be6690762f45949b41025f7 to your computer and use it in GitHub Desktop.
use async_std::sync::{Arc, Weak, Mutex};
use async_std::task::{self, JoinHandle};
use futures::future::{abortable, Aborted, AbortHandle};
use std::time::Duration;
fn main() {
task::block_on(async {
let _peer = Peer::new();
});
}
struct Peer {
service_socket: Arc<SocketService>,
service_ping: Arc<PingService>,
service_open_connection: Arc<OpenConnectionService>,
}
impl Peer {
async fn new() -> Peer {
let service_socket = Arc::new(SocketService::new());
let service_ping = Arc::new(PingService::new());
let service_open_connection = Arc::new(OpenConnectionService::new());
service_socket.set_services(Arc::downgrade(&service_ping), Arc::downgrade(&service_open_connection)).await;
service_ping.set_services(Arc::downgrade(&service_socket)).await;
service_open_connection.set_services(Arc::downgrade(&service_socket), Arc::downgrade(&service_ping)).await;
Peer {
service_socket,
service_ping,
service_open_connection,
}
}
}
struct SocketService {
inner: Arc<SocketServiceInner>,
task: Mutex<Option<(JoinHandle<Result<(), Aborted>>, AbortHandle)>>,
}
impl SocketService {
fn new() -> SocketService {
let inner = Arc::new(SocketServiceInner::new());
SocketService {
inner: inner,
task: Mutex::new(None),
}
}
async fn set_services(&self, ping: Weak<PingService>, open_connection: Weak<OpenConnectionService>) {
if let Some(mut services) = self.inner.services.try_lock() {
*services = (ping, open_connection);
}
else {
panic!("The service cannot be set because the task has already been started.");
}
let (ft, abort_handle) = abortable(self.inner.clone().run());
let mut task = self.task.lock().await;
let _ = task.replace((task::spawn(ft), abort_handle));
}
}
impl Drop for SocketService {
fn drop(&mut self) {
let mut task = task::block_on(self.task.lock());
if let Some((join, abort)) = task.take() {
abort.abort();
let _ = task::block_on(join);
}
}
}
struct SocketServiceInner {
services: Mutex<(Weak<PingService>, Weak<OpenConnectionService>)>,
}
impl SocketServiceInner {
fn new() -> SocketServiceInner {
SocketServiceInner {
services: Mutex::new((Weak::new(), Weak::new())),
}
}
async fn run(self: Arc<SocketServiceInner>) {
let services = self.services.lock().await;
loop {
task::sleep(Duration::from_secs(1)).await;
}
}
}
struct PingService {
inner: Arc<PingServiceInner>,
task: Mutex<Option<(JoinHandle<Result<(), Aborted>>, AbortHandle)>>,
}
impl PingService {
fn new() -> PingService {
let inner = Arc::new(PingServiceInner::new());
PingService {
inner: inner,
task: Mutex::new(None),
}
}
async fn set_services(&self, socket: Weak<SocketService>) {
if let Some(mut service_socket) = self.inner.service_socket.try_lock() {
*service_socket = socket;
}
else {
panic!("The service cannot be set because the task has already been started.");
}
let (ft, abort_handle) = abortable(self.inner.clone().run());
let mut task = self.task.lock().await;
let _ = task.replace((task::spawn(ft), abort_handle));
}
}
impl Drop for PingService {
fn drop(&mut self) {
let mut task = task::block_on(self.task.lock());
if let Some((join, abort)) = task.take() {
abort.abort();
let _ = task::block_on(join);
}
}
}
struct PingServiceInner {
service_socket: Mutex<Weak<SocketService>>,
}
impl PingServiceInner {
fn new() -> PingServiceInner {
PingServiceInner {
service_socket: Mutex::new(Weak::new()),
}
}
async fn run(self: Arc<PingServiceInner>) {
let service_socket = self.service_socket.lock().await;
loop {
task::sleep(Duration::from_secs(1)).await;
}
}
}
struct OpenConnectionService {
inner: Arc<OpenConnectionServiceInner>,
task: Mutex<Option<(JoinHandle<Result<(), Aborted>>, AbortHandle)>>,
}
impl OpenConnectionService {
fn new() -> OpenConnectionService {
let inner = Arc::new(OpenConnectionServiceInner::new());
OpenConnectionService {
inner: inner,
task: Mutex::new(None),
}
}
async fn set_services(&self, socket: Weak<SocketService>, ping: Weak<PingService>) {
if let Some(mut services) = self.inner.services.try_lock() {
*services = (socket, ping);
}
else {
panic!("The service cannot be set because the task has already been started.");
}
let (ft, abort_handle) = abortable(self.inner.clone().run());
let mut task = self.task.lock().await;
let _ = task.replace((task::spawn(ft), abort_handle));
}
}
impl Drop for OpenConnectionService {
fn drop(&mut self) {
let mut task = task::block_on(self.task.lock());
if let Some((join, abort)) = task.take() {
abort.abort();
let _ = task::block_on(join);
}
}
}
struct OpenConnectionServiceInner {
services: Mutex<(Weak<SocketService>, Weak<PingService>)>,
}
impl OpenConnectionServiceInner {
fn new() -> OpenConnectionServiceInner {
OpenConnectionServiceInner {
services: Mutex::new((Weak::new(), Weak::new())),
}
}
async fn run(self: Arc<OpenConnectionServiceInner>) {
let services = self.services.lock().await;
loop {
task::sleep(Duration::from_secs(1)).await;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment