Skip to content

Instantly share code, notes, and snippets.

@fvoronin
Last active July 13, 2017 08:34
Show Gist options
  • Select an option

  • Save fvoronin/7b23462ab5854fd98af85ed4f04c4da9 to your computer and use it in GitHub Desktop.

Select an option

Save fvoronin/7b23462ab5854fd98af85ed4f04c4da9 to your computer and use it in GitHub Desktop.
RabbitMQ topology not recovering
[13:33:06] [$1] [DEBUG] [ChannelFactory]: Setting up Connection Recovery
[13:33:06] [$1] [DEBUG] [ChannelFactory]: Initiating 0 channels.
[13:33:06] [$1] [INFO] [ChannelFactory]: Channel scaling is disabled.
[13:33:06] [$1] [INFO] [BaseBusClient`1]: BusClient initialized.
[13:33:06] [$1] [INFO] [BaseBusClient`1]: Subscribing to message 'String' on exchange 'test.for-sever' with routing key server.
[13:33:06] [$1] [DEBUG] [TopologyProvider]: Start processing topology work.
[13:33:06] [$1] [INFO] [TopologyProvider]: Declaring queue 'test.server'.
[13:33:06] [$1] [DEBUG] [ChannelFactory]: Existing connection is open and will be used.
[13:33:06] [$1] [INFO] [TopologyProvider]: Declaring exchange 'test.for-sever'.
[13:33:06] [$1] [INFO] [TopologyProvider]: Binding queue 'test.server' to exchange 'test.for-sever' with routing key 'server'
[13:33:06] [$1] [DEBUG] [TopologyProvider]: Done processing topology work.
[13:33:06] [$1] [DEBUG] [ChannelFactory]: Existing connection is open and will be used.
[13:33:06] [$4] [DEBUG] [EventingBasicConsumerFactory]: Setting QoS
Prefetch Size: 0
Prefetch Count: 50
global: false
[13:33:06] [$4] [DEBUG] [Subscriber`1]: Setting up a consumer on channel '2' for queue test.server with NoAck set to False.
[13:33:06] [$1] [DEBUG] [ChannelFactory]: Setting up Connection Recovery
[13:33:06] [$1] [DEBUG] [ChannelFactory]: Initiating 0 channels.
[13:33:06] [$1] [INFO] [ChannelFactory]: Channel scaling is disabled.
[13:33:06] [$1] [INFO] [BaseBusClient`1]: BusClient initialized.
[13:33:06] [$1] [INFO] [BaseBusClient`1]: Subscribing to message 'String' on exchange 'test.for-client' with routing key client.
[13:33:06] [$1] [DEBUG] [TopologyProvider]: Start processing topology work.
[13:33:06] [$1] [INFO] [TopologyProvider]: Declaring queue 'test.cleint'.
[13:33:06] [$1] [DEBUG] [ChannelFactory]: Existing connection is open and will be used.
[13:33:06] [$1] [INFO] [TopologyProvider]: Declaring exchange 'test.for-client'.
[13:33:06] [$1] [INFO] [TopologyProvider]: Binding queue 'test.cleint' to exchange 'test.for-client' with routing key 'client'
[13:33:06] [$1] [DEBUG] [TopologyProvider]: Done processing topology work.
[13:33:06] [$1] [DEBUG] [ChannelFactory]: Existing connection is open and will be used.
[13:33:06] [$10] [DEBUG] [EventingBasicConsumerFactory]: Setting QoS
Prefetch Size: 0
Prefetch Count: 50
global: false
[13:33:06] [$10] [DEBUG] [Subscriber`1]: Setting up a consumer on channel '2' for queue test.cleint with NoAck set to False.
[13:33:06] [$1] [DEBUG] [BaseBusClient`1]: Initiating publish for message 'String' on exchange 'test.for-sever' with routing key server.
[13:33:06] [$1] [DEBUG] [TopologyProvider]: Start processing topology work.
[13:33:06] [$1] [INFO] [TopologyProvider]: Declaring exchange 'test.for-sever'.
[13:33:06] [$1] [DEBUG] [TopologyProvider]: Done processing topology work.
[13:33:06] [$1] [DEBUG] [ChannelFactory]: Begining to process 'GetChannel' requests.
[13:33:06] [$1] [DEBUG] [ChannelFactory]: Existing connection is open and will be used.
[13:33:06] [$5] [INFO] [ChannelFactory]: Channel '3' has been created.
[13:33:06] [$1] [DEBUG] [ChannelFactory]: 'GetChannel' has been processed.
[13:33:06] [$10] [INFO] [PublishAcknowledger]: Setting 'Publish Acknowledge' for channel '3'
[13:33:06] [$12] [INFO] [PublishAcknowledger]: Recieved ack for 1/3 with multiple set to 'False'
[13:33:06] [$5] [INFO] [EventingBasicConsumerFactory]: Message recived: MessageId: 427793f5-347d-4c61-941b-b5e514cd4ef4
[13:33:06] [$12] [DEBUG] [PublishAcknowledger]: Disposed ack timer for 1/3
[13:33:06] [$12] [DEBUG] [PublishAcknowledger]: Successfully confirmed publish 1/3
Client Message published 1
Server Received: MESSAGE 1
[13:33:06] [$5] [DEBUG] [EventingBasicConsumerFactory]: Ack:ing message with id 1.
[13:33:08] [$5] [INFO] [TopologyProvider]: Disposing topology channel (if exists).
[13:33:08] [$14] [INFO] [TopologyProvider]: Disposing topology channel (if exists).
[13:33:09] [$10] [DEBUG] [BaseBusClient`1]: Initiating publish for message 'String' on exchange 'test.for-sever' with routing key server.
[13:33:09] [$10] [DEBUG] [ChannelFactory]: Begining to process 'GetChannel' requests.
[13:33:09] [$10] [DEBUG] [ChannelFactory]: 'GetChannel' has been processed.
[13:33:09] [$14] [INFO] [EventingBasicConsumerFactory]: Message recived: MessageId: 5213a360-5b2e-4f59-b4ec-a5c8b0d63ed8
[13:33:09] [$12] [INFO] [PublishAcknowledger]: Recieved ack for 2/3 with multiple set to 'False'
Server Received: MESSAGE 2
[13:33:09] [$12] [DEBUG] [PublishAcknowledger]: Disposed ack timer for 2/3
Client Message published 2
[13:33:09] [$12] [DEBUG] [PublishAcknowledger]: Successfully confirmed publish 2/3
[13:33:09] [$14] [DEBUG] [EventingBasicConsumerFactory]: Ack:ing message with id 2.
[13:33:10] [$8] [DEBUG] [EventingRawConsumer]: The consumer with tag 'amq.ctag-8KbFsgKaW8CWY2yiEvoW-Q' has been cancelled.
[13:33:10] [$12] [DEBUG] [EventingRawConsumer]: The consumer with tag 'amq.ctag-DmEHkMlVeCeOaYINbPyO3A' has been cancelled.
[13:33:10] [$8] [INFO] [EventingRawConsumer]: Consumer amq.ctag-8KbFsgKaW8CWY2yiEvoW-Q has been shut down.
Reason: System.IO.IOException: Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host. ---
> System.Net.Sockets.SocketException: An existing connection was forcibly closed by the remote host
at System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)
at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)
--- End of inner exception stack trace ---
at RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)
at RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
Initiator: Library
Reply Text: Unexpected Exception
[13:33:10] [$12] [INFO] [EventingRawConsumer]: Consumer amq.ctag-DmEHkMlVeCeOaYINbPyO3A has been shut down.
Reason: System.IO.IOException: Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host. ---
> System.Net.Sockets.SocketException: An existing connection was forcibly closed by the remote host
at System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)
at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)
--- End of inner exception stack trace ---
at RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)
at RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
Initiator: Library
Reply Text: Unexpected Exception
[13:33:12] [$5] [DEBUG] [BaseBusClient`1]: Initiating publish for message 'String' on exchange 'test.for-sever' with routing key server.
Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering exchange test.for-client: Already closed:
The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId=0, cause= ---> RabbitM
Q.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200,
text="Goodbye", classId=0, methodId=0, cause=
at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Framing.Impl.Model._Private_ExchangeDeclare(String exchange, String type, Boolean passive, Boolean durable, Boolean autoDelete,
Boolean internal, Boolean nowait, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.ExchangeDeclare(String exchange, String type, Boolean durable, Boolean autoDelete, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.RecordedExchange.Recover()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverExchanges()
--- End of inner exception stack trace ---
Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering exchange test.for-sever: Already closed:
The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId=0, cause= ---> RabbitMQ
.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200,
text="Goodbye", classId=0, methodId=0, cause=
at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Framing.Impl.Model._Private_ExchangeDeclare(String exchange, String type, Boolean passive, Boolean durable, Boolean autoDelete,
Boolean internal, Boolean nowait, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.ExchangeDeclare(String exchange, String type, Boolean durable, Boolean autoDelete, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.RecordedExchange.Recover()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverExchanges()
--- End of inner exception stack trace ---
Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering queue test.cleint: Already closed: The AM
QP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId=0, cause= ---> RabbitMQ.Clien
t.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="
Goodbye", classId=0, methodId=0, cause=
at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Framing.Impl.Model._Private_QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete,
Boolean nowait, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2
arguments)
at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.RecordedQueue.Recover()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverQueues()
--- End of inner exception stack trace ---
Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering binding between test.for-client and test.
cleint: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId
=0, cause= ---> RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by
Application, code=200, text="Goodbye", classId=0, methodId=0, cause=
at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Framing.Impl.Model._Private_QueueBind(String queue, String exchange, String routingKey, Boolean nowait, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.RecordedQueueBinding.Recover()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverBindings()
--- End of inner exception stack trace ---
Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-DmEHkMlVeCeOaYINbPyO3A
on queue test.cleint: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'test.cleint' in
vhost '/'", classId=60, methodId=20, cause= ---> RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP cl
ose-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'test.cleint' in vhost '/'", classId=60, methodId=20, cause=
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean noAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 a
rguments, IBasicConsumer consumer)
at RabbitMQ.Client.Impl.RecordedConsumer.Recover()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverConsumers()
--- End of inner exception stack trace ---
[13:33:20] [$14] [INFO] [ChannelFactory]: Connection has been recovered. Starting channel processing.
[13:33:20] [$14] [DEBUG] [ChannelFactory]: Begining to process 'GetChannel' requests.
[13:33:20] [$14] [DEBUG] [ChannelFactory]: 'GetChannel' has been processed.
[13:33:20] [$18] [INFO] [PublishAcknowledger]: Recieved ack for 1/2 with multiple set to 'False'
[13:33:20] [$18] [DEBUG] [PublishAcknowledger]: Disposed ack timer for 1/2
Client Message published 3
[13:33:20] [$18] [DEBUG] [PublishAcknowledger]: Successfully confirmed publish 1/2
Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering exchange test.for-sever: Already closed:
The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId=0, cause= ---> RabbitMQ
.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200,
text="Goodbye", classId=0, methodId=0, cause=
at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Framing.Impl.Model._Private_ExchangeDeclare(String exchange, String type, Boolean passive, Boolean durable, Boolean autoDelete,
Boolean internal, Boolean nowait, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.ExchangeDeclare(String exchange, String type, Boolean durable, Boolean autoDelete, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.RecordedExchange.Recover()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverExchanges()
--- End of inner exception stack trace ---
Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering queue test.server: Already closed: The AM
QP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId=0, cause= ---> RabbitMQ.Clien
t.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="
Goodbye", classId=0, methodId=0, cause=
at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Framing.Impl.Model._Private_QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete,
Boolean nowait, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2
arguments)
at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.RecordedQueue.Recover()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverQueues()
--- End of inner exception stack trace ---
Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering binding between test.for-sever and test.s
erver: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId=
0, cause= ---> RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by
Application, code=200, text="Goodbye", classId=0, methodId=0, cause=
at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Framing.Impl.Model._Private_QueueBind(String queue, String exchange, String routingKey, Boolean nowait, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey, IDictionary`2 arguments)
at RabbitMQ.Client.Impl.RecordedQueueBinding.Recover()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverBindings()
--- End of inner exception stack trace ---
Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-8KbFsgKaW8CWY2yiEvoW-Q
on queue test.server: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'test.server' in
vhost '/'", classId=60, methodId=20, cause= ---> RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP cl
ose-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'test.server' in vhost '/'", classId=60, methodId=20, cause=
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean noAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 a
rguments, IBasicConsumer consumer)
at RabbitMQ.Client.Impl.RecordedConsumer.Recover()
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverConsumers()
--- End of inner exception stack trace ---
[13:33:21] [$4] [INFO] [ChannelFactory]: Connection has been recovered. Starting channel processing.
[13:33:21] [$4] [DEBUG] [ChannelFactory]: Begining to process 'GetChannel' requests.
[13:33:21] [$4] [DEBUG] [ChannelFactory]: 'GetChannel' has been processed.
[13:33:23] [$4] [DEBUG] [BaseBusClient`1]: Initiating publish for message 'String' on exchange 'test.for-sever' with routing key server.
[13:33:23] [$4] [DEBUG] [ChannelFactory]: Begining to process 'GetChannel' requests.
[13:33:23] [$4] [DEBUG] [ChannelFactory]: 'GetChannel' has been processed.
[13:33:23] [$18] [INFO] [PublishAcknowledger]: Recieved ack for 2/2 with multiple set to 'False'
[13:33:23] [$18] [DEBUG] [PublishAcknowledger]: Disposed ack timer for 2/2
Client Message published 4
[13:33:23] [$18] [DEBUG] [PublishAcknowledger]: Successfully confirmed publish 2/2
[13:33:26] [$14] [DEBUG] [BaseBusClient`1]: Initiating publish for message 'String' on exchange 'test.for-sever' with routing key server.
[13:33:26] [$14] [DEBUG] [ChannelFactory]: Begining to process 'GetChannel' requests.
[13:33:26] [$14] [DEBUG] [ChannelFactory]: 'GetChannel' has been processed.
[13:33:26] [$18] [INFO] [PublishAcknowledger]: Recieved ack for 3/2 with multiple set to 'False'
[13:33:26] [$18] [DEBUG] [PublishAcknowledger]: Disposed ack timer for 3/2
Client Message published 5
[13:33:26] [$18] [DEBUG] [PublishAcknowledger]: Successfully confirmed publish 3/2
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Microsoft.Extensions.Configuration" version="1.1.2" targetFramework="net462" />
<package id="Microsoft.Extensions.Configuration.Abstractions" version="1.1.2" targetFramework="net462" />
<package id="Microsoft.Extensions.Configuration.Binder" version="1.1.2" targetFramework="net462" />
<package id="Microsoft.Extensions.DependencyInjection" version="1.1.1" targetFramework="net462" />
<package id="Microsoft.Extensions.DependencyInjection.Abstractions" version="1.1.1" targetFramework="net462" />
<package id="Microsoft.Extensions.Logging" version="1.1.2" targetFramework="net462" />
<package id="Microsoft.Extensions.Logging.Abstractions" version="1.1.2" targetFramework="net462" />
<package id="Microsoft.Extensions.Primitives" version="1.1.1" targetFramework="net462" />
<package id="Microsoft.NETCore.Platforms" version="1.1.0" targetFramework="net462" />
<package id="Microsoft.Win32.Primitives" version="4.3.0" targetFramework="net462" />
<package id="NETStandard.Library" version="1.6.1" targetFramework="net462" />
<package id="Newtonsoft.Json" version="10.0.3" targetFramework="net462" />
<package id="RabbitMQ.Client" version="4.1.3" targetFramework="net462" />
<package id="RawRabbit" version="1.10.3" targetFramework="net462" />
<package id="RawRabbit.Extensions" version="1.10.3" targetFramework="net462" />
<package id="RawRabbit.vNext" version="1.10.3" targetFramework="net462" />
<package id="System.AppContext" version="4.3.0" targetFramework="net462" />
<package id="System.Collections" version="4.3.0" targetFramework="net462" />
<package id="System.Collections.Concurrent" version="4.3.0" targetFramework="net462" />
<package id="System.Collections.NonGeneric" version="4.3.0" targetFramework="net462" />
<package id="System.ComponentModel" version="4.3.0" targetFramework="net462" />
<package id="System.ComponentModel.Primitives" version="4.3.0" targetFramework="net462" />
<package id="System.ComponentModel.TypeConverter" version="4.3.0" targetFramework="net462" />
<package id="System.Console" version="4.3.0" targetFramework="net462" />
<package id="System.Diagnostics.Debug" version="4.3.0" targetFramework="net462" />
<package id="System.Diagnostics.DiagnosticSource" version="4.4.0" targetFramework="net462" />
<package id="System.Diagnostics.Tools" version="4.3.0" targetFramework="net462" />
<package id="System.Diagnostics.Tracing" version="4.3.0" targetFramework="net462" />
<package id="System.Globalization" version="4.3.0" targetFramework="net462" />
<package id="System.Globalization.Calendars" version="4.3.0" targetFramework="net462" />
<package id="System.IO" version="4.3.0" targetFramework="net462" />
<package id="System.IO.Compression" version="4.3.0" targetFramework="net462" />
<package id="System.IO.Compression.ZipFile" version="4.3.0" targetFramework="net462" />
<package id="System.IO.FileSystem" version="4.3.0" targetFramework="net462" />
<package id="System.IO.FileSystem.Primitives" version="4.3.0" targetFramework="net462" />
<package id="System.Linq" version="4.3.0" targetFramework="net462" />
<package id="System.Linq.Expressions" version="4.3.0" targetFramework="net462" />
<package id="System.Net.Http" version="4.3.2" targetFramework="net462" />
<package id="System.Net.Primitives" version="4.3.0" targetFramework="net462" />
<package id="System.Net.Sockets" version="4.3.0" targetFramework="net462" />
<package id="System.ObjectModel" version="4.3.0" targetFramework="net462" />
<package id="System.Reflection" version="4.3.0" targetFramework="net462" />
<package id="System.Reflection.Extensions" version="4.3.0" targetFramework="net462" />
<package id="System.Reflection.Primitives" version="4.3.0" targetFramework="net462" />
<package id="System.Resources.ResourceManager" version="4.3.0" targetFramework="net462" />
<package id="System.Runtime" version="4.3.0" targetFramework="net462" />
<package id="System.Runtime.CompilerServices.Unsafe" version="4.3.0" targetFramework="net462" />
<package id="System.Runtime.Extensions" version="4.3.0" targetFramework="net462" />
<package id="System.Runtime.Handles" version="4.3.0" targetFramework="net462" />
<package id="System.Runtime.InteropServices" version="4.3.0" targetFramework="net462" />
<package id="System.Runtime.InteropServices.RuntimeInformation" version="4.3.0" targetFramework="net462" />
<package id="System.Runtime.Numerics" version="4.3.0" targetFramework="net462" />
<package id="System.Security.Cryptography.Algorithms" version="4.3.0" targetFramework="net462" />
<package id="System.Security.Cryptography.Encoding" version="4.3.0" targetFramework="net462" />
<package id="System.Security.Cryptography.Primitives" version="4.3.0" targetFramework="net462" />
<package id="System.Security.Cryptography.X509Certificates" version="4.3.0" targetFramework="net462" />
<package id="System.Text.Encoding" version="4.3.0" targetFramework="net462" />
<package id="System.Text.Encoding.Extensions" version="4.3.0" targetFramework="net462" />
<package id="System.Text.RegularExpressions" version="4.3.0" targetFramework="net462" />
<package id="System.Threading" version="4.3.0" targetFramework="net462" />
<package id="System.Threading.Tasks" version="4.3.0" targetFramework="net462" />
<package id="System.Threading.Timer" version="4.3.0" targetFramework="net462" />
<package id="System.Xml.ReaderWriter" version="4.3.0" targetFramework="net462" />
<package id="System.Xml.XDocument" version="4.3.0" targetFramework="net462" />
</packages>
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using RawRabbit.Configuration;
using RawRabbit.Configuration.Exchange;
using RawRabbit.vNext;
using RawRabbit.vNext.Disposable;
namespace TestRabbitMqRecovery
{
class Program
{
static void Main(string[] args)
{
MainAsync(args).GetAwaiter().GetResult();
Console.ReadLine();
}
static async Task MainAsync(string[] args)
{
var tcpProxy = new TcpForwarderSlim();
var t = new Thread(() =>
tcpProxy.Start(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 19672),
new IPEndPoint(IPAddress.Parse("127.0.0.1"), 5672)));
t.Start();
var config = new RawRabbitConfiguration
{
Username = "guest",
Password = "guest",
Port = 19672,
VirtualHost = "/",
Hostnames = { "127.0.0.1" },
Queue = { AutoDelete = true, Durable = true, Exclusive = false },
RouteWithGlobalId = false,
AutomaticRecovery = true,
TopologyRecovery = true,
};
var server = new Server(config);
var client = new Client(config);
// start infinite message publishing
client.PublishAsync();
// wait for publish some messages
await Task.Delay(TimeSpan.FromSeconds(4));
// simulate network problem
tcpProxy.Close();
}
}
public class Client
{
private IBusClient busClient;
private long i;
public Client(RawRabbitConfiguration config)
{
busClient = BusClientFactory.CreateDefault(config);
busClient.SubscribeAsync<string>(async (msg, context) => Console.WriteLine("Client Received: {0}", msg),
cfg => cfg.WithExchange(exch => exch.WithName("test.for-client").WithType(ExchangeType.Direct))
.WithQueue(q => q.WithName("test.cleint"))
.WithSubscriberId("")
.WithRoutingKey("client"));
}
public async Task PublishAsync()
{
while (true)
{
i++;
await busClient.PublishAsync($"MESSAGE {i}", configuration: cfg => cfg
.WithExchange(ec => ec.WithName("test.for-sever").WithType(ExchangeType.Direct))
.WithRoutingKey("server"));
Console.WriteLine("Client Message published {0}", i);
await Task.Delay(3000);
}
}
}
public class Server
{
private IBusClient busClient;
public Server(RawRabbitConfiguration config)
{
busClient = BusClientFactory.CreateDefault(config);
busClient.SubscribeAsync<string>(async (msg, context) => Console.WriteLine("Server Received: {0}", msg),
cfg => cfg.WithExchange(exch => exch.WithName("test.for-sever").WithType(ExchangeType.Direct))
.WithQueue(q => q.WithName("test.server"))
.WithSubscriberId("")
.WithRoutingKey("server"));
}
}
/// <summary>
/// Forwarding TCP connection, to simulate network problems
/// </summary>
public class TcpForwarderSlim
{
private readonly Socket _mainSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
private Socket _source;
private readonly List<TcpForwarderSlim> _forwarders = new List<TcpForwarderSlim>();
public void Start(IPEndPoint local, IPEndPoint remote)
{
_mainSocket.Bind(local);
_mainSocket.Listen(10);
while (true)
{
var source = _mainSocket.Accept();
var destination = new TcpForwarderSlim();
var state = new State(source, destination._mainSocket);
destination.Connect(remote, source);
source.BeginReceive(state.Buffer, 0, state.Buffer.Length, 0, OnDataReceive, state);
_forwarders.Add(destination);
}
}
public void Close()
{
if (_forwarders.Count > 0)
{
foreach (var forwarder in _forwarders)
{
forwarder.Close();
}
}
else
{
_source.Close();
_mainSocket.Close();
}
}
private void Connect(EndPoint remoteEndpoint, Socket source)
{
var state = new State(_mainSocket, source);
_mainSocket.Connect(remoteEndpoint);
_mainSocket.BeginReceive(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, OnDataReceive, state);
_source = source;
}
private void OnDataReceive(IAsyncResult result)
{
var state = (State)result.AsyncState;
try
{
var bytesRead = state.SourceSocket.EndReceive(result);
if (bytesRead > 0)
{
state.DestinationSocket.Send(state.Buffer, bytesRead, SocketFlags.None);
state.SourceSocket.BeginReceive(state.Buffer, 0, state.Buffer.Length, 0, OnDataReceive, state);
}
}
catch
{
state.DestinationSocket.Close();
state.SourceSocket.Close();
}
}
private class State
{
public Socket SourceSocket { get; private set; }
public Socket DestinationSocket { get; private set; }
public byte[] Buffer { get; private set; }
public State(Socket source, Socket destination)
{
SourceSocket = source;
DestinationSocket = destination;
Buffer = new byte[8192];
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment