Last active
July 13, 2017 08:34
-
-
Save fvoronin/7b23462ab5854fd98af85ed4f04c4da9 to your computer and use it in GitHub Desktop.
RabbitMQ topology not recovering
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| [13:33:06] [$1] [DEBUG] [ChannelFactory]: Setting up Connection Recovery | |
| [13:33:06] [$1] [DEBUG] [ChannelFactory]: Initiating 0 channels. | |
| [13:33:06] [$1] [INFO] [ChannelFactory]: Channel scaling is disabled. | |
| [13:33:06] [$1] [INFO] [BaseBusClient`1]: BusClient initialized. | |
| [13:33:06] [$1] [INFO] [BaseBusClient`1]: Subscribing to message 'String' on exchange 'test.for-sever' with routing key server. | |
| [13:33:06] [$1] [DEBUG] [TopologyProvider]: Start processing topology work. | |
| [13:33:06] [$1] [INFO] [TopologyProvider]: Declaring queue 'test.server'. | |
| [13:33:06] [$1] [DEBUG] [ChannelFactory]: Existing connection is open and will be used. | |
| [13:33:06] [$1] [INFO] [TopologyProvider]: Declaring exchange 'test.for-sever'. | |
| [13:33:06] [$1] [INFO] [TopologyProvider]: Binding queue 'test.server' to exchange 'test.for-sever' with routing key 'server' | |
| [13:33:06] [$1] [DEBUG] [TopologyProvider]: Done processing topology work. | |
| [13:33:06] [$1] [DEBUG] [ChannelFactory]: Existing connection is open and will be used. | |
| [13:33:06] [$4] [DEBUG] [EventingBasicConsumerFactory]: Setting QoS | |
| Prefetch Size: 0 | |
| Prefetch Count: 50 | |
| global: false | |
| [13:33:06] [$4] [DEBUG] [Subscriber`1]: Setting up a consumer on channel '2' for queue test.server with NoAck set to False. | |
| [13:33:06] [$1] [DEBUG] [ChannelFactory]: Setting up Connection Recovery | |
| [13:33:06] [$1] [DEBUG] [ChannelFactory]: Initiating 0 channels. | |
| [13:33:06] [$1] [INFO] [ChannelFactory]: Channel scaling is disabled. | |
| [13:33:06] [$1] [INFO] [BaseBusClient`1]: BusClient initialized. | |
| [13:33:06] [$1] [INFO] [BaseBusClient`1]: Subscribing to message 'String' on exchange 'test.for-client' with routing key client. | |
| [13:33:06] [$1] [DEBUG] [TopologyProvider]: Start processing topology work. | |
| [13:33:06] [$1] [INFO] [TopologyProvider]: Declaring queue 'test.cleint'. | |
| [13:33:06] [$1] [DEBUG] [ChannelFactory]: Existing connection is open and will be used. | |
| [13:33:06] [$1] [INFO] [TopologyProvider]: Declaring exchange 'test.for-client'. | |
| [13:33:06] [$1] [INFO] [TopologyProvider]: Binding queue 'test.cleint' to exchange 'test.for-client' with routing key 'client' | |
| [13:33:06] [$1] [DEBUG] [TopologyProvider]: Done processing topology work. | |
| [13:33:06] [$1] [DEBUG] [ChannelFactory]: Existing connection is open and will be used. | |
| [13:33:06] [$10] [DEBUG] [EventingBasicConsumerFactory]: Setting QoS | |
| Prefetch Size: 0 | |
| Prefetch Count: 50 | |
| global: false | |
| [13:33:06] [$10] [DEBUG] [Subscriber`1]: Setting up a consumer on channel '2' for queue test.cleint with NoAck set to False. | |
| [13:33:06] [$1] [DEBUG] [BaseBusClient`1]: Initiating publish for message 'String' on exchange 'test.for-sever' with routing key server. | |
| [13:33:06] [$1] [DEBUG] [TopologyProvider]: Start processing topology work. | |
| [13:33:06] [$1] [INFO] [TopologyProvider]: Declaring exchange 'test.for-sever'. | |
| [13:33:06] [$1] [DEBUG] [TopologyProvider]: Done processing topology work. | |
| [13:33:06] [$1] [DEBUG] [ChannelFactory]: Begining to process 'GetChannel' requests. | |
| [13:33:06] [$1] [DEBUG] [ChannelFactory]: Existing connection is open and will be used. | |
| [13:33:06] [$5] [INFO] [ChannelFactory]: Channel '3' has been created. | |
| [13:33:06] [$1] [DEBUG] [ChannelFactory]: 'GetChannel' has been processed. | |
| [13:33:06] [$10] [INFO] [PublishAcknowledger]: Setting 'Publish Acknowledge' for channel '3' | |
| [13:33:06] [$12] [INFO] [PublishAcknowledger]: Recieved ack for 1/3 with multiple set to 'False' | |
| [13:33:06] [$5] [INFO] [EventingBasicConsumerFactory]: Message recived: MessageId: 427793f5-347d-4c61-941b-b5e514cd4ef4 | |
| [13:33:06] [$12] [DEBUG] [PublishAcknowledger]: Disposed ack timer for 1/3 | |
| [13:33:06] [$12] [DEBUG] [PublishAcknowledger]: Successfully confirmed publish 1/3 | |
| Client Message published 1 | |
| Server Received: MESSAGE 1 | |
| [13:33:06] [$5] [DEBUG] [EventingBasicConsumerFactory]: Ack:ing message with id 1. | |
| [13:33:08] [$5] [INFO] [TopologyProvider]: Disposing topology channel (if exists). | |
| [13:33:08] [$14] [INFO] [TopologyProvider]: Disposing topology channel (if exists). | |
| [13:33:09] [$10] [DEBUG] [BaseBusClient`1]: Initiating publish for message 'String' on exchange 'test.for-sever' with routing key server. | |
| [13:33:09] [$10] [DEBUG] [ChannelFactory]: Begining to process 'GetChannel' requests. | |
| [13:33:09] [$10] [DEBUG] [ChannelFactory]: 'GetChannel' has been processed. | |
| [13:33:09] [$14] [INFO] [EventingBasicConsumerFactory]: Message recived: MessageId: 5213a360-5b2e-4f59-b4ec-a5c8b0d63ed8 | |
| [13:33:09] [$12] [INFO] [PublishAcknowledger]: Recieved ack for 2/3 with multiple set to 'False' | |
| Server Received: MESSAGE 2 | |
| [13:33:09] [$12] [DEBUG] [PublishAcknowledger]: Disposed ack timer for 2/3 | |
| Client Message published 2 | |
| [13:33:09] [$12] [DEBUG] [PublishAcknowledger]: Successfully confirmed publish 2/3 | |
| [13:33:09] [$14] [DEBUG] [EventingBasicConsumerFactory]: Ack:ing message with id 2. | |
| [13:33:10] [$8] [DEBUG] [EventingRawConsumer]: The consumer with tag 'amq.ctag-8KbFsgKaW8CWY2yiEvoW-Q' has been cancelled. | |
| [13:33:10] [$12] [DEBUG] [EventingRawConsumer]: The consumer with tag 'amq.ctag-DmEHkMlVeCeOaYINbPyO3A' has been cancelled. | |
| [13:33:10] [$8] [INFO] [EventingRawConsumer]: Consumer amq.ctag-8KbFsgKaW8CWY2yiEvoW-Q has been shut down. | |
| Reason: System.IO.IOException: Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host. --- | |
| > System.Net.Sockets.SocketException: An existing connection was forcibly closed by the remote host | |
| at System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags) | |
| at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size) | |
| --- End of inner exception stack trace --- | |
| at RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader) | |
| at RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame() | |
| at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration() | |
| at RabbitMQ.Client.Framing.Impl.Connection.MainLoop() | |
| Initiator: Library | |
| Reply Text: Unexpected Exception | |
| [13:33:10] [$12] [INFO] [EventingRawConsumer]: Consumer amq.ctag-DmEHkMlVeCeOaYINbPyO3A has been shut down. | |
| Reason: System.IO.IOException: Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host. --- | |
| > System.Net.Sockets.SocketException: An existing connection was forcibly closed by the remote host | |
| at System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags) | |
| at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size) | |
| --- End of inner exception stack trace --- | |
| at RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader) | |
| at RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame() | |
| at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration() | |
| at RabbitMQ.Client.Framing.Impl.Connection.MainLoop() | |
| Initiator: Library | |
| Reply Text: Unexpected Exception | |
| [13:33:12] [$5] [DEBUG] [BaseBusClient`1]: Initiating publish for message 'String' on exchange 'test.for-sever' with routing key server. | |
| Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering exchange test.for-client: Already closed: | |
| The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId=0, cause= ---> RabbitM | |
| Q.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, | |
| text="Goodbye", classId=0, methodId=0, cause= | |
| at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd) | |
| at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body) | |
| at RabbitMQ.Client.Framing.Impl.Model._Private_ExchangeDeclare(String exchange, String type, Boolean passive, Boolean durable, Boolean autoDelete, | |
| Boolean internal, Boolean nowait, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.ModelBase.ExchangeDeclare(String exchange, String type, Boolean durable, Boolean autoDelete, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.RecordedExchange.Recover() | |
| at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverExchanges() | |
| --- End of inner exception stack trace --- | |
| Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering exchange test.for-sever: Already closed: | |
| The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId=0, cause= ---> RabbitMQ | |
| .Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, | |
| text="Goodbye", classId=0, methodId=0, cause= | |
| at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd) | |
| at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body) | |
| at RabbitMQ.Client.Framing.Impl.Model._Private_ExchangeDeclare(String exchange, String type, Boolean passive, Boolean durable, Boolean autoDelete, | |
| Boolean internal, Boolean nowait, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.ModelBase.ExchangeDeclare(String exchange, String type, Boolean durable, Boolean autoDelete, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.RecordedExchange.Recover() | |
| at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverExchanges() | |
| --- End of inner exception stack trace --- | |
| Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering queue test.cleint: Already closed: The AM | |
| QP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId=0, cause= ---> RabbitMQ.Clien | |
| t.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text=" | |
| Goodbye", classId=0, methodId=0, cause= | |
| at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd) | |
| at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body) | |
| at RabbitMQ.Client.Framing.Impl.Model._Private_QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, | |
| Boolean nowait, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 | |
| arguments) | |
| at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.RecordedQueue.Recover() | |
| at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverQueues() | |
| --- End of inner exception stack trace --- | |
| Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering binding between test.for-client and test. | |
| cleint: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId | |
| =0, cause= ---> RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by | |
| Application, code=200, text="Goodbye", classId=0, methodId=0, cause= | |
| at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd) | |
| at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body) | |
| at RabbitMQ.Client.Framing.Impl.Model._Private_QueueBind(String queue, String exchange, String routingKey, Boolean nowait, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.RecordedQueueBinding.Recover() | |
| at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverBindings() | |
| --- End of inner exception stack trace --- | |
| Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-DmEHkMlVeCeOaYINbPyO3A | |
| on queue test.cleint: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'test.cleint' in | |
| vhost '/'", classId=60, methodId=20, cause= ---> RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP cl | |
| ose-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'test.cleint' in vhost '/'", classId=60, methodId=20, cause= | |
| at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout) | |
| at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean noAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 a | |
| rguments, IBasicConsumer consumer) | |
| at RabbitMQ.Client.Impl.RecordedConsumer.Recover() | |
| at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverConsumers() | |
| --- End of inner exception stack trace --- | |
| [13:33:20] [$14] [INFO] [ChannelFactory]: Connection has been recovered. Starting channel processing. | |
| [13:33:20] [$14] [DEBUG] [ChannelFactory]: Begining to process 'GetChannel' requests. | |
| [13:33:20] [$14] [DEBUG] [ChannelFactory]: 'GetChannel' has been processed. | |
| [13:33:20] [$18] [INFO] [PublishAcknowledger]: Recieved ack for 1/2 with multiple set to 'False' | |
| [13:33:20] [$18] [DEBUG] [PublishAcknowledger]: Disposed ack timer for 1/2 | |
| Client Message published 3 | |
| [13:33:20] [$18] [DEBUG] [PublishAcknowledger]: Successfully confirmed publish 1/2 | |
| Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering exchange test.for-sever: Already closed: | |
| The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId=0, cause= ---> RabbitMQ | |
| .Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, | |
| text="Goodbye", classId=0, methodId=0, cause= | |
| at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd) | |
| at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body) | |
| at RabbitMQ.Client.Framing.Impl.Model._Private_ExchangeDeclare(String exchange, String type, Boolean passive, Boolean durable, Boolean autoDelete, | |
| Boolean internal, Boolean nowait, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.ModelBase.ExchangeDeclare(String exchange, String type, Boolean durable, Boolean autoDelete, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.RecordedExchange.Recover() | |
| at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverExchanges() | |
| --- End of inner exception stack trace --- | |
| Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering queue test.server: Already closed: The AM | |
| QP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId=0, cause= ---> RabbitMQ.Clien | |
| t.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text=" | |
| Goodbye", classId=0, methodId=0, cause= | |
| at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd) | |
| at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body) | |
| at RabbitMQ.Client.Framing.Impl.Model._Private_QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, | |
| Boolean nowait, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 | |
| arguments) | |
| at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.RecordedQueue.Recover() | |
| at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverQueues() | |
| --- End of inner exception stack trace --- | |
| Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering binding between test.for-sever and test.s | |
| erver: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text="Goodbye", classId=0, methodId= | |
| 0, cause= ---> RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by | |
| Application, code=200, text="Goodbye", classId=0, methodId=0, cause= | |
| at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd) | |
| at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body) | |
| at RabbitMQ.Client.Framing.Impl.Model._Private_QueueBind(String queue, String exchange, String routingKey, Boolean nowait, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.ModelBase.QueueBind(String queue, String exchange, String routingKey, IDictionary`2 arguments) | |
| at RabbitMQ.Client.Impl.RecordedQueueBinding.Recover() | |
| at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverBindings() | |
| --- End of inner exception stack trace --- | |
| Topology recovery exception: RabbitMQ.Client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-8KbFsgKaW8CWY2yiEvoW-Q | |
| on queue test.server: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'test.server' in | |
| vhost '/'", classId=60, methodId=20, cause= ---> RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP cl | |
| ose-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'test.server' in vhost '/'", classId=60, methodId=20, cause= | |
| at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout) | |
| at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean noAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 a | |
| rguments, IBasicConsumer consumer) | |
| at RabbitMQ.Client.Impl.RecordedConsumer.Recover() | |
| at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverConsumers() | |
| --- End of inner exception stack trace --- | |
| [13:33:21] [$4] [INFO] [ChannelFactory]: Connection has been recovered. Starting channel processing. | |
| [13:33:21] [$4] [DEBUG] [ChannelFactory]: Begining to process 'GetChannel' requests. | |
| [13:33:21] [$4] [DEBUG] [ChannelFactory]: 'GetChannel' has been processed. | |
| [13:33:23] [$4] [DEBUG] [BaseBusClient`1]: Initiating publish for message 'String' on exchange 'test.for-sever' with routing key server. | |
| [13:33:23] [$4] [DEBUG] [ChannelFactory]: Begining to process 'GetChannel' requests. | |
| [13:33:23] [$4] [DEBUG] [ChannelFactory]: 'GetChannel' has been processed. | |
| [13:33:23] [$18] [INFO] [PublishAcknowledger]: Recieved ack for 2/2 with multiple set to 'False' | |
| [13:33:23] [$18] [DEBUG] [PublishAcknowledger]: Disposed ack timer for 2/2 | |
| Client Message published 4 | |
| [13:33:23] [$18] [DEBUG] [PublishAcknowledger]: Successfully confirmed publish 2/2 | |
| [13:33:26] [$14] [DEBUG] [BaseBusClient`1]: Initiating publish for message 'String' on exchange 'test.for-sever' with routing key server. | |
| [13:33:26] [$14] [DEBUG] [ChannelFactory]: Begining to process 'GetChannel' requests. | |
| [13:33:26] [$14] [DEBUG] [ChannelFactory]: 'GetChannel' has been processed. | |
| [13:33:26] [$18] [INFO] [PublishAcknowledger]: Recieved ack for 3/2 with multiple set to 'False' | |
| [13:33:26] [$18] [DEBUG] [PublishAcknowledger]: Disposed ack timer for 3/2 | |
| Client Message published 5 | |
| [13:33:26] [$18] [DEBUG] [PublishAcknowledger]: Successfully confirmed publish 3/2 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?xml version="1.0" encoding="utf-8"?> | |
| <packages> | |
| <package id="Microsoft.Extensions.Configuration" version="1.1.2" targetFramework="net462" /> | |
| <package id="Microsoft.Extensions.Configuration.Abstractions" version="1.1.2" targetFramework="net462" /> | |
| <package id="Microsoft.Extensions.Configuration.Binder" version="1.1.2" targetFramework="net462" /> | |
| <package id="Microsoft.Extensions.DependencyInjection" version="1.1.1" targetFramework="net462" /> | |
| <package id="Microsoft.Extensions.DependencyInjection.Abstractions" version="1.1.1" targetFramework="net462" /> | |
| <package id="Microsoft.Extensions.Logging" version="1.1.2" targetFramework="net462" /> | |
| <package id="Microsoft.Extensions.Logging.Abstractions" version="1.1.2" targetFramework="net462" /> | |
| <package id="Microsoft.Extensions.Primitives" version="1.1.1" targetFramework="net462" /> | |
| <package id="Microsoft.NETCore.Platforms" version="1.1.0" targetFramework="net462" /> | |
| <package id="Microsoft.Win32.Primitives" version="4.3.0" targetFramework="net462" /> | |
| <package id="NETStandard.Library" version="1.6.1" targetFramework="net462" /> | |
| <package id="Newtonsoft.Json" version="10.0.3" targetFramework="net462" /> | |
| <package id="RabbitMQ.Client" version="4.1.3" targetFramework="net462" /> | |
| <package id="RawRabbit" version="1.10.3" targetFramework="net462" /> | |
| <package id="RawRabbit.Extensions" version="1.10.3" targetFramework="net462" /> | |
| <package id="RawRabbit.vNext" version="1.10.3" targetFramework="net462" /> | |
| <package id="System.AppContext" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Collections" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Collections.Concurrent" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Collections.NonGeneric" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.ComponentModel" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.ComponentModel.Primitives" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.ComponentModel.TypeConverter" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Console" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Diagnostics.Debug" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Diagnostics.DiagnosticSource" version="4.4.0" targetFramework="net462" /> | |
| <package id="System.Diagnostics.Tools" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Diagnostics.Tracing" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Globalization" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Globalization.Calendars" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.IO" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.IO.Compression" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.IO.Compression.ZipFile" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.IO.FileSystem" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.IO.FileSystem.Primitives" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Linq" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Linq.Expressions" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Net.Http" version="4.3.2" targetFramework="net462" /> | |
| <package id="System.Net.Primitives" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Net.Sockets" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.ObjectModel" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Reflection" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Reflection.Extensions" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Reflection.Primitives" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Resources.ResourceManager" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Runtime" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Runtime.CompilerServices.Unsafe" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Runtime.Extensions" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Runtime.Handles" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Runtime.InteropServices" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Runtime.InteropServices.RuntimeInformation" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Runtime.Numerics" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Security.Cryptography.Algorithms" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Security.Cryptography.Encoding" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Security.Cryptography.Primitives" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Security.Cryptography.X509Certificates" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Text.Encoding" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Text.Encoding.Extensions" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Text.RegularExpressions" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Threading" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Threading.Tasks" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Threading.Timer" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Xml.ReaderWriter" version="4.3.0" targetFramework="net462" /> | |
| <package id="System.Xml.XDocument" version="4.3.0" targetFramework="net462" /> | |
| </packages> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Generic; | |
| using System.Net; | |
| using System.Net.Sockets; | |
| using System.Threading; | |
| using System.Threading.Tasks; | |
| using RawRabbit.Configuration; | |
| using RawRabbit.Configuration.Exchange; | |
| using RawRabbit.vNext; | |
| using RawRabbit.vNext.Disposable; | |
| namespace TestRabbitMqRecovery | |
| { | |
| class Program | |
| { | |
| static void Main(string[] args) | |
| { | |
| MainAsync(args).GetAwaiter().GetResult(); | |
| Console.ReadLine(); | |
| } | |
| static async Task MainAsync(string[] args) | |
| { | |
| var tcpProxy = new TcpForwarderSlim(); | |
| var t = new Thread(() => | |
| tcpProxy.Start(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 19672), | |
| new IPEndPoint(IPAddress.Parse("127.0.0.1"), 5672))); | |
| t.Start(); | |
| var config = new RawRabbitConfiguration | |
| { | |
| Username = "guest", | |
| Password = "guest", | |
| Port = 19672, | |
| VirtualHost = "/", | |
| Hostnames = { "127.0.0.1" }, | |
| Queue = { AutoDelete = true, Durable = true, Exclusive = false }, | |
| RouteWithGlobalId = false, | |
| AutomaticRecovery = true, | |
| TopologyRecovery = true, | |
| }; | |
| var server = new Server(config); | |
| var client = new Client(config); | |
| // start infinite message publishing | |
| client.PublishAsync(); | |
| // wait for publish some messages | |
| await Task.Delay(TimeSpan.FromSeconds(4)); | |
| // simulate network problem | |
| tcpProxy.Close(); | |
| } | |
| } | |
| public class Client | |
| { | |
| private IBusClient busClient; | |
| private long i; | |
| public Client(RawRabbitConfiguration config) | |
| { | |
| busClient = BusClientFactory.CreateDefault(config); | |
| busClient.SubscribeAsync<string>(async (msg, context) => Console.WriteLine("Client Received: {0}", msg), | |
| cfg => cfg.WithExchange(exch => exch.WithName("test.for-client").WithType(ExchangeType.Direct)) | |
| .WithQueue(q => q.WithName("test.cleint")) | |
| .WithSubscriberId("") | |
| .WithRoutingKey("client")); | |
| } | |
| public async Task PublishAsync() | |
| { | |
| while (true) | |
| { | |
| i++; | |
| await busClient.PublishAsync($"MESSAGE {i}", configuration: cfg => cfg | |
| .WithExchange(ec => ec.WithName("test.for-sever").WithType(ExchangeType.Direct)) | |
| .WithRoutingKey("server")); | |
| Console.WriteLine("Client Message published {0}", i); | |
| await Task.Delay(3000); | |
| } | |
| } | |
| } | |
| public class Server | |
| { | |
| private IBusClient busClient; | |
| public Server(RawRabbitConfiguration config) | |
| { | |
| busClient = BusClientFactory.CreateDefault(config); | |
| busClient.SubscribeAsync<string>(async (msg, context) => Console.WriteLine("Server Received: {0}", msg), | |
| cfg => cfg.WithExchange(exch => exch.WithName("test.for-sever").WithType(ExchangeType.Direct)) | |
| .WithQueue(q => q.WithName("test.server")) | |
| .WithSubscriberId("") | |
| .WithRoutingKey("server")); | |
| } | |
| } | |
| /// <summary> | |
| /// Forwarding TCP connection, to simulate network problems | |
| /// </summary> | |
| public class TcpForwarderSlim | |
| { | |
| private readonly Socket _mainSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); | |
| private Socket _source; | |
| private readonly List<TcpForwarderSlim> _forwarders = new List<TcpForwarderSlim>(); | |
| public void Start(IPEndPoint local, IPEndPoint remote) | |
| { | |
| _mainSocket.Bind(local); | |
| _mainSocket.Listen(10); | |
| while (true) | |
| { | |
| var source = _mainSocket.Accept(); | |
| var destination = new TcpForwarderSlim(); | |
| var state = new State(source, destination._mainSocket); | |
| destination.Connect(remote, source); | |
| source.BeginReceive(state.Buffer, 0, state.Buffer.Length, 0, OnDataReceive, state); | |
| _forwarders.Add(destination); | |
| } | |
| } | |
| public void Close() | |
| { | |
| if (_forwarders.Count > 0) | |
| { | |
| foreach (var forwarder in _forwarders) | |
| { | |
| forwarder.Close(); | |
| } | |
| } | |
| else | |
| { | |
| _source.Close(); | |
| _mainSocket.Close(); | |
| } | |
| } | |
| private void Connect(EndPoint remoteEndpoint, Socket source) | |
| { | |
| var state = new State(_mainSocket, source); | |
| _mainSocket.Connect(remoteEndpoint); | |
| _mainSocket.BeginReceive(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, OnDataReceive, state); | |
| _source = source; | |
| } | |
| private void OnDataReceive(IAsyncResult result) | |
| { | |
| var state = (State)result.AsyncState; | |
| try | |
| { | |
| var bytesRead = state.SourceSocket.EndReceive(result); | |
| if (bytesRead > 0) | |
| { | |
| state.DestinationSocket.Send(state.Buffer, bytesRead, SocketFlags.None); | |
| state.SourceSocket.BeginReceive(state.Buffer, 0, state.Buffer.Length, 0, OnDataReceive, state); | |
| } | |
| } | |
| catch | |
| { | |
| state.DestinationSocket.Close(); | |
| state.SourceSocket.Close(); | |
| } | |
| } | |
| private class State | |
| { | |
| public Socket SourceSocket { get; private set; } | |
| public Socket DestinationSocket { get; private set; } | |
| public byte[] Buffer { get; private set; } | |
| public State(Socket source, Socket destination) | |
| { | |
| SourceSocket = source; | |
| DestinationSocket = destination; | |
| Buffer = new byte[8192]; | |
| } | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment