Created
May 16, 2024 15:01
-
-
Save xsoheilalizadeh/a888f57608fd2397f0544844000ba64e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using x.Infrastructure.MassTransit; | |
| using MassTransit; | |
| using MassTransit.Scheduling; | |
| namespace x.Infrastructure; | |
/// <summary>
/// Extension methods on <see cref="IBus"/> that schedule and cancel published messages
/// through a <see cref="CustomPublishScheduleMessageProvider"/>, using an event id produced by
/// <c>EventIdGenerator.Generate</c> from a caller-supplied id and the message type name.
/// </summary>
public static class MasstransitSchedulerExtensions
{
    /// <summary>
    /// Creates a message scheduler whose provider publishes through <paramref name="bus"/>
    /// and is constructed with <paramref name="id"/> as its token id.
    /// </summary>
    /// <param name="bus">Bus used both as the publish endpoint and as the source of the topology.</param>
    /// <param name="id">Token id handed to the custom schedule message provider.</param>
    /// <returns>A new <see cref="IMessageScheduler"/> instance.</returns>
    public static IMessageScheduler CreateCustomMessageScheduler(this IBus bus, Guid id)
    {
        var provider = new CustomPublishScheduleMessageProvider(bus, id);
        return new MessageScheduler(provider, bus.Topology);
    }

    /// <summary>
    /// Schedules <paramref name="message"/> to be published at <paramref name="scheduleTime"/>.
    /// The scheduler is created with the event id generated from <paramref name="id"/> and
    /// the simple name of <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Message type; its <c>Name</c> participates in the generated event id.</typeparam>
    /// <param name="bus">Bus to publish the schedule request on.</param>
    /// <param name="scheduleTime">Time passed to the scheduler's <c>SchedulePublish</c>.</param>
    /// <param name="id">Caller-supplied id fed into the event id generator.</param>
    /// <param name="message">Message to schedule.</param>
    /// <param name="ct">Cancellation token forwarded to the scheduler.</param>
    /// <returns>The task returned by the scheduler's <c>SchedulePublish</c> call.</returns>
    public static Task<ScheduledMessage<T>> Schedule<T>(this IBus bus, DateTime scheduleTime, Guid id, T message, CancellationToken ct = default) where T: class
        => bus
            .CreateCustomMessageScheduler(EventIdGenerator.Generate(id, typeof(T).Name))
            .SchedulePublish(scheduleTime, message, ct);

    /// <summary>
    /// Requests cancellation of a scheduled publish of <typeparamref name="T"/>, identified by the
    /// event id generated from <paramref name="id"/> and the simple name of <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Message type; its <c>Name</c> participates in the generated event id.</typeparam>
    /// <param name="bus">Bus to publish the cancellation on.</param>
    /// <param name="id">Caller-supplied id fed into the event id generator.</param>
    /// <returns>The task returned by the scheduler's <c>CancelScheduledPublish</c> call.</returns>
    public static Task CancelSchedule<T>(this IBus bus, Guid id) where T : class
    {
        // The same generated id is used both to build the scheduler and as the token to cancel.
        var scheduledEventId = EventIdGenerator.Generate(id, typeof(T).Name);
        return bus
            .CreateCustomMessageScheduler(scheduledEventId)
            .CancelScheduledPublish<T>(scheduledEventId);
    }
}
/// <summary>
/// Pipe applied to the send context of an outgoing <see cref="ScheduleMessage"/>.
/// On send it records the context, writes the locally stored scheduled message id onto it,
/// and, when the supplied payload pipe is not empty, forwards a proxy context for the
/// payload to that pipe.
/// </summary>
/// <typeparam name="T">Type of the payload carried by the schedule message.</typeparam>
class ScheduleMessageContextPipe<T> :
    IPipe<SendContext<ScheduleMessage>>
    where T : class
{
    readonly T _payload;
    readonly IPipe<SendContext<T>> _pipe;
    SendContext _context;
    Guid? _scheduledMessageId;

    /// <summary>Stores the payload and the payload-level pipe for later use in <see cref="Send"/>.</summary>
    /// <param name="payload">Payload used to create the proxy send context.</param>
    /// <param name="pipe">Pipe that receives the proxy context when it is not empty.</param>
    public ScheduleMessageContextPipe(T payload, IPipe<SendContext<T>> pipe)
        => (_payload, _pipe) = (payload, pipe);

    /// <summary>
    /// Reading returns the id held by the recorded send context when a context has been recorded
    /// and its id has a value; otherwise the locally stored id. Writing only updates the locally
    /// stored id, which is copied onto the context during <see cref="Send"/>.
    /// </summary>
    public Guid? ScheduledMessageId
    {
        get
        {
            if (_context is null)
            {
                return _scheduledMessageId;
            }

            return _context.ScheduledMessageId ?? _scheduledMessageId;
        }
        set { _scheduledMessageId = value; }
    }

    /// <summary>
    /// Records <paramref name="context"/>, assigns the locally stored scheduled message id to it,
    /// and forwards a payload proxy context to the inner pipe when that pipe is not empty.
    /// </summary>
    /// <param name="context">Send context of the schedule message being sent.</param>
    public async Task Send(SendContext<ScheduleMessage> context)
    {
        _context = context;
        context.ScheduledMessageId = _scheduledMessageId;

        // Nothing further to do when no meaningful payload pipe was supplied.
        if (!_pipe.IsNotEmpty())
        {
            return;
        }

        await _pipe.Send(context.CreateProxy(_payload)).ConfigureAwait(false);
    }

    /// <summary>Delegates probing to the inner pipe when one is present.</summary>
    void IProbeSite.Probe(ProbeContext context) => _pipe?.Probe(context);
}
/// <summary>
/// Schedule message provider that publishes schedule and cancel requests through an
/// <see cref="IPublishEndpoint"/> and uses a caller-chosen token id for the scheduled message.
/// </summary>
/// <remarks>
/// To be able to set a custom id for a scheduled job, the provider had to be customized.
/// Issue: https://github.com/MassTransit/MassTransit/issues/2885
/// </remarks>
public class CustomPublishScheduleMessageProvider : IScheduleMessageProvider
{
    readonly IPublishEndpoint _publishEndpoint;
    readonly Guid _tokenId;

    /// <summary>Creates a provider bound to a publish endpoint and a fixed token id.</summary>
    /// <param name="publishEndpoint">Endpoint used to publish schedule and cancel messages.</param>
    /// <param name="tokenId">Id applied to every message scheduled through this instance.</param>
    public CustomPublishScheduleMessageProvider(IPublishEndpoint publishEndpoint, Guid tokenId)
    {
        _tokenId = tokenId;
        _publishEndpoint = publishEndpoint;
    }

    /// <summary>
    /// Publishes a schedule command for <paramref name="message"/> built with this provider's token id,
    /// and returns a handle describing the scheduled message.
    /// </summary>
    /// <typeparam name="T">Type of the message being scheduled.</typeparam>
    /// <param name="destinationAddress">Destination passed to the schedule command.</param>
    /// <param name="scheduledTime">Scheduled time passed to the schedule command.</param>
    /// <param name="message">Message payload to schedule.</param>
    /// <param name="pipe">Payload-level pipe wrapped by the schedule message pipe.</param>
    /// <param name="cancellationToken">Token forwarded to the publish call.</param>
    /// <returns>
    /// A handle whose id is the pipe's scheduled message id when it has a value, otherwise the
    /// command's correlation id; the remaining values are taken from the command.
    /// </returns>
    public async Task<ScheduledMessage<T>> ScheduleSend<T>(Uri destinationAddress, DateTime scheduledTime, T message,
        IPipe<SendContext<T>> pipe,
        CancellationToken cancellationToken) where T : class
    {
        var contextPipe = new ScheduleMessageContextPipe<T>(message, pipe);
        contextPipe.ScheduledMessageId = _tokenId;

        var command = new ScheduleMessageCommand<T>(scheduledTime, destinationAddress, message, _tokenId);

        await ScheduleSend(command, contextPipe, cancellationToken).ConfigureAwait(false);

        var handleId = contextPipe.ScheduledMessageId ?? command.CorrelationId;
        return new ScheduledMessageHandle<T>(
            handleId,
            command.ScheduledTime,
            command.Destination,
            (T)command.Payload);
    }

    /// <summary>Cancels the scheduled message identified by <paramref name="tokenId"/>, with no destination address.</summary>
    Task IScheduleMessageProvider.CancelScheduledSend(Guid tokenId)
        => CancelScheduledSend(tokenId, null);

    /// <summary>Cancels the scheduled message identified by <paramref name="tokenId"/> for the given destination.</summary>
    Task IScheduleMessageProvider.CancelScheduledSend(Uri destinationAddress, Guid tokenId)
        => CancelScheduledSend(tokenId, destinationAddress);

    // Publishes the schedule command; the parameter is typed as ScheduleMessage so the
    // publish call is bound against that message type.
    private Task ScheduleSend(ScheduleMessage message, IPipe<SendContext<ScheduleMessage>> pipe,
        CancellationToken ct)
        => _publishEndpoint.Publish(message, pipe, ct);

    // Publishes a CancelScheduledMessage for the token; destinationAddress is not used here.
    private Task CancelScheduledSend(Guid tokenId, Uri destinationAddress)
        => _publishEndpoint.Publish<CancelScheduledMessage>(new
        {
            InVar.CorrelationId,
            InVar.Timestamp,
            TokenId = tokenId
        });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment