Created
May 20, 2019 11:44
-
-
Save VRiznyk/1f67e17c4f3f907cbf48409f16ff51b3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Delta manager supplied by dependency injection.
 *
 * Declared as `lateinit var` with an explicit type: an injected field is assigned
 * after construction, so it cannot be an uninitialised, untyped `val`.
 */
@Inject lateinit var deltasManager: DeltasManager

/** Single-threaded executor on which the connection is started. */
val executor = Executors.newSingleThreadExecutor()

/**
 * Starts the deltas connection on [executor], so the calling thread does not
 * run [DeltasManager.startConnection] itself.
 */
fun init() {
    // The original referenced `executer`, which is not declared anywhere in this snippet.
    executor.execute { deltasManager.startConnection() }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A [DeltasConsumer] whose underlying connection is started and stopped explicitly.
 */
interface DeltasManager : DeltasConsumer {

    /** Starts the connection through which deltas are received. */
    fun startConnection()

    /** Requests that the connection started by [startConnection] be shut down. */
    fun stopConnection()
}
/**
 * Source of the settings needed to connect to RabbitMQ and to pick the queue
 * and exchange to consume from.
 */
interface RabbitMQParametersProvider {

    /** Broker host passed to the connection factory. */
    fun getHost(): String

    /** User name passed to the connection factory. */
    fun getUsername(): String

    /** Virtual host passed to the connection factory. */
    fun getVirtualHost(): String

    /** Password passed to the connection factory. */
    fun getPassword(): String

    /** Name of the queue that is declared and consumed. */
    fun getQueueName(): String

    /** Name of the exchange the queue is bound to. */
    fun getExchangeName(): String
}
/**
 * [DeltasManager] implementation that consumes messages from a RabbitMQ queue,
 * deserialises each one with Gson according to its AMQP `type` property, and
 * re-emits the payload on the subject that matches the message type.
 *
 * Connection settings are read from [parameters] on every [startConnection] call.
 */
class DeltasManagerImpl(
    private val parameters: RabbitMQParametersProvider
) : DeltasManager {

    private val factory = ConnectionFactory()
    private val gson = Gson()

    /**
     * Connection opened by the most recent successful [startConnection], or `null`
     * when there is none. Volatile because start and stop may run on different threads.
     */
    @Volatile
    private var connection: com.rabbitmq.client.Connection? = null

    private val createProductPublisher: PublishSubject<ProductResponse> = PublishSubject.create()
    private val updateProductPublisher: PublishSubject<ProductResponse> = PublishSubject.create()
    private val deleteProductPublisher: PublishSubject<Long> = PublishSubject.create()

    private val createProductPropertyPublisher: PublishSubject<ProductPropertyResponse> = PublishSubject.create()
    private val updateProductPropertyPublisher: PublishSubject<ProductPropertyResponse> = PublishSubject.create()
    private val deleteProductPropertyPublisher: PublishSubject<Long> = PublishSubject.create()

    private val createCategoryPublisher: PublishSubject<CategoryResponse> = PublishSubject.create()
    private val updateCategoryPublisher: PublishSubject<CategoryResponse> = PublishSubject.create()
    private val deleteCategoryPublisher: PublishSubject<Long> = PublishSubject.create()

    private val createTablePublisher: PublishSubject<TableResponse> = PublishSubject.create()
    private val updateTablePublisher: PublishSubject<TableResponse> = PublishSubject.create()
    private val deleteTablePublisher: PublishSubject<Long> = PublishSubject.create()

    private val payHeldOrderListPublisher: PublishSubject<ListResponse<OrderResponse>> = PublishSubject.create()
    private val holdOrderListPublisher: PublishSubject<ListResponse<OrderResponse>> = PublishSubject.create()
    private val updateOrderListPublisher: PublishSubject<ListResponse<OrderResponse>> = PublishSubject.create()
    private val deleteOrderListPublisher: PublishSubject<List<DeleteOrderResponseDto>> = PublishSubject.create()

    /**
     * Opens a connection to RabbitMQ, declares the configured queue, binds it to the
     * configured exchange with an empty routing key, and starts consuming from it.
     *
     * A connection left over from an earlier call is closed first. Connection and
     * channel failures are logged rather than thrown; a connection that was opened
     * before the failure is closed again so it does not leak.
     */
    override fun startConnection() {
        // Release any previous connection before replacing it.
        stopConnection()

        factory.host = parameters.getHost()
        factory.username = parameters.getUsername()
        factory.virtualHost = parameters.getVirtualHost()
        factory.password = parameters.getPassword()
        factory.isAutomaticRecoveryEnabled = false

        var opened: com.rabbitmq.client.Connection? = null
        try {
            Timber.i("Connect to RabbitMQ: queue=${parameters.getQueueName()}, exchange=${parameters.getExchangeName()}")
            val newConnection = factory.newConnection()
            opened = newConnection
            val channel = newConnection.createChannel()
            val queueDeclare = channel.queueDeclare(
                parameters.getQueueName(),
                true,  // durable
                false, // exclusive
                true,  // autoDelete
                null   // no extra arguments
            )
            channel.queueBind(
                queueDeclare.queue,
                parameters.getExchangeName(),
                ""
            )
            val consumer = object : DefaultConsumer(channel) {
                override fun handleDelivery(
                    consumerTag: String?,
                    envelope: Envelope?,
                    properties: AMQP.BasicProperties?,
                    body: ByteArray?
                ) {
                    // Messages without a body or without a type cannot be dispatched.
                    if (body == null) return
                    val type = properties?.type ?: return
                    val message = String(body, StandardCharsets.UTF_8)
                    Timber.i("Message received[$type]: $message")
                    try {
                        parseMessage(message, type)
                    } catch (e: Exception) {
                        // One malformed message must not escape the consumer callback;
                        // NOTE(review): the client's default exception handler is expected
                        // to close the channel on consumer exceptions - confirm for the
                        // client version in use.
                        Timber.e(e, "Failed to process [$type] delta")
                    }
                }
            }
            // Second argument is autoAck: deliveries are acknowledged automatically.
            channel.basicConsume(queueDeclare.queue, true, consumer)
            connection = newConnection
        } catch (e: Exception) {
            Timber.e(e, "Failed to connect to RabbitMQ")
            closeQuietly(opened)
        }
    }

    /**
     * Closes the connection opened by [startConnection], if any. Safe to call when
     * no connection is open. Closing talks to the broker, so call this from a
     * thread that is allowed to block.
     */
    override fun stopConnection() {
        val current = connection ?: return
        connection = null
        closeQuietly(current)
    }

    override fun onCreateProduct(): Observable<ProductResponse> = createProductPublisher
    override fun onUpdateProduct(): Observable<ProductResponse> = updateProductPublisher
    override fun onDeleteProduct(): Observable<Long> = deleteProductPublisher

    override fun onCreateProductProperty(): Observable<ProductPropertyResponse> = createProductPropertyPublisher
    override fun onUpdateProductProperty(): Observable<ProductPropertyResponse> = updateProductPropertyPublisher
    override fun onDeleteProductProperty(): Observable<Long> = deleteProductPropertyPublisher

    override fun onCreateCategory(): Observable<CategoryResponse> = createCategoryPublisher
    override fun onUpdateCategory(): Observable<CategoryResponse> = updateCategoryPublisher
    override fun onDeleteCategory(): Observable<Long> = deleteCategoryPublisher

    override fun onCreateTable(): Observable<TableResponse> = createTablePublisher
    override fun onUpdateTable(): Observable<TableResponse> = updateTablePublisher
    override fun onDeleteTable(): Observable<Long> = deleteTablePublisher

    override fun onPayHeldOrderList(): Observable<ListResponse<OrderResponse>> = payHeldOrderListPublisher
    override fun onHoldOrderList(): Observable<ListResponse<OrderResponse>> = holdOrderListPublisher
    override fun onUpdateOrderList(): Observable<ListResponse<OrderResponse>> = updateOrderListPublisher
    override fun onDeleteOrderList(): Observable<List<DeleteOrderResponseDto>> = deleteOrderListPublisher

    /** Closes [target] if it is still open, logging instead of throwing on failure. */
    private fun closeQuietly(target: com.rabbitmq.client.Connection?) {
        if (target == null || !target.isOpen) return
        try {
            target.close()
        } catch (e: Exception) {
            Timber.e(e, "Failed to close RabbitMQ connection")
        }
    }

    /**
     * Deserialises [message] as the payload type registered for [type] and emits it
     * on the matching subject. Type names that do not map to a [MessageType] are
     * logged and skipped, the same way as message types without a dedicated subject.
     *
     * @param message JSON payload of the delivery.
     * @param type value of the delivery's AMQP `type` property.
     */
    @Suppress("UNCHECKED_CAST")
    private fun parseMessage(message: String, type: String) {
        val messageType = try {
            MessageType.valueOf(type)
        } catch (e: IllegalArgumentException) {
            // valueOf throws for names that are not enum constants.
            Timber.i("[$type] deltas received, but parsing is skipped")
            return
        }
        val response: Any = gson.fromJson(message, messageType.dataType)
        when (messageType) {
            MessageType.CREATE_PRODUCT -> createProductPublisher.onNext(response as ProductResponse)
            MessageType.UPDATE_PRODUCT -> updateProductPublisher.onNext(response as ProductResponse)
            MessageType.DELETE_PRODUCT -> deleteProductPublisher.onNext(response as Long)

            MessageType.CREATE_PRODUCT_PROPERTY -> createProductPropertyPublisher.onNext(response as ProductPropertyResponse)
            MessageType.UPDATE_PRODUCT_PROPERTY -> updateProductPropertyPublisher.onNext(response as ProductPropertyResponse)
            MessageType.DELETE_PRODUCT_PROPERTY -> deleteProductPropertyPublisher.onNext(response as Long)

            MessageType.CREATE_CATEGORY -> createCategoryPublisher.onNext(response as CategoryResponse)
            MessageType.UPDATE_CATEGORY -> updateCategoryPublisher.onNext(response as CategoryResponse)
            MessageType.DELETE_CATEGORY -> deleteCategoryPublisher.onNext(response as Long)

            MessageType.CREATE_TABLE -> createTablePublisher.onNext(response as TableResponse)
            MessageType.UPDATE_TABLE -> updateTablePublisher.onNext(response as TableResponse)
            MessageType.DELETE_TABLE -> deleteTablePublisher.onNext(response as Long)

            MessageType.PAY_HELD_ORDER_LIST -> payHeldOrderListPublisher.onNext(response as ListResponse<OrderResponse>)
            MessageType.HOLD_ORDER_LIST -> holdOrderListPublisher.onNext(response as ListResponse<OrderResponse>)
            // Was routed to holdOrderListPublisher, so onUpdateOrderList() never emitted.
            MessageType.UPDATE_ORDER_LIST -> updateOrderListPublisher.onNext(response as ListResponse<OrderResponse>)
            MessageType.DELETE_ORDER_LIST -> deleteOrderListPublisher.onNext(response as List<DeleteOrderResponseDto>)

            else -> Timber.i("[$type] deltas received, but parsing is skipped")
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment