Skip to content

Instantly share code, notes, and snippets.

@c-spencer
Created July 26, 2019 09:08
Show Gist options
  • Select an option

  • Save c-spencer/4b5312411c7d0a0d07c4867b78df54d1 to your computer and use it in GitHub Desktop.

Select an option

Save c-spencer/4b5312411c7d0a0d07c4867b78df54d1 to your computer and use it in GitHub Desktop.

Revisions

  1. c-spencer created this gist Jul 26, 2019.
    60 changes: 60 additions & 0 deletions access_token.rs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,60 @@
    pub fn access_token() -> BoxedFilter<(String,)> {
    let (set_token_value, token_value) =
    tokio_sync::watch::channel::<Option<gcloud::GrantedToken>>(None);
    let channel_lock = tokio_sync::lock::Lock::new(set_token_value);

    warp::any()
    .and_then(move || {
    let current_value = token_value.get_ref().clone().filter(|t| t.is_valid());

    let result = match current_value {
    Some(token) => Either::A(ok(token.into_token())),
    None => {
    log::info!("Token missing value.");

    // Clone channels for inner closures.
    let inner_token_value = token_value.clone();
    let mut lock = channel_lock.clone();

    Either::B(
    // Return a future to a future of the token value.
    // Nesting the futures seems necessary, or at least cleanest, in order
    // to have the guard on the lock hold while that future executes.
    poll_fn(move || match lock.poll_lock() {
    // Acquire a lock on the channel
    futures::Async::Ready(mut guard) => {
    let cv = inner_token_value.get_ref().clone().filter(|t| t.is_valid());

    Ok(Async::Ready(match cv {
    // If the token already exists, just return it.
    Some(token) => {
    log::info!("Token had value when lock acquired");
    Either::A(ok(token.into_token()))
    }

    // Otherwise run get_token, send a copy of that token into the channel,
    // and return the new token value.
    _ => Either::B(gcloud::get_access_token().map_err(log_error).and_then(
    move |token| {
    // Put the value onto the channel.
    (*guard).broadcast(Some(token.clone())).map_err(log_error)?;

    log::info!("Completed token refresh.");
    Ok(token.into_token())
    },
    )),
    }))
    }

    Async::NotReady => Ok(Async::NotReady),
    })
    // Unwrap the inner future
    .and_then(|v| v),
    )
    }
    };

    result.map_err(warp::reject::custom)
    })
    .boxed()
    }