Skip to content

Instantly share code, notes, and snippets.

@VRiznyk
Created May 20, 2019 11:44
Show Gist options
  • Select an option

  • Save VRiznyk/1f67e17c4f3f907cbf48409f16ff51b3 to your computer and use it in GitHub Desktop.

Select an option

Save VRiznyk/1f67e17c4f3f907cbf48409f16ff51b3 to your computer and use it in GitHub Desktop.
@Inject val deltasManager
val executor = Executors.newSingleThreadExecutor()
fun init(){
executer.execute{ deltasManager.startConnection() }
}
interface DeltasManager : DeltasConsumer {
fun startConnection()
fun stopConnection()
}
interface RabbitMQParametersProvider {
fun getHost():String
fun getUsername():String
fun getVirtualHost():String
fun getPassword():String
fun getQueueName():String
fun getExchangeName():String
}
class DeltasManagerImpl(
private val parameters: RabbitMQParametersProvider
) : DeltasManager {
private val factory = ConnectionFactory()
private val gson = Gson()
private val createProductPublisher: PublishSubject<ProductResponse> = PublishSubject.create()
private val updateProductPublisher: PublishSubject<ProductResponse> = PublishSubject.create()
private val deleteProductPublisher: PublishSubject<Long> = PublishSubject.create()
private val createProductPropertyPublisher: PublishSubject<ProductPropertyResponse> = PublishSubject.create()
private val updateProductPropertyPublisher: PublishSubject<ProductPropertyResponse> = PublishSubject.create()
private val deleteProductPropertyPublisher: PublishSubject<Long> = PublishSubject.create()
private val createCategoryPublisher: PublishSubject<CategoryResponse> = PublishSubject.create()
private val updateCategoryPublisher: PublishSubject<CategoryResponse> = PublishSubject.create()
private val deleteCategoryPublisher: PublishSubject<Long> = PublishSubject.create()
private val createTablePublisher: PublishSubject<TableResponse> = PublishSubject.create()
private val updateTablePublisher: PublishSubject<TableResponse> = PublishSubject.create()
private val deleteTablePublisher: PublishSubject<Long> = PublishSubject.create()
private val payHeldOrderListPublisher: PublishSubject<ListResponse<OrderResponse>> = PublishSubject.create()
private val holdOrderListPublisher: PublishSubject<ListResponse<OrderResponse>> = PublishSubject.create()
private val updateOrderListPublisher: PublishSubject<ListResponse<OrderResponse>> = PublishSubject.create()
private val deleteOrderListPublisher: PublishSubject<List<DeleteOrderResponseDto>> = PublishSubject.create()
override fun startConnection() {
factory.host = parameters.getHost()
factory.username = parameters.getUsername()
factory.virtualHost = parameters.getVirtualHost()
factory.password = parameters.getPassword()
factory.isAutomaticRecoveryEnabled = false
try {
Timber.i("Connect to RabbitMQ: queue=${parameters.getQueueName()}, exchange=${parameters.getExchangeName()}")
Timber.i("Connect to RabbitMQ")
val connection = factory.newConnection()
val channel = connection.createChannel()
val queueDeclare = channel.queueDeclare(
parameters.getQueueName(),
true,
false,
true,
null
)
channel.queueBind(
queueDeclare.queue,
parameters.getExchangeName(),
""
)
val consumer = object : DefaultConsumer(channel) {
override fun handleDelivery(
consumerTag: String?,
envelope: Envelope?,
properties: AMQP.BasicProperties?,
body: ByteArray?
) {
if (body == null) return
val message = String(body, StandardCharsets.UTF_8)
if (properties?.type == null) return
Timber.i("Message received[${properties.type}]: $message")
parseMessage(message, properties.type)
}
}
channel.basicConsume(queueDeclare.queue, true, consumer)
} catch (e: Exception) {
e.printStackTrace()
}
}
override fun stopConnection() {
//connection.close()
}
override fun onCreateProduct(): Observable<ProductResponse> = createProductPublisher
override fun onUpdateProduct(): Observable<ProductResponse> = updateProductPublisher
override fun onDeleteProduct(): Observable<Long> = deleteProductPublisher
override fun onCreateProductProperty(): Observable<ProductPropertyResponse> = createProductPropertyPublisher
override fun onUpdateProductProperty(): Observable<ProductPropertyResponse> = updateProductPropertyPublisher
override fun onDeleteProductProperty(): Observable<Long> = deleteProductPropertyPublisher
override fun onCreateCategory(): Observable<CategoryResponse> = createCategoryPublisher
override fun onUpdateCategory(): Observable<CategoryResponse> = updateCategoryPublisher
override fun onDeleteCategory(): Observable<Long> = deleteCategoryPublisher
override fun onCreateTable(): Observable<TableResponse> = createTablePublisher
override fun onUpdateTable(): Observable<TableResponse> = updateTablePublisher
override fun onDeleteTable(): Observable<Long> = deleteTablePublisher
override fun onPayHeldOrderList(): Observable<ListResponse<OrderResponse>> = payHeldOrderListPublisher
override fun onHoldOrderList(): Observable<ListResponse<OrderResponse>> = holdOrderListPublisher
override fun onUpdateOrderList(): Observable<ListResponse<OrderResponse>> = updateOrderListPublisher
override fun onDeleteOrderList(): Observable<List<DeleteOrderResponseDto>> = deleteOrderListPublisher
private fun parseMessage(message: String, type: String) {
val messageType = MessageType.valueOf(type)
val response: Any = gson.fromJson(message, messageType.dataType)
when (messageType) {
MessageType.CREATE_PRODUCT -> createProductPublisher.onNext(response as ProductResponse)
MessageType.UPDATE_PRODUCT -> updateProductPublisher.onNext(response as ProductResponse)
MessageType.DELETE_PRODUCT -> deleteProductPublisher.onNext(response as Long)
MessageType.CREATE_PRODUCT_PROPERTY -> createProductPropertyPublisher.onNext(response as ProductPropertyResponse)
MessageType.UPDATE_PRODUCT_PROPERTY -> updateProductPropertyPublisher.onNext(response as ProductPropertyResponse)
MessageType.DELETE_PRODUCT_PROPERTY -> deleteProductPropertyPublisher.onNext(response as Long)
MessageType.CREATE_CATEGORY -> createCategoryPublisher.onNext(response as CategoryResponse)
MessageType.UPDATE_CATEGORY -> updateCategoryPublisher.onNext(response as CategoryResponse)
MessageType.DELETE_CATEGORY -> deleteCategoryPublisher.onNext(response as Long)
MessageType.CREATE_TABLE -> createTablePublisher.onNext(response as TableResponse)
MessageType.UPDATE_TABLE -> updateTablePublisher.onNext(response as TableResponse)
MessageType.DELETE_TABLE -> deleteTablePublisher.onNext(response as Long)
MessageType.PAY_HELD_ORDER_LIST -> payHeldOrderListPublisher.onNext(response as ListResponse<OrderResponse>)
MessageType.HOLD_ORDER_LIST -> holdOrderListPublisher.onNext(response as ListResponse<OrderResponse>)
MessageType.UPDATE_ORDER_LIST -> holdOrderListPublisher.onNext(response as ListResponse<OrderResponse>)
MessageType.DELETE_ORDER_LIST -> deleteOrderListPublisher.onNext(response as List<DeleteOrderResponseDto>)
else -> Timber.i("[$type] deltas received, but parsing is skipped")
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment