Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save xsoheilalizadeh/a888f57608fd2397f0544844000ba64e to your computer and use it in GitHub Desktop.

Select an option

Save xsoheilalizadeh/a888f57608fd2397f0544844000ba64e to your computer and use it in GitHub Desktop.
using x.Infrastructure.MassTransit;
using MassTransit;
using MassTransit.Scheduling;
namespace x.Infrastructure;
/// <summary>
/// Extension methods on <see cref="IBus"/> for scheduling and cancelling published messages
/// using a scheduler token id derived from a caller-supplied id.
/// </summary>
public static class MasstransitSchedulerExtensions
{
    /// <summary>
    /// Creates a message scheduler backed by a <see cref="CustomPublishScheduleMessageProvider"/>
    /// that is bound to <paramref name="id"/>.
    /// </summary>
    /// <param name="bus">Bus used both as the provider's publish endpoint and as the source of the topology.</param>
    /// <param name="id">Token id handed to the provider.</param>
    /// <returns>A new <see cref="MessageScheduler"/> wrapping the custom provider.</returns>
    public static IMessageScheduler CreateCustomMessageScheduler(this IBus bus, Guid id)
    {
        var provider = new CustomPublishScheduleMessageProvider(bus, id);
        return new MessageScheduler(provider, bus.Topology);
    }

    /// <summary>
    /// Schedules <paramref name="message"/> to be published at <paramref name="scheduleTime"/>.
    /// The scheduler token id is derived from <paramref name="id"/> and the simple name of <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Type of the message being scheduled.</typeparam>
    /// <param name="bus">Bus used to create the scheduler.</param>
    /// <param name="scheduleTime">Time passed to the scheduler as the publish time.</param>
    /// <param name="id">Caller-supplied id from which the token id is derived.</param>
    /// <param name="message">Message to publish.</param>
    /// <param name="ct">Cancellation token forwarded to the scheduler.</param>
    /// <returns>The task returned by the scheduler, yielding the scheduled message handle.</returns>
    public static Task<ScheduledMessage<T>> Schedule<T>(this IBus bus, DateTime scheduleTime, Guid id, T message, CancellationToken ct = default) where T : class
    {
        var tokenId = DeriveTokenId<T>(id);
        return bus.CreateCustomMessageScheduler(tokenId).SchedulePublish(scheduleTime, message, ct);
    }

    /// <summary>
    /// Requests cancellation of a scheduled publish of <typeparamref name="T"/>, using the token id
    /// derived from <paramref name="id"/> and the simple name of <typeparamref name="T"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this only matches an earlier <c>Schedule</c> call if <c>EventIdGenerator.Generate</c>
    /// returns the same value for the same inputs - presumably it does; verify against its implementation.
    /// </remarks>
    /// <typeparam name="T">Type of the message whose schedule is cancelled.</typeparam>
    /// <param name="bus">Bus used to create the scheduler.</param>
    /// <param name="id">Caller-supplied id from which the token id is derived.</param>
    /// <returns>The task returned by the scheduler's cancel call.</returns>
    public static Task CancelSchedule<T>(this IBus bus, Guid id) where T : class
    {
        var tokenId = DeriveTokenId<T>(id);
        var scheduler = bus.CreateCustomMessageScheduler(tokenId);
        return scheduler.CancelScheduledPublish<T>(tokenId);
    }

    // Single place where the token id is derived, so Schedule and CancelSchedule stay in agreement.
    private static Guid DeriveTokenId<T>(Guid id)
    {
        return EventIdGenerator.Generate(id, typeof(T).Name);
    }
}
/// <summary>
/// Pipe applied to the send context of a <see cref="ScheduleMessage"/>. It copies the configured
/// scheduled message id onto the context and then, when a non-empty payload pipe was supplied,
/// runs that pipe against a proxy context created for the payload.
/// </summary>
/// <typeparam name="T">Type of the payload being scheduled.</typeparam>
class ScheduleMessageContextPipe<T> :
    IPipe<SendContext<ScheduleMessage>>
    where T : class
{
    readonly T _payload;
    readonly IPipe<SendContext<T>> _pipe;

    // Id written onto the send context in Send; assigned through the ScheduledMessageId setter.
    Guid? _scheduledMessageId;

    // Context captured by the most recent Send call; unset until Send has run.
    SendContext _context;

    /// <summary>
    /// Stores the payload and the pipe to run against the payload's proxy context.
    /// </summary>
    /// <param name="payload">Message used to create the proxy context.</param>
    /// <param name="pipe">Pipe invoked with the proxy context when it is not empty.</param>
    public ScheduleMessageContextPipe(T payload, IPipe<SendContext<T>> pipe)
    {
        _pipe = pipe;
        _payload = payload;
    }

    /// <summary>
    /// Gets the id from the captured send context when that is available and non-null,
    /// otherwise the locally stored id. Setting only updates the locally stored id.
    /// </summary>
    public Guid? ScheduledMessageId
    {
        get
        {
            var idFromContext = _context?.ScheduledMessageId;
            return idFromContext ?? _scheduledMessageId;
        }
        set { _scheduledMessageId = value; }
    }

    /// <summary>
    /// Captures <paramref name="context"/>, applies the stored scheduled message id to it and
    /// forwards a payload proxy of the context to the payload pipe when that pipe is not empty.
    /// </summary>
    /// <param name="context">Send context of the schedule command.</param>
    public async Task Send(SendContext<ScheduleMessage> context)
    {
        _context = context;
        context.ScheduledMessageId = _scheduledMessageId;

        // Nothing further to do when no payload pipe needs to run.
        if (!_pipe.IsNotEmpty())
        {
            return;
        }

        SendContext<T> payloadContext = context.CreateProxy(_payload);
        await _pipe.Send(payloadContext).ConfigureAwait(false);
    }

    /// <summary>
    /// Delegates probing to the payload pipe, if one was supplied.
    /// </summary>
    void IProbeSite.Probe(ProbeContext context) => _pipe?.Probe(context);
}
/// <summary>
/// Schedule message provider that publishes its schedule and cancel commands through an
/// <see cref="IPublishEndpoint"/> and stamps every scheduled message with a fixed token id
/// supplied at construction.
/// </summary>
/// <remarks>
/// In order to be able to set a custom id for a scheduled job we needed to customize its provider.
/// Issue: https://github.com/MassTransit/MassTransit/issues/2885
/// </remarks>
public class CustomPublishScheduleMessageProvider : IScheduleMessageProvider
{
    private readonly IPublishEndpoint _publishEndpoint;
    private readonly Guid _tokenId;

    /// <summary>
    /// Creates a provider bound to a publish endpoint and a token id.
    /// </summary>
    /// <param name="publishEndpoint">Endpoint used to publish the schedule and cancel commands.</param>
    /// <param name="tokenId">Token id applied to messages scheduled through this provider.</param>
    public CustomPublishScheduleMessageProvider(IPublishEndpoint publishEndpoint, Guid tokenId)
    {
        _publishEndpoint = publishEndpoint;
        _tokenId = tokenId;
    }

    /// <summary>
    /// Publishes a schedule command for <paramref name="message"/> carrying this provider's token id
    /// and returns a handle describing the scheduled message.
    /// </summary>
    /// <typeparam name="T">Type of the message being scheduled.</typeparam>
    /// <param name="destinationAddress">Destination recorded in the schedule command.</param>
    /// <param name="scheduledTime">Time recorded in the schedule command.</param>
    /// <param name="message">Payload of the schedule command.</param>
    /// <param name="pipe">Pipe run against the payload's proxy send context.</param>
    /// <param name="cancellationToken">Token forwarded to the publish call.</param>
    /// <returns>A handle built from the id, time, destination and payload of the published command.</returns>
    public async Task<ScheduledMessage<T>> ScheduleSend<T>(Uri destinationAddress, DateTime scheduledTime, T message,
        IPipe<SendContext<T>> pipe,
        CancellationToken cancellationToken) where T : class
    {
        var contextPipe = new ScheduleMessageContextPipe<T>(message, pipe);
        contextPipe.ScheduledMessageId = _tokenId;

        var scheduleCommand = new ScheduleMessageCommand<T>(scheduledTime, destinationAddress, message, _tokenId);

        await PublishScheduleCommand(scheduleCommand, contextPipe, cancellationToken).ConfigureAwait(false);

        // Prefer the id observed on the pipe; fall back to the command's correlation id when it is null.
        var scheduledId = contextPipe.ScheduledMessageId ?? scheduleCommand.CorrelationId;

        return new ScheduledMessageHandle<T>(
            scheduledId,
            scheduleCommand.ScheduledTime,
            scheduleCommand.Destination,
            (T)scheduleCommand.Payload);
    }

    /// <summary>
    /// Publishes a cancel command for <paramref name="tokenId"/>.
    /// </summary>
    Task IScheduleMessageProvider.CancelScheduledSend(Guid tokenId) => PublishCancelCommand(tokenId);

    /// <summary>
    /// Publishes a cancel command for <paramref name="tokenId"/>; <paramref name="destinationAddress"/> is not used.
    /// </summary>
    Task IScheduleMessageProvider.CancelScheduledSend(Uri destinationAddress, Guid tokenId) => PublishCancelCommand(tokenId);

    // Publishes the schedule command, typed as ScheduleMessage, through the publish endpoint.
    private Task PublishScheduleCommand(ScheduleMessage message, IPipe<SendContext<ScheduleMessage>> pipe,
        CancellationToken ct)
    {
        return _publishEndpoint.Publish(message, pipe, ct);
    }

    // Publishes a CancelScheduledMessage built from an anonymous initializer carrying the token id.
    private Task PublishCancelCommand(Guid tokenId)
    {
        return _publishEndpoint.Publish<CancelScheduledMessage>(new
        {
            InVar.CorrelationId,
            InVar.Timestamp,
            TokenId = tokenId
        });
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment