# -*- coding: utf-8 -*- """ orders_source ~~~~~~~~~~~~~ Puts coinbase orders into Concord """ import time import json from concord.computation import ( Computation, Metadata, serve_computation ) import logging from Queue import Queue from twisted.internet import reactor from threading import Thread from autobahn.twisted.websocket import (WebSocketClientFactory, WebSocketClientProtocol, connectWS) logging.basicConfig() logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) orders_queue = Queue() def time_milli(): return int(round(time.time()) * 1000) class ClientProtocol(WebSocketClientProtocol): def onOpen(self): logger.info("websocket opened") self.sendMessage(json.dumps({ "type": "subscribe", "product_id": "BTC-USD" })) def onMessage(self, payload, *args, **kwargs): orders_queue.put(payload) def onClose(self, wasClean, code, reason): logger.info("websocket closed because", reason) reactor.stop() class OrdersSource(Computation): def __init__(self): logger.info("__init__") def drain_queue(self, ctx): while True: data = orders_queue.get() order = None if data is None: orders_queue.task_done() try: order = json.loads(data) except: logger.info("error json decoding %s" % data) orders_queue.task_done() if order: order_id = order['order_id'] ctx.produce_record('orders', order_id, data) logger.info("produced order %s" % data) orders_queue.task_done() def process_timer(self, ctx, key, timer): self.drain_queue(ctx) def init(self, ctx): ctx.set_timer('consume-orders', time_milli() + 100) logger.info("Source initialized") def metadata(self): return Metadata( name='coinbase-orders-source', istreams=[], ostreams=['coinbase-orders']) def main(): factory = WebSocketClientFactory("wss://ws-feed.exchange.coinbase.com") factory.protocol = ClientProtocol connectWS(factory) Thread(target=reactor.run, args=(False,)).start() logger.info("Main") serve_computation(OrdersSource()) main()