Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save jeremiahredekop/e64fa14956d37130f4c9 to your computer and use it in GitHub Desktop.

Select an option

Save jeremiahredekop/e64fa14956d37130f4c9 to your computer and use it in GitHub Desktop.

Revisions

  1. @isaacabraham isaacabraham revised this gist Sep 8, 2014. 1 changed file with 7 additions and 6 deletions.
    13 changes: 7 additions & 6 deletions 2 - Sample.fs
    Original file line number Diff line number Diff line change
    @@ -3,7 +3,6 @@
    #r @"WindowsAzure.Storage.4.1.0\lib\net40\Microsoft.WindowsAzure.Storage.dll"
    #load "StorageQueueAgent.fs"

    // Create a sample DTO
    type Person = { Name : string; Age : int }

    open AzureMailboxProcessor
    @@ -17,24 +16,26 @@ let subscriber = new AzureStorageQueueAgent<Person>(fun mb ->
    let rec loop() =
    async {
    let! person, channel = mb.Receive()
    // process the message and get the result
    let status = match person with
    | { Name = "Isaac"; Age = _ }
    | { Name = "Richard"; Age = _ } -> Completed
    | { Name = "Andy"; Age = _ } -> Update { person with Name = person.Name + "xyz" }
    | _ -> Failed
    // reply with the result - gets adapted by the StorageQueueAgent into the appropriate Azure Storage Queue behaviour
    channel.Reply status
    return! loop()
    }
    loop())
    subscriber.Start()

    /// Magic to bind the reader to the storage queue
    subscriber.Start()
    subscriber |> bindToQueue(queueDetails, { PollTime = TimeSpan.FromSeconds 10.; LeaseLength = Some <| TimeSpan.FromSeconds 5.; MaxDequeueCount = Some 3 })

    /// Writer - Puts messages onto the storage queue (this would e.g. take place on another machine)
    let writer = queueDetails |> createQueueWriter<Person>
    writer.Start()
    writer.Post { Name = "Isaac"; Age = 34 } // this message will be successfully completed
    writer.Post { Name = "Andy"; Age = 32 } // this message will be updated as "Andyxyz"
    writer.Post { Name = "Richard"; Age = 39 } // this message will be successfully completed
    writer.Post { Name = "Joe Bloggs"; Age = 35 } // this message will fail 3x before being automatically removed from the queue
    writer.Post { Name = "Isaac"; Age = 34 }
    writer.Post { Name = "Andy"; Age = 32 }
    writer.Post { Name = "Richard"; Age = 39 }
    writer.Post { Name = "Joe Bloggs"; Age = 35 }
  2. @isaacabraham isaacabraham revised this gist Sep 8, 2014. 1 changed file with 16 additions and 15 deletions.
    31 changes: 16 additions & 15 deletions 1 - StorageQueueMailboxProcessor.fs
    Original file line number Diff line number Diff line change
    @@ -2,9 +2,15 @@
    module AzureMailboxProcessor
    open System

    module Async =
    module private Async =
    let AwaitTaskEmpty = Async.AwaitIAsyncResult >> Async.Ignore

    module private Option =
    let fromNullable (nullable:Nullable<_>) = if nullable.HasValue then Some nullable.Value else None
    let toNullable = function
    | Some value -> Nullable value
    | None -> Nullable()

    open Microsoft.WindowsAzure.Storage
    open Microsoft.WindowsAzure.Storage.Queue
    open Newtonsoft.Json
    @@ -30,14 +36,9 @@ module AzureMailboxProcessor
    | Completed
    /// The message was not processed successfully and should be returned to the queue for processing again.
    | Failed
    /// Replace the original message with this new payload.
    /// Replace the original queue message with a new payload.
    | Update of UpdatedPayload : 'a

    let toOption (nullable:Nullable<_>) = if nullable.HasValue then Some nullable.Value else None
    let toNullable = function
    | Some value -> Nullable value
    | None -> Nullable()

    /// Contains details on the queue subscription.
    type QueueSubscriptionOptions =
    { /// How long to wait between polling requests.
    @@ -47,15 +48,18 @@ module AzureMailboxProcessor
    /// How many times a message can be dequeued before being permanently removed.
    MaxDequeueCount : int option }

    let completeMessage message (queue:CloudQueue) =
    queue.DeleteMessageAsync message |> Async.AwaitTaskEmpty
    let private completeMessage message (queue:CloudQueue) =
    queue.DeleteMessageAsync message |> Async.AwaitTaskEmpty

    /// Represents an F# Agent that can be bound to an Azure storage queue.
    type AzureStorageQueueAgent<'a> = MailboxProcessor<'a * AsyncReplyChannel<MessageProcessedStatus<'a>>>

    /// Binds a MailboxProcessor to an Azure storage queue.
    let bindToQueue<'a>((connectionString, queueName), options) (agent:MailboxProcessor<'a * AsyncReplyChannel<MessageProcessedStatus<'a>>>) =
    let bindToQueue<'a>((connectionString, queueName), options) (agent:AzureStorageQueueAgent<'a>) =
    let queue = getQueue(connectionString, queueName)
    async {
    while true do
    let! message = queue.GetMessageAsync(options.LeaseLength |> toNullable, null, null) |> Async.AwaitTask
    let! message = queue.GetMessageAsync(options.LeaseLength |> Option.toNullable, null, null) |> Async.AwaitTask
    match message with
    | null->
    do! Async.Sleep(options.PollTime.TotalMilliseconds |> int)
    @@ -78,7 +82,4 @@ module AzureMailboxProcessor
    do! queue |> completeMessage message
    do! queue.AddMessageAsync(CloudQueueMessage(payload |> JsonConvert.SerializeObject)) |> Async.AwaitTaskEmpty
    with ex -> printfn "ARGH %s" ex.Message
    } |> Async.Start

    /// Represents an F# Agent that can be bound to an Azure storage queue.
    type AzureStorageQueueAgent<'a> = MailboxProcessor<'a * AsyncReplyChannel<MessageProcessedStatus<'a>>>
    } |> Async.Start
  3. @isaacabraham isaacabraham revised this gist Sep 8, 2014. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion 1 - StorageQueueMailboxProcessor.fs
    Original file line number Diff line number Diff line change
    @@ -30,7 +30,7 @@ module AzureMailboxProcessor
    | Completed
    /// The message was not processed successfully and should be returned to the queue for processing again.
    | Failed
    /// Replace this method with a new payload.
    /// Replace the original message with this new payload.
    | Update of UpdatedPayload : 'a

    let toOption (nullable:Nullable<_>) = if nullable.HasValue then Some nullable.Value else None
  4. @isaacabraham isaacabraham revised this gist Sep 8, 2014. 1 changed file with 4 additions and 4 deletions.
    8 changes: 4 additions & 4 deletions 2 - Sample.fs
    Original file line number Diff line number Diff line change
    @@ -34,7 +34,7 @@ subscriber |> bindToQueue(queueDetails, { PollTime = TimeSpan.FromSeconds 10.; L
    /// Writer - Puts messages onto the storage queue (this would e.g. take place on another machine)
    let writer = queueDetails |> createQueueWriter<Person>
    writer.Start()
    writer.Post { Name = "Isaac"; Age = 34 }
    writer.Post { Name = "Andy"; Age = 32 }
    writer.Post { Name = "Richard"; Age = 39 }
    writer.Post { Name = "Joe Bloggs"; Age = 35 }
    writer.Post { Name = "Isaac"; Age = 34 } // this message will be successfully completed
    writer.Post { Name = "Andy"; Age = 32 } // this message will be updated as "Andyxyz"
    writer.Post { Name = "Richard"; Age = 39 } // this message will be successfully completed
    writer.Post { Name = "Joe Bloggs"; Age = 35 } // this message will fail 3x before being automatically removed from the queue
  5. @isaacabraham isaacabraham revised this gist Sep 8, 2014. 2 changed files with 0 additions and 0 deletions.
    File renamed without changes.
  6. @isaacabraham isaacabraham revised this gist Sep 8, 2014. 1 changed file with 40 additions and 0 deletions.
    40 changes: 40 additions & 0 deletions Sample.fs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,40 @@
    #I @"..\packages\"
    #r @"Newtonsoft.Json.6.0.3\lib\net45\Newtonsoft.Json.dll"
    #r @"WindowsAzure.Storage.4.1.0\lib\net40\Microsoft.WindowsAzure.Storage.dll"
    #load "StorageQueueAgent.fs"

    // Create a sample DTO
    type Person = { Name : string; Age : int }

    open AzureMailboxProcessor
    open System

    // Azure Storage Queue details
    let queueDetails = ("UseDevelopmentStorage=true", "sample-queue")

    // Reader
    let subscriber = new AzureStorageQueueAgent<Person>(fun mb ->
    let rec loop() =
    async {
    let! person, channel = mb.Receive()
    let status = match person with
    | { Name = "Isaac"; Age = _ }
    | { Name = "Richard"; Age = _ } -> Completed
    | { Name = "Andy"; Age = _ } -> Update { person with Name = person.Name + "xyz" }
    | _ -> Failed
    channel.Reply status
    return! loop()
    }
    loop())

    /// Magic to bind the reader to the storage queue
    subscriber.Start()
    subscriber |> bindToQueue(queueDetails, { PollTime = TimeSpan.FromSeconds 10.; LeaseLength = Some <| TimeSpan.FromSeconds 5.; MaxDequeueCount = Some 3 })

    /// Writer - Puts messages onto the storage queue (this would e.g. take place on another machine)
    let writer = queueDetails |> createQueueWriter<Person>
    writer.Start()
    writer.Post { Name = "Isaac"; Age = 34 }
    writer.Post { Name = "Andy"; Age = 32 }
    writer.Post { Name = "Richard"; Age = 39 }
    writer.Post { Name = "Joe Bloggs"; Age = 35 }
  7. @isaacabraham isaacabraham revised this gist Sep 8, 2014. 1 changed file with 58 additions and 45 deletions.
    103 changes: 58 additions & 45 deletions StorageQueueMailboxProcessor.fs
    Original file line number Diff line number Diff line change
    @@ -1,13 +1,14 @@
    open Microsoft.WindowsAzure.Storage
    open Microsoft.WindowsAzure.Storage.Queue
    open Newtonsoft.Json
    open System

    /// Code to bind mailbox processors to azure storage queues.
    module AzureMailboxProcessor =
    module AzureMailboxProcessor
    open System

    module Async =
    let AwaitTaskEmpty = Async.AwaitIAsyncResult >> Async.Ignore

    open Microsoft.WindowsAzure.Storage
    open Microsoft.WindowsAzure.Storage.Queue
    open Newtonsoft.Json

    /// Gets a handle to an Azure storage queue
    let private getQueue (connectionString, queueName) =
    let connection = CloudStorageAccount.Parse connectionString
    @@ -23,49 +24,61 @@ module AzureMailboxProcessor =
    let message = CloudQueueMessage(message |> JsonConvert.SerializeObject)
    do! queue.AddMessageAsync(message) |> Async.AwaitTaskEmpty })

    [<Measure>]
    type Seconds
    // The different completion statuses a message can have.
    type MessageProcessedStatus<'a> =
    /// The message successfully completed.
    | Completed
    /// The message was not processed successfully and should be returned to the queue for processing again.
    | Failed
    /// Replace this method with a new payload.
    | Update of UpdatedPayload : 'a

    let toOption (nullable:Nullable<_>) = if nullable.HasValue then Some nullable.Value else None
    let toNullable = function
    | Some value -> Nullable value
    | None -> Nullable()

    /// Contains details on the queue subscription.
    type QueueSubscriptionOptions =
    { /// How long to wait between polling requests.
    PollTime : TimeSpan
    /// The lease time for a queue message.
    LeaseLength : TimeSpan option
    /// How many times a message can be dequeued before being permanently removed.
    MaxDequeueCount : int option }

    let completeMessage message (queue:CloudQueue) =
    queue.DeleteMessageAsync message |> Async.AwaitTaskEmpty

    /// Binds a MailboxProcessor to an Azure storage queue.
    let bindToQueue<'a> (connectionString, queueName) pollTime (agent:MailboxProcessor<'a>) =
    let bindToQueue<'a>((connectionString, queueName), options) (agent:MailboxProcessor<'a * AsyncReplyChannel<MessageProcessedStatus<'a>>>) =
    let queue = getQueue(connectionString, queueName)
    async {
    let queue = getQueue(connectionString, queueName)
    let getMessage() = queue.GetMessageAsync() |> Async.AwaitTask
    while true do
    let! message = getMessage()
    let! message = queue.GetMessageAsync(options.LeaseLength |> toNullable, null, null) |> Async.AwaitTask
    match message with
    | null ->
    printfn "no message, sleeping for %A seconds..." pollTime
    do! Async.Sleep((pollTime / 1<Seconds>) * 1000)
    | null->
    do! Async.Sleep(options.PollTime.TotalMilliseconds |> int)
    | message ->
    printfn "got a message, deserializing and posting..."
    JsonConvert.DeserializeObject<'a>(message.AsString) |> agent.Post
    do! queue.DeleteMessageAsync(message) |> Async.AwaitTaskEmpty
    match message.DequeueCount, options.MaxDequeueCount with
    | count, Some limit when count >= limit ->
    // Message dequeue count exceeded, just delete the message
    do! queue |> completeMessage message
    | _ ->
    try
    let timeout = match options.LeaseLength with
    | Some lifetime -> lifetime.TotalMilliseconds |> int
    | None -> Threading.Timeout.Infinite
    let status = agent.TryPostAndReply((fun ch -> JsonConvert.DeserializeObject<'a>(message.AsString), ch), timeout)
    match status with
    | None // there was no reply from the agent within specified time; lease has expired.
    | Some Failed -> () // message has failed to process; do not complete the message.
    | Some Completed -> do! queue |> completeMessage message
    | Some (Update payload) ->
    do! queue |> completeMessage message
    do! queue.AddMessageAsync(CloudQueueMessage(payload |> JsonConvert.SerializeObject)) |> Async.AwaitTaskEmpty
    with ex -> printfn "ARGH %s" ex.Message
    } |> Async.Start

    /// Here's a DTO of ours - shared between reader and writer
    type Person =
    { Name : string
    Age : int }

    /// Reader - listens for person messages and print out the details...
    let subscriber = new MailboxProcessor<Person>(fun mb -> async {
    while true do
    let! message = mb.Receive()
    printfn "Person %s is %d years old." message.Name message.Age
    })
    subscriber.Start()

    open AzureMailboxProcessor

    /// Magic to bind the agent to a storage queue
    subscriber
    |> bindToQueue("UseDevelopmentStorage=true", "sample-queue") 10<Seconds>


    /// Writer - Puts messages onto the storage queue (this would e.g. take place on another machine)
    let personWriter = createQueueWriter<Person>("UseDevelopmentStorage=true", "sample-queue")
    personWriter.Start()

    /// Start posting messages!
    personWriter.Post { Name = "Isaac"; Age = 34 }

    /// Represents an F# Agent that can be bound to an Azure storage queue.
    type AzureStorageQueueAgent<'a> = MailboxProcessor<'a * AsyncReplyChannel<MessageProcessedStatus<'a>>>
  8. @isaacabraham isaacabraham revised this gist Jul 20, 2014. 1 changed file with 10 additions and 5 deletions.
    15 changes: 10 additions & 5 deletions StorageQueueMailboxProcessor.fs
    Original file line number Diff line number Diff line change
    @@ -5,6 +5,9 @@ open System

    /// Code to bind mailbox processors to azure storage queues.
    module AzureMailboxProcessor =
    module Async =
    let AwaitTaskEmpty = Async.AwaitIAsyncResult >> Async.Ignore

    /// Gets a handle to an Azure storage queue
    let private getQueue (connectionString, queueName) =
    let connection = CloudStorageAccount.Parse connectionString
    @@ -18,7 +21,7 @@ module AzureMailboxProcessor =
    while true do
    let! message = mb.Receive()
    let message = CloudQueueMessage(message |> JsonConvert.SerializeObject)
    do! (queue.AddMessageAsync(message) |> Async.AwaitIAsyncResult |> Async.Ignore) })
    do! queue.AddMessageAsync(message) |> Async.AwaitTaskEmpty })

    [<Measure>]
    type Seconds
    @@ -31,11 +34,13 @@ module AzureMailboxProcessor =
    while true do
    let! message = getMessage()
    match message with
    | null -> printfn "no message!"
    do! Async.Sleep((pollTime / 1<Seconds>) * 1000)
    | null ->
    printfn "no message, sleeping for %A seconds..." pollTime
    do! Async.Sleep((pollTime / 1<Seconds>) * 1000)
    | message ->
    JsonConvert.DeserializeObject<'a>(message.AsString)
    |> agent.Post
    printfn "got a message, deserializing and posting..."
    JsonConvert.DeserializeObject<'a>(message.AsString) |> agent.Post
    do! queue.DeleteMessageAsync(message) |> Async.AwaitTaskEmpty
    } |> Async.Start

    /// Here's a DTO of ours - shared between reader and writer
  9. @isaacabraham isaacabraham revised this gist Jul 20, 2014. 1 changed file with 5 additions and 3 deletions.
    8 changes: 5 additions & 3 deletions StorageQueueMailboxProcessor.fs
    Original file line number Diff line number Diff line change
    @@ -51,14 +51,16 @@ let subscriber = new MailboxProcessor<Person>(fun mb -> async {
    })
    subscriber.Start()

    open AzureMailboxProcessor

    /// Magic to bind the agent to a storage queue
    subscriber
    |> AzureMailboxProcessor.bindToQueue("UseDevelopmentStorage=true", "sample-queue") 10<AzureMailboxProcessor.Seconds>
    |> bindToQueue("UseDevelopmentStorage=true", "sample-queue") 10<Seconds>


    /// Writer - Puts messages onto the storage queue (this would e.g. take place on another machine)
    let personWriter = AzureMailboxProcessor.createQueueWriter<Person>("UseDevelopmentStorage=true", "sample-queue")
    let personWriter = createQueueWriter<Person>("UseDevelopmentStorage=true", "sample-queue")
    personWriter.Start()

    /// Start posting messages!
    personWriter.Post { Name = "Isaac"; Age = 34 }
    personWriter.Post { Name = "Isaac"; Age = 34 }
  10. @isaacabraham isaacabraham created this gist Jul 20, 2014.
    64 changes: 64 additions & 0 deletions StorageQueueMailboxProcessor.fs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,64 @@
    open Microsoft.WindowsAzure.Storage
    open Microsoft.WindowsAzure.Storage.Queue
    open Newtonsoft.Json
    open System

    /// Code to bind mailbox processors to azure storage queues.
    module AzureMailboxProcessor =
    /// Gets a handle to an Azure storage queue
    let private getQueue (connectionString, queueName) =
    let connection = CloudStorageAccount.Parse connectionString
    let queueClient = connection.CreateCloudQueueClient()
    queueClient.GetQueueReference queueName

    /// Creates a mailbox processor that writes to an Azure storage queue
    let createQueueWriter<'a> (connectionString, queueName) =
    let queue = getQueue(connectionString, queueName)
    new MailboxProcessor<'a>(fun mb -> async {
    while true do
    let! message = mb.Receive()
    let message = CloudQueueMessage(message |> JsonConvert.SerializeObject)
    do! (queue.AddMessageAsync(message) |> Async.AwaitIAsyncResult |> Async.Ignore) })

    [<Measure>]
    type Seconds

    /// Binds a MailboxProcessor to an Azure storage queue.
    let bindToQueue<'a> (connectionString, queueName) pollTime (agent:MailboxProcessor<'a>) =
    async {
    let queue = getQueue(connectionString, queueName)
    let getMessage() = queue.GetMessageAsync() |> Async.AwaitTask
    while true do
    let! message = getMessage()
    match message with
    | null -> printfn "no message!"
    do! Async.Sleep((pollTime / 1<Seconds>) * 1000)
    | message ->
    JsonConvert.DeserializeObject<'a>(message.AsString)
    |> agent.Post
    } |> Async.Start

    /// Here's a DTO of ours - shared between reader and writer
    type Person =
    { Name : string
    Age : int }

    /// Reader - listens for person messages and print out the details...
    let subscriber = new MailboxProcessor<Person>(fun mb -> async {
    while true do
    let! message = mb.Receive()
    printfn "Person %s is %d years old." message.Name message.Age
    })
    subscriber.Start()

    /// Magic to bind the agent to a storage queue
    subscriber
    |> AzureMailboxProcessor.bindToQueue("UseDevelopmentStorage=true", "sample-queue") 10<AzureMailboxProcessor.Seconds>


    /// Writer - Puts messages onto the storage queue (this would e.g. take place on another machine)
    let personWriter = AzureMailboxProcessor.createQueueWriter<Person>("UseDevelopmentStorage=true", "sample-queue")
    personWriter.Start()

    /// Start posting messages!
    personWriter.Post { Name = "Isaac"; Age = 34 }