Last active
January 12, 2022 03:37
-
-
Save sripathikrishnan/df4d91580da60b2032916ac2a172bae4 to your computer and use it in GitHub Desktop.
Importing StackExchange Archives into Redis
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| """ | |
| Imports stackexchange xml files into redis | |
| Motivation: | |
| ----------- | |
| This script is a part of redis-rdb-tools, | |
| and helps us generate realistic dump files to help in testing | |
| Schema Documentation: | |
| --------------------- | |
| See https://meta.stackexchange.com/questions/2677/database-schema-documentation-for-the-public-data-dump-and-sede | |
| Pre-requisites: | |
| --------------- | |
| 1. Download any of the stackexchange website archive files | |
| from https://archive.org/download/stackexchange | |
| 2. Extract the .7z files into a directory. | |
| The directory should have Badges.xml, Comments.xml, PostHistory.xml, | |
| PostLinks.xml, Posts.xml, Tags.xml, Users.xml, and Votes.xml | |
| 3. Modify the path to the directory in the __main__ section of this script | |
| 4. Ensure redis is running on localhost:6379 without a password | |
| (Or, adjust the connection string in the __main__ section at the bottom) | |
| 5. Run this script without any command line arguments | |
| Keyspace Details: | |
| ----------------- | |
| After running this script, redis will have the following keys: | |
| 1. questions:<id> -> a hash containing question details | |
| 2. questions:<id>:answers -> a set containing answer ids for this question | |
| 3. questions:<id>:tags -> a set of tags assigned to this question | |
| 4. questions:<id>:linked_questions -> a set of post ids linked to or from this question (from PostLinks.xml) | |
| 5. answers:<id> -> a hash containing answer details | |
| 6. posts:<id>:comments -> a list of comment ids, in the order they appear in Comments.xml | |
| this list only contains comments made on this post. | |
| A post can be a question or an answer, so <id> can be a question id or an answer id | |
| 7. users:<id> -> a hash containing user details | |
| 8. users:<id>:comments -> a sorted set, member = comment id, score = score of the comment | |
| this sorted set only contains comments created by this user. | |
| 9. users:<id>:badges -> a set of badge names assigned to this user | |
| 10. users:<id>:questions_by_score -> a sorted set, member = question id, score = score of the question | |
| this sorted set only contains questions created by this user | |
| 11. users:<id>:answers_by_score -> a sorted set, member = answer id, score = score of the answer | |
| this sorted set only contains answers created by this user | |
| 12. users:<id>:questions_by_views -> a sorted set, member = question id, | |
| score = number of views for this question | |
| this sorted set only contains questions created by this user | |
| 13. users_by_reputation -> a sorted set, member = user id, score = reputation of the user | |
| 14. comments:<id> -> a hash containing comment details | |
| 15. tags:<tag_name> -> a hash containing tag details | |
| 16. tags:<tag_name>:questions_by_score -> a sorted set, member = question id, | |
| score = score of the question | |
| this sorted set only contains questions that have the given tag | |
| 17. tags:<tag_name>:questions_by_views -> a sorted set, member = question id, | |
| score = number of views for this question | |
| this sorted set only contains questions that have the given tag | |
| 18. tags:<tag_name>:recent_questions -> a list of question ids under this tag, | |
| most recently imported first, trimmed to at most 201 entries | |
| 19. badges:<badge_name>:users -> a set of user ids that have this badge | |
| 20. badges_by_popularity -> a sorted set, member = badge name, | |
| score = number of times this badge was awarded (a user earning it twice counts twice) | |
| Implementation Details: | |
| ----------------------- | |
| 1. The script uses pipelining, but is single-threaded. | |
| It should be possible to increase throughput by making it multi-threaded | |
| TODOs: | |
| ------ | |
| 1. Accept command line parameters for the data directory and the redis connection | |
| """ | |
| import xml.etree.ElementTree as etree | |
| import redis | |
| import re | |
try:
    # Python 2 spelling.
    from itertools import izip_longest
except ImportError:
    # Python 3 renamed it to zip_longest; only a missing name should trigger
    # the fallback, so catch ImportError rather than everything.
    from itertools import zip_longest as izip_longest
# A character followed by a capitalised word: 'ViewCount' -> 'View_Count'.
first_cap_re = re.compile('(.)([A-Z][a-z]+)')
# A lowercase letter/digit directly followed by a capital: 'aB' -> 'a_B'.
all_cap_re = re.compile('([a-z0-9])([A-Z])')


def to_snake_case(name):
    """Map a Stack Exchange XML attribute name to the field name used in redis.

    Names ending in 'Id' or 'ID', plus 'UpVotes' and 'DownVotes', are only
    lowercased ('PostTypeId' -> 'posttypeid'). All other names are split at
    word boundaries ('CreationDate' -> 'creation_date').
    """
    lowercase_only = name.endswith(('Id', 'ID')) or name in ('UpVotes', 'DownVotes')
    if not lowercase_only:
        name = all_cap_re.sub(r'\1_\2', first_cap_re.sub(r'\1_\2', name))
    return name.lower()
def parse_rows(xml_file, incl_attrs=None, excl_attrs=None):
    """Yield one dict per ``<row>`` element of a Stack Exchange XML dump.

    Keys are the attribute names converted with ``to_snake_case``; values are
    the raw attribute strings.

    :param xml_file: path or binary file object accepted by ``iterparse``
    :param incl_attrs: if given (and ``excl_attrs`` is not), keep only these
        original attribute names
    :param excl_attrs: if given, drop these original attribute names; takes
        precedence over ``incl_attrs``

    Rows are processed on their 'end' event and then cleared together with
    the root element, so memory stays flat even for multi-gigabyte dumps.
    (Previously every parsed row stayed attached to the tree.)
    """
    root = None
    for event, elem in etree.iterparse(xml_file, events=('start', 'end')):
        if event == 'start':
            # The very first start event is the document root; remember it
            # so already-processed rows can be detached from it.
            if root is None:
                root = elem
            continue
        if elem.tag != 'row':
            continue
        attrs = elem.attrib
        if excl_attrs:
            req_keys = set(attrs) - excl_attrs
        elif incl_attrs:
            req_keys = set(attrs) & incl_attrs
        else:
            req_keys = attrs.keys()
        obj = {to_snake_case(k): attrs[k] for k in req_keys}
        # obj holds copies of the strings, so the element can be released now.
        elem.clear()
        root.clear()
        yield obj
def get_batches(iterable, batch_size, fillvalue=None):
    """Group ``iterable`` into tuples of length ``batch_size``.

    The final tuple is padded with ``fillvalue`` when the input does not
    divide evenly. Returns a lazy iterator.
    """
    # The same iterator repeated batch_size times makes izip_longest pull
    # consecutive items into each tuple.
    shared = iter(iterable)
    return izip_longest(*([shared] * batch_size), fillvalue=fillvalue)
def import_in_batches(red, objs, callback, batch_size=100):
    """Run ``callback(obj, pipe)`` for every object, one pipeline per batch.

    :param red: redis client providing ``pipeline()``
    :param objs: iterable of row dicts (for example from ``parse_rows``)
    :param callback: function queuing the redis commands for one object
    :param batch_size: number of objects whose commands are sent per
        ``execute()`` round-trip (defaults to 100, the previous fixed value)
    :raises ValueError: if ``batch_size`` is smaller than 1. A zero size
        would otherwise silently import nothing.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1, got %r" % (batch_size,))
    for batch in get_batches(objs, batch_size):
        # The context manager resets the pipeline even if a callback raises.
        with red.pipeline() as pipe:
            for obj in batch:
                # get_batches pads the last batch with None; skip the padding.
                if obj:
                    callback(obj, pipe)
            pipe.execute()
| def _import_user(user, pipe): | |
| pipe.hmset("users:%s" % user['id'], user) | |
| pipe.zadd("users_by_reputation", user['reputation'], user['id']) | |
def import_users(users_file, red):
    """Import every row of a Users.xml dump into redis (see ``_import_user``)."""
    import_in_batches(red, parse_rows(users_file), _import_user)
| def _import_user_badge(badge, pipe): | |
| pipe.sadd("users:%s:badges" % badge['userid'], badge['name']) | |
| pipe.sadd("badges:%s:users" % badge['name'], badge['userid']) | |
| pipe.zincrby("badges_by_popularity", badge['name'], 1.0) | |
def import_user_badges(badges_file, red):
    """Import user/badge pairs from a Badges.xml dump.

    Only the ``UserId`` and ``Name`` attributes are read; each row is handled
    by ``_import_user_badge``.
    """
    wanted = {'UserId', 'Name'}
    rows = parse_rows(badges_file, incl_attrs=wanted)
    import_in_batches(red, rows, _import_user_badge)
| def _parse_tags(tag_str): | |
| tag_str = tag_str.replace("><", ",") | |
| tag_str = tag_str.replace(">", "") | |
| tag_str = tag_str.replace("<", "") | |
| if tag_str: | |
| tags = tag_str.split(",") | |
| return tags | |
| def _post_type(posttypeid): | |
| posttypeid = int(posttypeid.strip()) | |
| if posttypeid == 1: | |
| post_type = "questions" | |
| elif posttypeid == 2: | |
| post_type = "answers" | |
| else: | |
| post_type = None | |
| # elif posttypeid in (3,4,5): | |
| # post_type = "tagwiki" | |
| # elif posttypeid == 6: | |
| # post_type = "moderator-nomination" | |
| # elif posttypeid in (7,8): | |
| # post_type = "wiki" | |
| # else: | |
| # post_type = "generic-post" | |
| return post_type | |
def _import_post(post, pipe):
    """Queue the redis commands for one Posts.xml row.

    Only questions and answers (per ``_post_type``) are stored. Other post
    types return without queuing anything. The ``tags`` field is removed
    from ``post`` (the dict is mutated) and indexed in separate keys.

    All ``zadd`` calls use the ``{member: score}`` mapping form required by
    redis-py >= 3.0.
    """
    post_type = _post_type(post['posttypeid'])
    # Only import questions and answers; ignore tag wikis and other types.
    if not post_type:
        return
    post_id = post['id']
    if 'tags' in post:
        tags = _parse_tags(post['tags'])
        # Tags are kept in their own set rather than inside the hash.
        del post['tags']
        if tags and post_type == 'questions':
            pipe.sadd("questions:%s:tags" % post_id, *tags)
            for tag in tags:
                pipe.zadd("tags:%s:questions_by_score" % tag, {post_id: post['score']})
                if 'view_count' in post:
                    pipe.zadd("tags:%s:questions_by_views" % tag,
                              {post_id: post['view_count']})
                recent_key = "tags:%s:recent_questions" % tag
                pipe.lpush(recent_key, post_id)
                # LTRIM is inclusive, so indices 0..200 keep 201 entries.
                pipe.ltrim(recent_key, 0, 200)
    if 'owneruserid' in post:
        owner = post['owneruserid']
        pipe.zadd("users:%s:%s_by_score" % (owner, post_type), {post_id: post['score']})
        if 'view_count' in post and post_type == 'questions':
            pipe.zadd("users:%s:questions_by_views" % owner, {post_id: post['view_count']})
    # Store questions and answers in separate key spaces.
    pipe.hmset("%s:%s" % (post_type, post_id), post)
    if post_type == 'answers' and 'parentid' in post:
        pipe.sadd("questions:%s:answers" % post['parentid'], post_id)
def import_posts(posts_file, red):
    """Import every row of a Posts.xml dump into redis (see ``_import_post``)."""
    rows = parse_rows(posts_file)
    import_in_batches(red, rows, _import_post)
| def _import_comment(comment, pipe): | |
| pipe.hmset("comments:%s" % comment['id'], comment) | |
| if 'userid' in comment: | |
| pipe.zadd("users:%s:comments" % comment['userid'], comment['score'], comment['id']) | |
| pipe.rpush("posts:%s:comments" % comment['postid'], comment['id']) | |
def import_comments(comments_file, red):
    """Import every row of a Comments.xml dump into redis (see ``_import_comment``)."""
    rows = parse_rows(comments_file)
    import_in_batches(red, rows, _import_comment)
| def _import_tag(tag, pipe): | |
| pipe.hmset("tags:%s" % tag['tag_name'], tag) | |
def import_tags(tags_file, red):
    """Import every row of a Tags.xml dump into redis (see ``_import_tag``)."""
    import_in_batches(red, parse_rows(tags_file), _import_tag)
| def _import_linked_post(linked_post, pipe): | |
| # Asssuming only questions are linked | |
| # technically wiki's and other posts can also be linked, | |
| # but we will ignore those for now | |
| pipe.sadd("questions:%s:linked_questions" % linked_post['postid'], linked_post['relatedpostid']) | |
| pipe.sadd("questions:%s:linked_questions" % linked_post['relatedpostid'], linked_post['postid']) | |
def import_linked_posts(linked_posts_file, red):
    """Import every row of a PostLinks.xml dump (see ``_import_linked_post``)."""
    rows = parse_rows(linked_posts_file)
    import_in_batches(red, rows, _import_linked_post)
if __name__ == '__main__':
    import os
    import sys

    _red = redis.StrictRedis()
    # An optional first command-line argument overrides the dump directory;
    # with no arguments the original hard-coded directory is used.
    base_dir = sys.argv[1] if len(sys.argv) > 1 else "datascience.stackexchange.com"

    def _f(filename):
        """Return the path of ``filename`` inside the dump directory."""
        return os.path.join(base_dir, filename)

    import_users(_f("Users.xml"), _red)
    import_user_badges(_f("Badges.xml"), _red)
    import_posts(_f("Posts.xml"), _red)
    import_comments(_f("Comments.xml"), _red)
    import_tags(_f("Tags.xml"), _red)
    import_linked_posts(_f("PostLinks.xml"), _red)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment