Created
October 22, 2015 21:03
-
-
Save freezy/ed0dfd34e970fc235d4f to your computer and use it in GitHub Desktop.
Revisions
-
freezy created this gist
Oct 22, 2015 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,210 @@ /// <summary> /// A class which does not automatically start queued jobs but /// requires a user of the class to explicitely start new jobs by calling /// StartNext() or StartUpTo(maxConcurrentCount). /// </summary> /// <a href="https://social.msdn.microsoft.com/Forums/en-US/2817c6e5-e5a4-4aac-91c1-97ba7de88ff7/nonblocking-jobqueue-and-paralleljobqueue-sample?forum=rx">Source</a> public class JobQueue { public struct Job { public Func<IObservable<Unit>> AsyncStart; public AsyncSubject<Unit> CompletionHandler; public BooleanDisposable Cancel; public MultipleAssignmentDisposable JobSubscription; } int _runningCount; readonly ConcurrentQueue<Job> _queue = new ConcurrentQueue<Job>(); readonly Subject<Unit> _whenQueueEmpty = new Subject<Unit>(); readonly Subject<Notification<Unit>> _whenJobCompletes = new Subject<Notification<Unit>>(); readonly Subject<Exception> _whenJobFails = new Subject<Exception>(); public IObservable<Notification<Unit>> WhenJobCompletes => _whenJobCompletes.AsObservable(); public IObservable<Unit> WhenQueueEmpty => _whenQueueEmpty.AsObservable(); public IObservable<Exception> WhenJobFails => _whenJobFails; public int RunningCount => _runningCount; public int QueuedCount => _queue.Count; public JobQueue() { // whenJobFails subscription _whenJobCompletes.Where(n => n.Kind == NotificationKind.OnError) .Select(n => n.Exception) .Subscribe(_whenJobFails); // whenQueueEmpty subscription _whenJobCompletes.Synchronize(this) .Where(n => _queue.Count == 0 && _runningCount == 0) .Select(n => new Unit()).Subscribe(_whenQueueEmpty); } public IObservable<Unit> Add(Action action) { return Add(action.ToAsync()); } public IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart) { var job = new Job() { AsyncStart = asyncStart, CompletionHandler = new AsyncSubject<Unit>(), Cancel = new BooleanDisposable(), JobSubscription = new MultipleAssignmentDisposable() }; var cancelable = Observable.Create<Unit>(o => new CompositeDisposable( job.CompletionHandler.Subscribe(o), // main job subscription job.JobSubscription, job.Cancel) ); job.CompletionHandler .Materialize() .Where(n => n.Kind == NotificationKind.OnCompleted || n.Kind == NotificationKind.OnError) .Subscribe(_whenJobCompletes.OnNext); // pass on errors and completions _queue.Enqueue(job); return cancelable; } public int StartUpTo(int maxConcurrentlyRunning) { var started = 0; for (;;) { for (;;) { int running; do // test and increment with compare and swap { running = _runningCount; if (running >= maxConcurrentlyRunning) return started; } while (Interlocked.CompareExchange(ref _runningCount, running + 1, running) != running); Job job; if (TryDequeNextJob(out job)) { StartJob(job); ++started; } else { // dequeing job failed but we already incremented running count Interlocked.Decrement(ref _runningCount); // ensure that no other thread queued an item and did not start it // because the running count was too high if (_queue.Count == 0) { // if there is nothing in the queue after the decrement // we can safely return return started; } } } } } public bool StartNext() { Job job; if (TryDequeNextJob(out job)) { Interlocked.Increment(ref _runningCount); StartJob(job); return true; } return false; } bool TryDequeNextJob(out Job job) { do { if (!_queue.TryDequeue(out job)) return false; } while (job.Cancel.IsDisposed); return true; } void StartJob(Job job) { try { var jobSubscription = job.AsyncStart().Subscribe( u => OnJobCompleted(job, null), e => OnJobCompleted(job, e) ); job.JobSubscription.Disposable = jobSubscription; if (job.Cancel.IsDisposed) job.JobSubscription.Dispose(); } catch (Exception ex) { OnJobCompleted(job, ex); throw; } } public void CancelOutstandingJobs() { Job job; while (TryDequeNextJob(out job)) job.CompletionHandler.OnError(new OperationCanceledException()); } void OnJobCompleted(Job job, Exception error) { Interlocked.Decrement(ref _runningCount); if (error == null) job.CompletionHandler.OnNext(new Unit()); else job.CompletionHandler.OnError(error); } } /// <summary> /// A class which uses the JobQueue internally and starts new jobs as soon /// as the number of currently running jobs drops below a given threshold. /// </summary> public class ParallelJobQueue { readonly int _maxConcurrent; public ParallelJobQueue(int maxConcurrent) { if (maxConcurrent < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrent)); _maxConcurrent = maxConcurrent; InnerQueue = new JobQueue(); InnerQueue.WhenJobCompletes.Subscribe(OnJobCompleted); } public JobQueue InnerQueue { get; private set; } public IObservable<Unit> Add(Action action) { return Add(action.ToAsync()); } public IObservable<Unit> Add(Func<IObservable<Unit>> asyncStart) { var whenCompletes = InnerQueue.Add(asyncStart); InnerQueue.StartUpTo(_maxConcurrent); return whenCompletes; } /// <summary> /// Stops starting new jobs of the old queue by replacing /// the inner queue with an empty new one. /// </summary> public void Stop() { var oldQueue = InnerQueue; InnerQueue = new JobQueue(); oldQueue.CancelOutstandingJobs(); } void OnJobCompleted(Notification<Unit> notification) { InnerQueue.StartUpTo(_maxConcurrent); } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,71 @@ I need a decent job queue for a c# project, preferably using Reactive Extensions. I came across [this excellent post](https://social.msdn.microsoft.com/Forums/en-US/2817c6e5-e5a4-4aac-91c1-97ba7de88ff7/nonblocking-jobqueue-and-paralleljobqueue-sample?referrer=http://social.msdn.microsoft.com/Forums/en-US/2817c6e5-e5a4-4aac-91c1-97ba7de88ff7/nonblocking-jobqueue-and-paralleljobqueue-sample?referrer=http://social.msdn.microsoft.com/Forums/en-US/2817c6e5-e5a4-4aac-91c1-97ba7de88ff7/nonblocking-jobqueue-and-paralleljobqueue-sample?forum=rx) by Andreas Köpf, but this was quite some time ago and obviously for an older version of Rx. Below you can see my updated code. The (major) changes made are the following: - `MutableDisposable` becomes `MultipleAssignmentDisposable` - Then this block: ```csharp whenJobCompletes.Where(n => n.Kind == NotificationKind.OnError) .Select(n => ((Notification<Unit>.OnError)n).Exception) .Subscribe(whenJobFails); ``` becomes ``` whenJobCompletes.Where(n => n.Kind == NotificationKind.OnError) .Select(n => n.Exception) .Subscribe(_whenJobFails); ``` - `whenJobCompletes.Hide()` becomes `whenJobCompletes.AsObservable()`: ``` public IObservable<Notification<Unit>> WhenJobCompletes => _whenJobCompletes.AsObservable(); ``` - `GroupDisposable` becomes `CompositeDisposable`, so the `cancelable` in `Add()` becomes: ``` var cancelable = Observable.Create<Unit>(o => new CompositeDisposable( job.CompletionHandler.Subscribe(o), // main job subscription job.JobSubscription, job.Cancel) ); ``` - And finally, the `Replace()` method doesn't exist anymore, so the subscription at `StartJob` is replaced by: ``` job.JobSubscription.Disposable = jobSubscription; ``` You can see the whole class below. *However*, for some reason it doesn't seem to work. Running this sample code: ``` var jobQueue = new ParallelJobQueue(5); // subscribe to failures jobQueue.InnerQueue.WhenJobFails.Subscribe(e => Console.WriteLine("Job failed: {0}", e.Message)); // subscribe to empty queue notification jobQueue.InnerQueue.WhenQueueEmpty.Subscribe(n => Console.WriteLine("Empty!")); int completed1 = 0, completed2 = 0, errors = 0; // test counters foreach (var i in Enumerable.Range(0, 100)) { var x = i; jobQueue.Add(() => { Console.WriteLine("Thread {0}: {1}", Thread.CurrentThread.ManagedThreadId, x); if ((Interlocked.Increment(ref completed1) % 10) == 0) { // generate test exceptions throw new Exception("Text exception " + completed1); } }).Subscribe(n => Interlocked.Increment(ref completed2), e => Interlocked.Increment(ref errors)); } jobQueue.InnerQueue.WhenQueueEmpty.Subscribe(_ => { Console.WriteLine("DONE! Received 1: {0}, 2: {1}, errors: {2}", completed1, completed2, errors); }); ``` Stops at around 65 and the *DONE* line is never printed. I'm still new to Rx, so maybe @andreaskoepf you have an idea? I'm sorry I registered at Microsoft specifically to reply to your thread, but somehow the handycapped software over there won't let me post.