Forked from isaacabraham/1 - StorageQueueMailboxProcessor.fs
Last active
August 29, 2015 14:11
-
-
Save jeremiahredekop/e64fa14956d37130f4c9 to your computer and use it in GitHub Desktop.
Revisions
-
isaacabraham revised this gist
Sep 8, 2014 . 1 changed file with 7 additions and 6 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -3,7 +3,6 @@ #r @"WindowsAzure.Storage.4.1.0\lib\net40\Microsoft.WindowsAzure.Storage.dll" #load "StorageQueueAgent.fs" type Person = { Name : string; Age : int } open AzureMailboxProcessor @@ -17,24 +16,26 @@ let subscriber = new AzureStorageQueueAgent<Person>(fun mb -> let rec loop() = async { let! person, channel = mb.Receive() // process the message and get the result let status = match person with | { Name = "Isaac"; Age = _ } | { Name = "Richard"; Age = _ } -> Completed | { Name = "Andy"; Age = _ } -> Update { person with Name = person.Name + "xyz" } | _ -> Failed // reply with the result - gets adapted by the StorageQueueAgent into the appropriate Azure Storage Queue behaviour channel.Reply status return! loop() } loop()) subscriber.Start() /// Magic to bind the reader to the storage queue subscriber |> bindToQueue(queueDetails, { PollTime = TimeSpan.FromSeconds 10.; LeaseLength = Some <| TimeSpan.FromSeconds 5.; MaxDequeueCount = Some 3 }) /// Writer - Puts messages onto the storage queue (this would e.g. take place on another machine) let writer = queueDetails |> createQueueWriter<Person> writer.Start() writer.Post { Name = "Isaac"; Age = 34 } writer.Post { Name = "Andy"; Age = 32 } writer.Post { Name = "Richard"; Age = 39 } writer.Post { Name = "Joe Bloggs"; Age = 35 } -
isaacabraham revised this gist
Sep 8, 2014 . 1 changed file with 16 additions and 15 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,9 +2,15 @@ module AzureMailboxProcessor open System module private Async = let AwaitTaskEmpty = Async.AwaitIAsyncResult >> Async.Ignore module private Option = let fromNullable (nullable:Nullable<_>) = if nullable.HasValue then Some nullable.Value else None let toNullable = function | Some value -> Nullable value | None -> Nullable() open Microsoft.WindowsAzure.Storage open Microsoft.WindowsAzure.Storage.Queue open Newtonsoft.Json @@ -30,14 +36,9 @@ module AzureMailboxProcessor | Completed /// The message was not processed successfully and should be returned to the queue for processing again. | Failed /// Replace the original queue message with a new payload. | Update of UpdatedPayload : 'a /// Contains details on the queue subscription. type QueueSubscriptionOptions = { /// How long to wait between polling requests. @@ -47,15 +48,18 @@ module AzureMailboxProcessor /// How many times a message can be dequeued before being permanently removed. MaxDequeueCount : int option } let private completeMessage message (queue:CloudQueue) = queue.DeleteMessageAsync message |> Async.AwaitTaskEmpty /// Represents an F# Agent that can be bound to an Azure storage queue. type AzureStorageQueueAgent<'a> = MailboxProcessor<'a * AsyncReplyChannel<MessageProcessedStatus<'a>>> /// Binds a MailboxProcessor to an Azure storage queue. let bindToQueue<'a>((connectionString, queueName), options) (agent:AzureStorageQueueAgent<'a>) = let queue = getQueue(connectionString, queueName) async { while true do let! message = queue.GetMessageAsync(options.LeaseLength |> Option.toNullable, null, null) |> Async.AwaitTask match message with | null-> do! Async.Sleep(options.PollTime.TotalMilliseconds |> int) @@ -78,7 +82,4 @@ module AzureMailboxProcessor do! queue |> completeMessage message do! queue.AddMessageAsync(CloudQueueMessage(payload |> JsonConvert.SerializeObject)) |> Async.AwaitTaskEmpty with ex -> printfn "ARGH %s" ex.Message } |> Async.Start -
isaacabraham revised this gist
Sep 8, 2014 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -30,7 +30,7 @@ module AzureMailboxProcessor | Completed /// The message was not processed successfully and should be returned to the queue for processing again. | Failed /// Replace the original message with this new payload. | Update of UpdatedPayload : 'a let toOption (nullable:Nullable<_>) = if nullable.HasValue then Some nullable.Value else None -
isaacabraham revised this gist
Sep 8, 2014 . 1 changed file with 4 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -34,7 +34,7 @@ subscriber |> bindToQueue(queueDetails, { PollTime = TimeSpan.FromSeconds 10.; L /// Writer - Puts messages onto the storage queue (this would e.g. take place on another machine) let writer = queueDetails |> createQueueWriter<Person> writer.Start() writer.Post { Name = "Isaac"; Age = 34 } // this message will be successfully completed writer.Post { Name = "Andy"; Age = 32 } // this message will be updated as "Andyxyz" writer.Post { Name = "Richard"; Age = 39 } // this message will be successfully completed writer.Post { Name = "Joe Bloggs"; Age = 35 } // this message will fail 3x before being automatically removed from the queue -
isaacabraham revised this gist
Sep 8, 2014 . 2 changed files with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes.File renamed without changes. -
isaacabraham revised this gist
Sep 8, 2014 . 1 changed file with 40 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,40 @@ #I @"..\packages\" #r @"Newtonsoft.Json.6.0.3\lib\net45\Newtonsoft.Json.dll" #r @"WindowsAzure.Storage.4.1.0\lib\net40\Microsoft.WindowsAzure.Storage.dll" #load "StorageQueueAgent.fs" // Create a sample DTO type Person = { Name : string; Age : int } open AzureMailboxProcessor open System // Azure Storage Queue details let queueDetails = ("UseDevelopmentStorage=true", "sample-queue") // Reader let subscriber = new AzureStorageQueueAgent<Person>(fun mb -> let rec loop() = async { let! person, channel = mb.Receive() let status = match person with | { Name = "Isaac"; Age = _ } | { Name = "Richard"; Age = _ } -> Completed | { Name = "Andy"; Age = _ } -> Update { person with Name = person.Name + "xyz" } | _ -> Failed channel.Reply status return! loop() } loop()) /// Magic to bind the reader to the storage queue subscriber.Start() subscriber |> bindToQueue(queueDetails, { PollTime = TimeSpan.FromSeconds 10.; LeaseLength = Some <| TimeSpan.FromSeconds 5.; MaxDequeueCount = Some 3 }) /// Writer - Puts messages onto the storage queue (this would e.g. take place on another machine) let writer = queueDetails |> createQueueWriter<Person> writer.Start() writer.Post { Name = "Isaac"; Age = 34 } writer.Post { Name = "Andy"; Age = 32 } writer.Post { Name = "Richard"; Age = 39 } writer.Post { Name = "Joe Bloggs"; Age = 35 } -
isaacabraham revised this gist
Sep 8, 2014 . 1 changed file with 58 additions and 45 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,13 +1,14 @@ /// Code to bind mailbox processors to azure storage queues. module AzureMailboxProcessor open System module Async = let AwaitTaskEmpty = Async.AwaitIAsyncResult >> Async.Ignore open Microsoft.WindowsAzure.Storage open Microsoft.WindowsAzure.Storage.Queue open Newtonsoft.Json /// Gets a handle to an Azure storage queue let private getQueue (connectionString, queueName) = let connection = CloudStorageAccount.Parse connectionString @@ -23,49 +24,61 @@ module AzureMailboxProcessor = let message = CloudQueueMessage(message |> JsonConvert.SerializeObject) do! queue.AddMessageAsync(message) |> Async.AwaitTaskEmpty }) // The different completion statuses a message can have. type MessageProcessedStatus<'a> = /// The message successfully completed. | Completed /// The message was not processed successfully and should be returned to the queue for processing again. | Failed /// Replace this method with a new payload. | Update of UpdatedPayload : 'a let toOption (nullable:Nullable<_>) = if nullable.HasValue then Some nullable.Value else None let toNullable = function | Some value -> Nullable value | None -> Nullable() /// Contains details on the queue subscription. type QueueSubscriptionOptions = { /// How long to wait between polling requests. PollTime : TimeSpan /// The lease time for a queue message. LeaseLength : TimeSpan option /// How many times a message can be dequeued before being permanently removed. MaxDequeueCount : int option } let completeMessage message (queue:CloudQueue) = queue.DeleteMessageAsync message |> Async.AwaitTaskEmpty /// Binds a MailboxProcessor to an Azure storage queue. let bindToQueue<'a>((connectionString, queueName), options) (agent:MailboxProcessor<'a * AsyncReplyChannel<MessageProcessedStatus<'a>>>) = let queue = getQueue(connectionString, queueName) async { while true do let! message = queue.GetMessageAsync(options.LeaseLength |> toNullable, null, null) |> Async.AwaitTask match message with | null-> do! Async.Sleep(options.PollTime.TotalMilliseconds |> int) | message -> match message.DequeueCount, options.MaxDequeueCount with | count, Some limit when count >= limit -> // Message dequeue count exceeded, just delete the message do! queue |> completeMessage message | _ -> try let timeout = match options.LeaseLength with | Some lifetime -> lifetime.TotalMilliseconds |> int | None -> Threading.Timeout.Infinite let status = agent.TryPostAndReply((fun ch -> JsonConvert.DeserializeObject<'a>(message.AsString), ch), timeout) match status with | None // there was no reply from the agent within specified time; lease has expired. | Some Failed -> () // message has failed to process; do not complete the message. | Some Completed -> do! queue |> completeMessage message | Some (Update payload) -> do! queue |> completeMessage message do! queue.AddMessageAsync(CloudQueueMessage(payload |> JsonConvert.SerializeObject)) |> Async.AwaitTaskEmpty with ex -> printfn "ARGH %s" ex.Message } |> Async.Start /// Represents an F# Agent that can be bound to an Azure storage queue. type AzureStorageQueueAgent<'a> = MailboxProcessor<'a * AsyncReplyChannel<MessageProcessedStatus<'a>>> -
isaacabraham revised this gist
Jul 20, 2014 . 1 changed file with 10 additions and 5 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -5,6 +5,9 @@ open System /// Code to bind mailbox processors to azure storage queues. module AzureMailboxProcessor = module Async = let AwaitTaskEmpty = Async.AwaitIAsyncResult >> Async.Ignore /// Gets a handle to an Azure storage queue let private getQueue (connectionString, queueName) = let connection = CloudStorageAccount.Parse connectionString @@ -18,7 +21,7 @@ module AzureMailboxProcessor = while true do let! message = mb.Receive() let message = CloudQueueMessage(message |> JsonConvert.SerializeObject) do! queue.AddMessageAsync(message) |> Async.AwaitTaskEmpty }) [<Measure>] type Seconds @@ -31,11 +34,13 @@ module AzureMailboxProcessor = while true do let! message = getMessage() match message with | null -> printfn "no message, sleeping for %A seconds..." pollTime do! Async.Sleep((pollTime / 1<Seconds>) * 1000) | message -> printfn "got a message, deserializing and posting..." JsonConvert.DeserializeObject<'a>(message.AsString) |> agent.Post do! queue.DeleteMessageAsync(message) |> Async.AwaitTaskEmpty } |> Async.Start /// Here's a DTO of ours - shared between reader and writer -
isaacabraham revised this gist
Jul 20, 2014 . 1 changed file with 5 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -51,14 +51,16 @@ let subscriber = new MailboxProcessor<Person>(fun mb -> async { }) subscriber.Start() open AzureMailboxProcessor /// Magic to bind the agent to a storage queue subscriber |> bindToQueue("UseDevelopmentStorage=true", "sample-queue") 10<Seconds> /// Writer - Puts messages onto the storage queue (this would e.g. take place on another machine) let personWriter = createQueueWriter<Person>("UseDevelopmentStorage=true", "sample-queue") personWriter.Start() /// Start posting messages! personWriter.Post { Name = "Isaac"; Age = 34 } -
isaacabraham created this gist
Jul 20, 2014 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,64 @@ open Microsoft.WindowsAzure.Storage open Microsoft.WindowsAzure.Storage.Queue open Newtonsoft.Json open System /// Code to bind mailbox processors to azure storage queues. module AzureMailboxProcessor = /// Gets a handle to an Azure storage queue let private getQueue (connectionString, queueName) = let connection = CloudStorageAccount.Parse connectionString let queueClient = connection.CreateCloudQueueClient() queueClient.GetQueueReference queueName /// Creates a mailbox processor that writes to an Azure storage queue let createQueueWriter<'a> (connectionString, queueName) = let queue = getQueue(connectionString, queueName) new MailboxProcessor<'a>(fun mb -> async { while true do let! message = mb.Receive() let message = CloudQueueMessage(message |> JsonConvert.SerializeObject) do! (queue.AddMessageAsync(message) |> Async.AwaitIAsyncResult |> Async.Ignore) }) [<Measure>] type Seconds /// Binds a MailboxProcessor to an Azure storage queue. let bindToQueue<'a> (connectionString, queueName) pollTime (agent:MailboxProcessor<'a>) = async { let queue = getQueue(connectionString, queueName) let getMessage() = queue.GetMessageAsync() |> Async.AwaitTask while true do let! message = getMessage() match message with | null -> printfn "no message!" do! Async.Sleep((pollTime / 1<Seconds>) * 1000) | message -> JsonConvert.DeserializeObject<'a>(message.AsString) |> agent.Post } |> Async.Start /// Here's a DTO of ours - shared between reader and writer type Person = { Name : string Age : int } /// Reader - listens for person messages and print out the details... let subscriber = new MailboxProcessor<Person>(fun mb -> async { while true do let! message = mb.Receive() printfn "Person %s is %d years old." message.Name message.Age }) subscriber.Start() /// Magic to bind the agent to a storage queue subscriber |> AzureMailboxProcessor.bindToQueue("UseDevelopmentStorage=true", "sample-queue") 10<AzureMailboxProcessor.Seconds> /// Writer - Puts messages onto the storage queue (this would e.g. take place on another machine) let personWriter = AzureMailboxProcessor.createQueueWriter<Person>("UseDevelopmentStorage=true", "sample-queue") personWriter.Start() /// Start posting messages! personWriter.Post { Name = "Isaac"; Age = 34 }