@Foucl
Created October 11, 2016 11:24
FACET_LSL.ipynb
{
"cells": [
{
"metadata": {
"ExecuteTime": {
"start_time": "2016-10-11T12:42:37.560000",
"end_time": "2016-10-11T12:42:37.563000"
}
},
"cell_type": "markdown",
"source": "# Module: FACET->LSL (Data)\n\n## Main Tasks / Main Class\n- **receives** data from FACET **Event Forwarding** API\n- **lables** this data (channel names) **sanitizes** it (conversion to floats/ints, use of nans)\n- **creates** a `pylsl.StreamOutlet` and **pushes** each (labled) sample into LSL\n\n### Secondary / Helper Functions (in or outside main class?)\n- **Controls** FACET studies (start, stop, cancel, participant management...)\n- **Checks** for sanity (FACET running, ports available ...)"
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2016-10-11T12:59:11.548000",
"end_time": "2016-10-11T12:59:11.567000"
},
"collapsed": false,
"code_folding": [
0
],
"trusted": true
},
"cell_type": "code",
"source": "# Setup cell (imports and logging), to be hidden\n\n# general modules\nimport time\nimport os\nimport sys\nimport math\nimport logging.config\nimport yaml\n\n# network and threading\nimport threading\nimport socket\nimport struct\nimport Queue\n\n# windows / system specific stuff\nimport ctypes\nimport psutil\n\nimport numpy as np\nimport cv2\n\nfrom IPython.display import clear_output\nfrom matplotlib import pyplot as plt\n\n%matplotlib inline\n%load_ext ipyext.writeandexecute\n\n# setup logging\n\nlogging.config.dictConfig(yaml.load(open('./logging_config.yaml', 'r')))\nlogger = logging.getLogger()",
"execution_count": 4,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": "The ipyext.writeandexecute extension is already loaded. To reload it, use:\n %reload_ext ipyext.writeandexecute\n"
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## Implementation: A Client Thread that CONNECTS to the UDP port and RECEIVES samples\n\n- taken from [this blogpost](http://eli.thegreenplace.net/2011/05/18/code-sample-socket-client-thread-in-python/)\n- is that flexible enough to be used for all three FACET connections?\n- Modifications necessary\n\n### Connection Types:\n- Event **Forwarding** (data stream): UDP / 9999:\n```\nsock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)\nsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)\nsock.bind(('', 9999))\nsock.settimeout(8)\n```\n- **Commands**: TCP / 8087\n```\ns=socket.socket(socket.AF_INET, socket.SOCK_STREAM)\ns.connect((facet_ip,rc_port))\ns.send(string_for_iMotions)\ndata = s.recv(8192)\ns.close()\n```\n- Event **Receiving** (Markers): TCP / 8089 (other class/module if necessary)\n```\ns=socket.socket(socket.AF_INET, socket.SOCK_STREAM)\ns.connect((facet_ip, event_port))\ns.send(string_for_iMotions)\n```\n\n### Data Types:\n- ClientCommand and ClientReply (see below)\n- FACET-Datastream\n- other iMotions Event types (Markers, Calibration, ...)"
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2016-10-11T13:13:05.915000",
"end_time": "2016-10-11T13:13:05.924000"
},
"collapsed": true,
"trusted": true
},
"cell_type": "code",
"source": "# Classes for communicating with the client thread\n# taken mostly from http://eli.thegreenplace.net/2011/05/18/code-sample-socket-client-thread-in-python/\n\nclass ClientCommand(object):\n \"\"\" A command to the client thread.\n Each command type has its associated data:\n\n CONNECT: (host, port) tuple\n SEND: Data string <- not needed? (commands and markers address other ports)\n RECEIVE: None\n CLOSE: None\n \"\"\"\n CONNECT, SEND, RECEIVE, CLOSE = range(4)\n\n def __init__(self, type, data=None):\n self.type = type\n self.data = data\n\n\nclass ClientReply(object):\n \"\"\" A reply from the client thread.\n Each reply type has its associated data:\n\n ERROR: The error string\n SUCCESS: Depends on the command - for RECEIVE it's the received\n data string, for others None.\n \"\"\"\n ERROR, SUCCESS = range(2)\n\n def __init__(self, type, data=None):\n self.type = type\n self.data = data\n ",
"execution_count": 8,
"outputs": []
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2016-10-11T13:07:31.418000",
"end_time": "2016-10-11T13:07:31.477000"
},
"collapsed": true,
"trusted": true
},
"cell_type": "code",
"source": "# Main Client Class\n# taken mostly from http://eli.thegreenplace.net/2011/05/18/code-sample-socket-client-thread-in-python/\n\nclass SocketClientThread(threading.Thread):\n \"\"\" Implements the threading.Thread interface (start, join, etc.) and\n can be controlled via the cmd_q Queue attribute. Replies are\n placed in the reply_q Queue attribute.\n \"\"\"\n def __init__(self, cmd_q=None, reply_q=None):\n super(SocketClientThread, self).__init__()\n self.cmd_q = cmd_q or Queue.Queue()\n self.reply_q = reply_q or Queue.Queue()\n self.alive = threading.Event()\n self.alive.set()\n self.socket = None\n\n self.handlers = {\n ClientCommand.CONNECT: self._handle_CONNECT,\n ClientCommand.CLOSE: self._handle_CLOSE,\n ClientCommand.SEND: self._handle_SEND,\n ClientCommand.RECEIVE: self._handle_RECEIVE,\n }\n\n def run(self):\n while self.alive.isSet():\n try:\n # Queue.get with timeout to allow checking self.alive\n cmd = self.cmd_q.get(True, 0.1)\n self.handlers[cmd.type](cmd)\n except Queue.Empty as e:\n continue\n\n def join(self, timeout=None):\n self.alive.clear()\n threading.Thread.join(self, timeout)\n\n def _handle_CONNECT(self, cmd):\n try:\n self.socket = socket.socket(\n socket.AF_INET, socket.SOCK_STREAM)\n self.socket.connect((cmd.data[0], cmd.data[1]))\n self.reply_q.put(self._success_reply())\n except IOError as e:\n self.reply_q.put(self._error_reply(str(e)))\n\n def _handle_CLOSE(self, cmd):\n self.socket.close()\n reply = ClientReply(ClientReply.SUCCESS)\n self.reply_q.put(reply)\n\n def _handle_SEND(self, cmd):\n header = struct.pack('<L', len(cmd.data))\n try:\n self.socket.sendall(header + cmd.data)\n self.reply_q.put(self._success_reply())\n except IOError as e:\n self.reply_q.put(self._error_reply(str(e)))\n\n def _handle_RECEIVE(self, cmd):\n try:\n header_data = self._recv_n_bytes(4)\n if len(header_data) == 4:\n msg_len = struct.unpack('<L', header_data)[0]\n data = self._recv_n_bytes(msg_len)\n if len(data) == msg_len:\n 
self.reply_q.put(self._success_reply(data))\n return\n self.reply_q.put(self._error_reply('Socket closed prematurely'))\n except IOError as e:\n self.reply_q.put(self._error_reply(str(e)))\n\n def _recv_n_bytes(self, n):\n \"\"\" Convenience method for receiving exactly n bytes from\n self.socket (assuming it's open and connected).\n \"\"\"\n data = ''\n while len(data) < n:\n chunk = self.socket.recv(n - len(data))\n if chunk == '':\n break\n data += chunk\n return data\n\n def _error_reply(self, errstr):\n return ClientReply(ClientReply.ERROR, errstr)\n\n def _success_reply(self, data=None):\n return ClientReply(ClientReply.SUCCESS, data)",
"execution_count": 7,
"outputs": []
}
],
"metadata": {
"kernelspec": {
"name": "Python [Root]",
"display_name": "Python [Root]",
"language": "python"
},
"toc": {
"threshold": 6,
"number_sections": false,
"toc_cell": false,
"toc_window_display": false,
"toc_section_display": "block",
"sideBar": true,
"navigate_menu": true
},
"nav_menu": {},
"language_info": {
"mimetype": "text/x-python",
"nbconvert_exporter": "python",
"name": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12",
"file_extension": ".py",
"codemirror_mode": {
"version": 2,
"name": "ipython"
}
},
"anaconda-cloud": {},
"gist": {
"id": "",
"data": {
"description": "FACET_LSL.ipynb",
"public": true
}
}
},
"nbformat": 4,
"nbformat_minor": 0
}