-
-
Save afreerunner/746099d112cc8c452c41b490bd7346da to your computer and use it in GitHub Desktop.
redis stream consumer #redis
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Redis Streams consumer "c1" of consumer group "g1".

On start-up the script makes sure the consumer group exists, acknowledges
whatever is still pending for this consumer from an earlier run, and then
blocks waiting for new stream entries, acknowledging each one after the
(placeholder) handling step.
"""
import redis
import time

pool = redis.ConnectionPool(host='192.168.137.3', port=6379, password='', decode_responses=True)
r = redis.Redis(connection_pool=pool)
STREAM_NAME = "consumer"
GROUP_NAME = "g1"
CONSUMER_NAME = "c1"


def ensure_group():
    """Create GROUP_NAME on STREAM_NAME unless it already exists.

    The group is no longer destroyed and re-created on every start: doing so
    discards the pending-entries list (making the recovery pass below
    pointless) and removes the group from under every other consumer that is
    currently reading from it. ``mkstream=True`` lets the script start before
    any producer has created the stream.

    Raises:
        redis.ResponseError: for any server error other than "group exists".
    """
    try:
        r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0-0", mkstream=True)
    except redis.ResponseError as e:
        # BUSYGROUP is the server's "group already exists" reply; that is fine.
        if "BUSYGROUP" not in str(e):
            raise


def read_entries(start_id):
    """Read at most one entry for this consumer and return ``[(id, fields)]``.

    ``start_id`` is either a concrete id (re-read this consumer's pending
    entries after that id) or ``">"`` (entries never delivered to the group).
    An empty list is returned when the server has nothing to hand out.
    """
    reply = r.xreadgroup(GROUP_NAME, CONSUMER_NAME, {STREAM_NAME: start_id}, count=1, block=0)
    print(reply)
    if not reply:
        return []
    # Only one stream is requested, so the reply has one [name, entries] pair.
    return reply[0][1]


def drain_pending():
    """Acknowledge every entry still pending for this consumer."""
    last_id = "0-0"
    while True:
        entries = read_entries(last_id)
        if not entries:
            break
        for entry_id, _fields in entries:
            print(entry_id)
            try:
                # Ack the id that was actually delivered, not the read cursor.
                r.xack(STREAM_NAME, GROUP_NAME, entry_id)
            except redis.RedisError as e:
                # Best effort: the entry stays pending and is retried on the
                # next start-up.
                print(e)
            # Always advance so a failed ack cannot loop on the same entry.
            last_id = entry_id


def consume_new():
    """Block for new entries, handle them, and acknowledge each one.

    The loop stops after printing the error when Redis reports a failure.
    """
    while True:
        try:
            for entry_id, _fields in read_entries(">"):
                # TODO: real message handling goes here.
                time.sleep(1)
                r.xack(STREAM_NAME, GROUP_NAME, entry_id)
        except redis.RedisError as e:
            print(e)
            break


def main():
    """Run the consumer: ensure the group, recover pending, then consume."""
    ensure_group()
    drain_pending()
    consume_new()


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Redis Streams consumer "c2" of consumer group "g1".

On start-up the script makes sure the consumer group exists, acknowledges
whatever is still pending for this consumer from an earlier run, and then
blocks waiting for new stream entries, acknowledging each one after the
(placeholder) handling step.
"""
import redis
import time

pool = redis.ConnectionPool(host='192.168.137.3', port=6379, password='', decode_responses=True)
r = redis.Redis(connection_pool=pool)
STREAM_NAME = "consumer"
GROUP_NAME = "g1"
CONSUMER_NAME = "c2"


def ensure_group():
    """Create GROUP_NAME on STREAM_NAME unless it already exists.

    Without this step the first read fails with NOGROUP whenever this script
    is started before anything else has created the group. ``mkstream=True``
    also lets it start before any producer has created the stream.

    Raises:
        redis.ResponseError: for any server error other than "group exists".
    """
    try:
        r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0-0", mkstream=True)
    except redis.ResponseError as e:
        # BUSYGROUP is the server's "group already exists" reply; that is fine.
        if "BUSYGROUP" not in str(e):
            raise


def read_entries(start_id):
    """Read at most one entry for this consumer and return ``[(id, fields)]``.

    ``start_id`` is either a concrete id (re-read this consumer's pending
    entries after that id) or ``">"`` (entries never delivered to the group).
    An empty list is returned when the server has nothing to hand out.
    """
    reply = r.xreadgroup(GROUP_NAME, CONSUMER_NAME, {STREAM_NAME: start_id}, count=1, block=0)
    print(reply)
    if not reply:
        return []
    # Only one stream is requested, so the reply has one [name, entries] pair.
    return reply[0][1]


def drain_pending():
    """Acknowledge every entry still pending for this consumer."""
    last_id = "0-0"
    while True:
        entries = read_entries(last_id)
        if not entries:
            break
        for entry_id, _fields in entries:
            print(entry_id)
            try:
                # Ack the id that was actually delivered, not the read cursor.
                r.xack(STREAM_NAME, GROUP_NAME, entry_id)
            except redis.RedisError as e:
                # Best effort: the entry stays pending and is retried on the
                # next start-up.
                print(e)
            # Always advance so a failed ack cannot loop on the same entry.
            last_id = entry_id


def consume_new():
    """Block for new entries, handle them, and acknowledge each one.

    The loop stops after printing the error when Redis reports a failure.
    """
    while True:
        try:
            for entry_id, _fields in read_entries(">"):
                # TODO: real message handling goes here.
                time.sleep(1)
                r.xack(STREAM_NAME, GROUP_NAME, entry_id)
        except redis.RedisError as e:
            print(e)
            break


def main():
    """Run the consumer: ensure the group, recover pending, then consume."""
    ensure_group()
    drain_pending()
    consume_new()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment