Forked from isaacabraham/1 - StorageQueueMailboxProcessor.fs
Last active
August 29, 2015 14:11
-
-
Save jeremiahredekop/e64fa14956d37130f4c9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| open Microsoft.WindowsAzure.Storage | |
| open Microsoft.WindowsAzure.Storage.Queue | |
| open Newtonsoft.Json | |
| open System | |
| /// Code to bind mailbox processors to azure storage queues. | |
| module AzureMailboxProcessor = | |
| /// Gets a handle to an Azure storage queue | |
| let private getQueue (connectionString, queueName) = | |
| let connection = CloudStorageAccount.Parse connectionString | |
| let queueClient = connection.CreateCloudQueueClient() | |
| queueClient.GetQueueReference queueName | |
| /// Creates a mailbox processor that writes to an Azure storage queue | |
| let createQueueWriter<'a> (connectionString, queueName) = | |
| let queue = getQueue(connectionString, queueName) | |
| new MailboxProcessor<'a>(fun mb -> async { | |
| while true do | |
| let! message = mb.Receive() | |
| let message = CloudQueueMessage(message |> JsonConvert.SerializeObject) | |
| do! (queue.AddMessageAsync(message) |> Async.AwaitIAsyncResult |> Async.Ignore) }) | |
| [<Measure>] | |
| type Seconds | |
| /// Binds a MailboxProcessor to an Azure storage queue. | |
| let bindToQueue<'a> (connectionString, queueName) pollTime (agent:MailboxProcessor<'a>) = | |
| async { | |
| let queue = getQueue(connectionString, queueName) | |
| let getMessage() = queue.GetMessageAsync() |> Async.AwaitTask | |
| while true do | |
| let! message = getMessage() | |
| match message with | |
| | null -> printfn "no message!" | |
| do! Async.Sleep((pollTime / 1<Seconds>) * 1000) | |
| | message -> | |
| JsonConvert.DeserializeObject<'a>(message.AsString) | |
| |> agent.Post | |
| } |> Async.Start | |
| /// Here's a DTO of ours - shared between reader and writer | |
| type Person = | |
| { Name : string | |
| Age : int } | |
| /// Reader - listens for person messages and print out the details... | |
| let subscriber = new MailboxProcessor<Person>(fun mb -> async { | |
| while true do | |
| let! message = mb.Receive() | |
| printfn "Person %s is %d years old." message.Name message.Age | |
| }) | |
| subscriber.Start() | |
| /// Magic to bind the agent to a storage queue | |
| subscriber | |
| |> AzureMailboxProcessor.bindToQueue("UseDevelopmentStorage=true", "sample-queue") 10<AzureMailboxProcessor.Seconds> | |
| /// Writer - Puts messages onto the storage queue (this would e.g. take place on another machine) | |
| let personWriter = AzureMailboxProcessor.createQueueWriter<Person>("UseDevelopmentStorage=true", "sample-queue") | |
| personWriter.Start() | |
| /// Start posting messages! | |
| personWriter.Post { Name = "Isaac"; Age = 34 } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment