Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save jeremiahredekop/e64fa14956d37130f4c9 to your computer and use it in GitHub Desktop.

Select an option

Save jeremiahredekop/e64fa14956d37130f4c9 to your computer and use it in GitHub Desktop.
open Microsoft.WindowsAzure.Storage
open Microsoft.WindowsAzure.Storage.Queue
open Newtonsoft.Json
open System
/// Code to bind mailbox processors to Azure storage queues.
module AzureMailboxProcessor =

    module Async =
        /// Awaits a result-less Task (or any other IAsyncResult) and yields unit.
        ///
        /// Async.AwaitIAsyncResult only waits for completion to be signalled; it
        /// never looks at the outcome. Used on its own, a faulted or cancelled Task
        /// would therefore finish "successfully" and the failure would be lost.
        /// After waiting, the outcome of a Task is checked and re-raised so that a
        /// failed queue operation is not mistaken for a successful one.
        let AwaitTaskEmpty (result : IAsyncResult) =
            async {
                do! Async.AwaitIAsyncResult result |> Async.Ignore
                match result with
                | :? System.Threading.Tasks.Task as task when task.IsFaulted ->
                    // Surface the task's AggregateException to the caller.
                    raise task.Exception
                | :? System.Threading.Tasks.Task as task when task.IsCanceled ->
                    raise (System.Threading.Tasks.TaskCanceledException task)
                | _ -> ()
            }

    /// Gets a handle to an Azure storage queue.
    /// Parses the connection string, creates a queue client from the account and
    /// returns a reference to the named queue.
    let private getQueue (connectionString, queueName) =
        let connection = CloudStorageAccount.Parse connectionString
        let queueClient = connection.CreateCloudQueueClient()
        queueClient.GetQueueReference queueName

    /// Creates a mailbox processor that writes to an Azure storage queue.
    ///
    /// Every message posted to the returned agent is serialized to JSON and added
    /// to the queue. The agent is returned unstarted; the caller must call Start().
    /// A failed add is not caught here, so it ends the agent's loop.
    let createQueueWriter<'a> (connectionString, queueName) =
        let queue = getQueue(connectionString, queueName)
        new MailboxProcessor<'a>(fun mb -> async {
            while true do
                let! message = mb.Receive()
                let queueMessage = CloudQueueMessage(message |> JsonConvert.SerializeObject)
                do! queue.AddMessageAsync(queueMessage) |> Async.AwaitTaskEmpty })

    /// Unit of measure for the polling interval accepted by bindToQueue.
    [<Measure>]
    type Seconds

    /// Binds a MailboxProcessor to an Azure storage queue.
    ///
    /// Starts a background loop (via Async.Start) that repeatedly fetches a message
    /// from the queue. When the queue yields nothing (null) the loop sleeps for
    /// pollTime seconds; otherwise the message body is deserialized from JSON into
    /// 'a, posted to the agent, and then deleted from the queue.
    /// Failures while fetching, deserializing or deleting are not caught here.
    let bindToQueue<'a> (connectionString, queueName) pollTime (agent:MailboxProcessor<'a>) =
        async {
            let queue = getQueue(connectionString, queueName)
            let getMessage() = queue.GetMessageAsync() |> Async.AwaitTask
            while true do
                let! message = getMessage()
                match message with
                | null ->
                    printfn "no message, sleeping for %A seconds..." pollTime
                    // Strip the unit of measure and convert to milliseconds.
                    do! Async.Sleep((pollTime / 1<Seconds>) * 1000)
                | message ->
                    printfn "got a message, deserializing and posting..."
                    JsonConvert.DeserializeObject<'a>(message.AsString) |> agent.Post
                    // Delete only after the message has been handed to the agent.
                    do! queue.DeleteMessageAsync(message) |> Async.AwaitTaskEmpty
        } |> Async.Start
/// Sample DTO shared between the queue reader and the queue writer.
type Person =
    {
        /// The person's name.
        Name : string
        /// The person's age.
        Age : int
    }
/// Reader - an already-started agent that receives Person messages and
/// prints each one's details.
let subscriber =
    MailboxProcessor<Person>.Start(fun inbox ->
        // Receive one message, print it, then wait for the next, forever.
        let rec loop () =
            async {
                let! person = inbox.Receive()
                printfn "Person %s is %d years old." person.Name person.Age
                return! loop ()
            }
        loop ())
open AzureMailboxProcessor
// Magic to bind the agent to a storage queue: messages arriving on
// "sample-queue" are posted to subscriber, with a 10 second sleep whenever
// the queue is empty.
bindToQueue ("UseDevelopmentStorage=true", "sample-queue") 10<Seconds> subscriber
/// Writer - puts messages onto the storage queue (this would e.g. take place
/// on another machine).
let personWriter =
    ("UseDevelopmentStorage=true", "sample-queue")
    |> createQueueWriter<Person>

// The writer agent is created unstarted, so start it before posting.
personWriter.Start()

// Start posting messages!
{ Name = "Isaac"; Age = 34 } |> personWriter.Post
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment