Skip to content

Instantly share code, notes, and snippets.

@halcy
Created March 14, 2025 19:44
Show Gist options
  • Select an option

  • Save halcy/b4f455ef05c4c36906107e9367b8dd63 to your computer and use it in GitHub Desktop.

Select an option

Save halcy/b4f455ef05c4c36906107e9367b8dd63 to your computer and use it in GitHub Desktop.

Revisions

  1. halcy created this gist Mar 14, 2025.
    558 changes: 558 additions & 0 deletions mastofuse.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,558 @@
    import sys, os, stat, errno, operator, time, datetime, requests
    from fuse import FUSE, Operations, LoggingMixIn, FuseOSError
    from mastodon import Mastodon
    from cachetools import TTLCache, cachedmethod
    from mastodon.return_types import MediaAttachment, Account, Status

    class PathItem:
    def __init__(self, path_type, mtime=None, size=0, symlink_target=None, read_fn=None, listdir_fn=None):
    self.path_type = path_type
    self.mtime = mtime or time.time()
    self.size = size
    self.symlink_target = symlink_target
    self._read_fn = read_fn
    self._listdir_fn = listdir_fn

    def read(self, offset, length):
    if self.path_type != "file":
    raise FuseOSError(errno.EISDIR)
    if not self._read_fn:
    return b""
    data = self._read_fn()
    return data[offset: offset + length]

    def listdir(self):
    if self.path_type != "dir":
    raise FuseOSError(errno.ENOTDIR)
    if not self._listdir_fn:
    return []
    return self._listdir_fn()


    class MastoFS(LoggingMixIn, Operations):
    def __init__(self, url, token):
    self.api = Mastodon(
    access_token=token,
    api_base_url=url,
    )
    self.mastodon_object_cache = TTLCache(maxsize=100, ttl=5)
    self.long_term_posts = TTLCache(maxsize=5000, ttl=86400)
    self.long_term_accounts = TTLCache(maxsize=5000, ttl=86400)
    self.write_buffers = {}
    self.reblog_post = None
    self.reblog_last_account = ""

    # Static directories
    self.base_files = {
    '': {
    'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
    'children': {
    'posts': {
    'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
    'children': {
    'new': {
    'file': dict(st_mode=(0o644 | stat.S_IFREG), st_nlink=1, st_size=0),
    'children': None,
    },
    'reblogged': {
    'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
    'children': {}
    },
    },
    },
    'accounts': {
    'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
    'children': {
    'me': {
    'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
    'children': {}
    },
    },
    },
    'timelines': {
    'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
    'children': {
    'home': {
    'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
    'children': {}
    },
    'local': {
    'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
    'children': {}
    },
    'federated': {
    'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
    'children': {}
    },
    'public': {
    'file': dict(st_mode=(0o755 | stat.S_IFDIR), st_nlink=2),
    'children': {}
    },
    },
    },
    },
    },
    }

    @cachedmethod(operator.attrgetter('mastodon_object_cache'))
    def _get_post_cached(self, post_id):
        """Fetch a status by id, recording the id in the long-term cache."""
        result = self.api.status(post_id)
        if result is not None:
            # Remember the id so /posts keeps listing it.
            self.long_term_posts[post_id] = True
        return result

    @cachedmethod(operator.attrgetter('mastodon_object_cache'))
    def _get_account_cached(self, account_id):
        """Fetch an account by id, recording the id in the long-term cache."""
        result = self.api.account(account_id)
        if result is not None:
            # Remember the id so /accounts keeps listing it.
            self.long_term_accounts[account_id] = True
        return result

    @cachedmethod(operator.attrgetter('mastodon_object_cache'))
    def _get_timeline_cached(self, timeline_type):
        """Fetch the named timeline ('home', 'local', 'federated', 'public')."""
        if timeline_type == 'home':
            return self.api.timeline_home()
        if timeline_type == 'local':
            return self.api.timeline_local()
        if timeline_type == 'federated':
            return self.api.timeline_public()
        if timeline_type == 'public':
            return self.api.timeline_public(local=False)
        # Unknown timeline names resolve to an empty listing.
        return []

    def _get_base_file(self, path):
        """Walk the static tree and return the node for `path`, or None.

        Bug fix: file nodes store children=None, and the original
        `curr.get('children', {})` returned that None (the key exists), so
        descending "into" a file (e.g. stat of /posts/new/anything) raised
        TypeError on the `in` test. `or {}` makes such paths resolve to None.
        """
        if path == '/':
            return self.base_files['']
        curr = self.base_files['']
        for part in path.strip('/').split('/'):
            kids = curr.get('children') or {}
            if part not in kids:
                return None
            curr = kids[part]
        return curr

    def _extract_time(self, obj):
        """Best-effort extraction of a POSIX timestamp from `obj.created_at`.

        Falls back to the current time whenever no usable value is found.
        """
        created = None
        if isinstance(obj, dict) and "created_at" in obj:
            created = obj["created_at"]
        elif hasattr(obj, "created_at"):
            created = obj.created_at
        if not created:
            return time.time()
        if isinstance(created, datetime.datetime):
            return created.timestamp()
        if isinstance(created, str):
            try:
                return datetime.datetime.fromisoformat(created).timestamp()
            except Exception:
                return time.time()
        return time.time()

    def _fetch_media(self, url):
        """Download `url` and return its raw bytes, or b"" on any failure.

        Fix: added a request timeout — without one a stalled download hangs
        the request forever, and the mount runs single-threaded
        (nothreads=True), so one hung call blocks the whole filesystem.
        """
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            return response.content
        except Exception:
            # Media is best-effort: missing content shows as an empty file.
            return b""

    def _list_keys(self, obj):
        """Directory entries for an object: list indices or dict keys."""
        if isinstance(obj, list):
            return [str(index) for index in range(len(obj))]
        if isinstance(obj, dict):
            keys = list(obj.keys())
            # Media attachments gain synthetic entries that expose the
            # downloadable bytes alongside the raw URL fields.
            if isinstance(obj, MediaAttachment):
                for synthetic, source in (("file", "url"), ("preview_file", "preview_url")):
                    if source in obj and synthetic not in keys:
                        keys.append(synthetic)
            return keys
        # Scalars have no children.
        return []

    def _get_child(self, obj, key):
        """Resolve one path component `key` inside a traversable object."""
        if isinstance(obj, list):
            return obj[int(key)]
        if isinstance(obj, dict):
            if isinstance(obj, MediaAttachment):
                # The synthetic entries map to the downloaded media bytes.
                if key == "file" and "url" in obj:
                    return self._fetch_media(obj["url"])
                if key == "preview_file" and "preview_url" in obj:
                    return self._fetch_media(obj["preview_url"])
            return obj[key]
        raise KeyError("No children")

    def _traverse(self, obj, parts):
        """Follow `parts` down from `obj`, one component at a time."""
        current = obj
        for part in parts:
            current = self._get_child(current, part)
        return current


    def _resolve_path(self, path):
        """Map a filesystem path to a PathItem; raises ENOENT if unknown."""
        # Consult the static skeleton first: file entries there are empty
        # stubs, directory entries contribute their static children.
        base_node = self._get_base_file(path)
        static_children = None
        if base_node is not None:
            if base_node["children"] is None:
                return PathItem("file", time.time(), 0, read_fn=lambda: b"")
            static_children = list(base_node["children"].keys())

        parts = path.strip('/').split('/')
        top = parts[0]

        # Dynamic subtrees take over below their roots.
        if top == 'posts':
            return self._resolve_posts(path, parts)
        if top == 'accounts':
            return self._resolve_accounts(path, parts)
        if top == 'timelines':
            return self._resolve_timelines(path, parts)

        # The root (and any other purely static directory).
        if static_children is not None:
            return PathItem("dir", time.time(), listdir_fn=lambda: static_children)

        # Nothing matched: not a valid path.
        raise FuseOSError(errno.ENOENT)

    def _resolve_posts(self, path, parts):
        """Resolve paths under /posts."""
        if len(parts) == 1:
            # /posts itself: static children plus every post id seen so far.
            node = self._get_base_file(path)

            def list_posts():
                entries = list(node['children'].keys()) if node and 'children' in node else []
                for pid in self.long_term_posts:
                    if pid not in entries:
                        entries.append(pid)
                return entries

            return PathItem("dir", time.time(), listdir_fn=list_posts)

        if parts[1] == 'reblogged':
            # /posts/reblogged: writable scratch area (see getattr()).
            if len(parts) == 2:
                node = self._get_base_file(path)
                if node:
                    return PathItem("dir", time.time(), listdir_fn=lambda: list(node.get('children', {}).keys()))
                return PathItem("dir", time.time(), listdir_fn=lambda: [])
            raise FuseOSError(errno.ENOENT)

        # /posts/<id>[/...]
        post = self._get_post_cached(parts[1])
        if not post:
            raise FuseOSError(errno.ENOENT)

        if len(parts) == 2:
            # The post itself is a directory of its attributes.
            return PathItem("dir", self._extract_time(post), listdir_fn=lambda: self._list_keys(post))

        # Anything deeper indexes into the post object.
        try:
            target = self._traverse(post, parts[2:])
        except (KeyError, IndexError):
            raise FuseOSError(errno.ENOENT)
        return self._make_item_for_obj(target)

    def _resolve_accounts(self, path, parts):
        """Resolve paths under /accounts."""
        if len(parts) == 1:
            # /accounts itself: static children plus every known account id.
            node = self._get_base_file(path)

            def list_accounts():
                entries = list(node['children'].keys()) if node and 'children' in node else []
                for aid in self.long_term_accounts:
                    if aid not in entries:
                        entries.append(aid)
                return entries

            return PathItem("dir", time.time(), listdir_fn=list_accounts)

        if parts[1] == 'me':
            # /accounts/me: the credential owner's own account.
            acct = self.api.account_verify_credentials()
            if acct:
                self.long_term_accounts[str(acct.id)] = True
            if len(parts) == 2:
                return PathItem("dir", self._extract_time(acct), listdir_fn=lambda: self._list_keys(acct))
            try:
                target = self._traverse(acct, parts[2:])
            except (KeyError, IndexError):
                raise FuseOSError(errno.ENOENT)
            return self._make_item_for_obj(target)

        # /accounts/<id>[/...]
        acct = self._get_account_cached(parts[1])
        if not acct:
            raise FuseOSError(errno.ENOENT)

        if len(parts) == 2:
            # The account itself is a directory of its attributes.
            return PathItem("dir", self._extract_time(acct), listdir_fn=lambda: self._list_keys(acct))

        # Anything deeper indexes into the account object.
        try:
            target = self._traverse(acct, parts[2:])
        except (KeyError, IndexError):
            raise FuseOSError(errno.ENOENT)
        return self._make_item_for_obj(target)

    def _resolve_timelines(self, path, parts):
        """Resolve paths under /timelines; entries are symlinks into /posts."""
        if len(parts) == 1:
            # /timelines itself: just the static timeline names.
            node = self._get_base_file(path)
            return PathItem(
                "dir", time.time(),
                listdir_fn=lambda: list(node['children'].keys()) if node and 'children' in node else [],
            )
        if len(parts) == 2:
            # /timelines/<name>: numbered entries, one per fetched status.
            node = self._get_base_file(path)

            def list_timeline():
                entries = list(node['children'].keys()) if node and 'children' in node else []
                entries.extend(str(i) for i in range(len(self._get_timeline_cached(parts[1]))))
                return entries

            return PathItem("dir", time.time(), listdir_fn=list_timeline)
        if len(parts) == 3:
            # /timelines/<name>/<index>: a symlink to the canonical post.
            try:
                index = int(parts[2])
            except ValueError:
                raise FuseOSError(errno.ENOENT)
            statuses = self._get_timeline_cached(parts[1])
            if index < 0 or index >= len(statuses):
                raise FuseOSError(errno.ENOENT)
            status = statuses[index]
            return PathItem(
                "symlink", self._extract_time(status),
                symlink_target=lambda: f"/posts/{status.id}",
            )
        raise FuseOSError(errno.ENOENT)

    def _make_item_for_obj(self, obj):
        """Wrap an API object or plain value in the appropriate PathItem."""
        # Accounts and statuses become symlinks to their canonical paths.
        if isinstance(obj, Account):
            return PathItem("symlink", self._extract_time(obj),
                            symlink_target=lambda: f"/accounts/{obj.id}")
        if isinstance(obj, Status):
            return PathItem("symlink", self._extract_time(obj),
                            symlink_target=lambda: f"/posts/{obj.id}")
        # Containers become directories.
        if isinstance(obj, (dict, list)):
            return PathItem("dir", self._extract_time(obj),
                            listdir_fn=lambda: self._list_keys(obj))
        # Raw bytes (downloaded media) become binary files.
        if isinstance(obj, bytes):
            return PathItem("file", time.time(), size=len(obj), read_fn=lambda: obj)
        # Everything else is rendered as UTF-8 text.
        text = str(obj).encode('utf-8')
        return PathItem("file", time.time(), size=len(text), read_fn=lambda: text)

    def getattr(self, path, fh=None):
        """Return stat attributes for `path`.

        /posts/reblogged is special-cased as an accept-almost-everything
        scratch area so that copying a post directory into it (to reblog it)
        succeeds without error; the writes themselves are mostly dropped.
        """
        if path.startswith('/posts/reblogged'):
            parts = path.split('/')
            now = time.time()
            # Shallow paths and known container attributes look like dirs.
            if len(parts) <= 4 or parts[-1] in ["mentions", "media_attachments", "emojis", "tags", "filtered", "application"]:
                return {
                    'st_mode': (0o755 | stat.S_IFDIR),
                    'st_nlink': 2,
                    'st_size': 0,
                    'st_mtime': now,
                    'st_atime': now,
                    'st_ctime': now,
                }
            if parts[-1] == "account":
                # Only the most recently copied account symlink "exists".
                if path != self.reblog_last_account:
                    raise FuseOSError(errno.ENOENT)
                return {
                    'st_mode': (0o755 | stat.S_IFLNK),
                    'st_nlink': 1,
                    'st_size': 0,
                    'st_mtime': now,
                    'st_atime': now,
                    'st_ctime': now,
                }
            # Any other leaf is presented as an ordinary writable file.
            return {
                'st_mode': (0o644 | stat.S_IFREG),
                'st_nlink': 1,
                'st_size': 0,
                'st_mtime': now,
                'st_atime': now,
                'st_ctime': now,
            }

        # Everything else: resolve the path and report its real attributes.
        item = self._resolve_path(path)
        mode = 0
        if item.path_type == "dir":
            mode = (0o755 | stat.S_IFDIR)
        elif item.path_type == "file":
            mode = (0o644 | stat.S_IFREG)
        elif item.path_type == "symlink":
            mode = (0o755 | stat.S_IFLNK)
        return {
            'st_mode': mode,
            'st_nlink': 1,
            'st_size': item.size,
            'st_mtime': item.mtime,
            'st_atime': item.mtime,
            'st_ctime': item.mtime,
        }

    def readdir(self, path, fh):
        """List a directory's contents, including '.' and '..'."""
        item = self._resolve_path(path)
        if item.path_type != "dir":
            raise FuseOSError(errno.ENOTDIR)
        return ['.', '..'] + item.listdir()

    def readlink(self, path):
        """Return the symlink target, made relative to the link's directory."""
        item = self._resolve_path(path)
        if item.path_type != "symlink":
            raise FuseOSError(errno.EINVAL)
        # Targets may be stored as strings or zero-argument callables.
        target = item.symlink_target
        if callable(target):
            target = target()
        return os.path.relpath(target, os.path.dirname(path))

    def read(self, path, size, offset, fh):
        """Read `size` bytes at `offset` from the file at `path`."""
        node = self._resolve_path(path)
        if node.path_type != "file":
            raise FuseOSError(errno.EISDIR)
        # PathItem handles slicing and lazily fetching the content.
        return node.read(offset, size)

    def write(self, path, data, offset, fh):
        """Accept writes to /posts/new and to reblog id files."""
        if path == '/posts/new':
            # Accumulate everything written; it is posted on release().
            self.write_buffers.setdefault(path, b"")
            self.write_buffers[path] += data
            return len(data)

        if path.startswith('/posts/reblogged'):
            pieces = path.split('/')
            # Writing /posts/reblogged/<x>/id stores the id to reblog on
            # close; all other writes in the scratch area are dropped.
            if len(pieces) == 5 and pieces[-1] == "id":
                self.reblog_buffer = data.decode('utf-8')
            return len(data)

        raise FuseOSError(errno.EROFS)

    def create(self, path, mode, fi=None):
        """Allow creating /posts/new and anything under /posts/reblogged."""
        if path == '/posts/new':
            # Start a fresh buffer for the post being composed.
            self.write_buffers[path] = b""
            return 0
        if path.startswith('/posts/reblogged'):
            # Creation is generally allowed here; the actual reblog happens
            # when an id file is written and closed.
            return 0
        # The rest of the filesystem is read-only.
        raise FuseOSError(errno.EROFS)

    def mkdir(self, path, mode):
        """Directory creation is only permitted under /posts/reblogged."""
        if not path.startswith('/posts/reblogged'):
            raise FuseOSError(errno.EROFS)
        return 0

    def unlink(self, path):
        """File deletion is only permitted under /posts/reblogged.

        Needed because copying symlinks into the scratch area apparently
        performs an unlink along the way and breaks otherwise.
        """
        if not path.startswith('/posts/reblogged'):
            raise FuseOSError(errno.EROFS)
        return 0


    def release(self, path, fh):
        """On close: publish /posts/new buffers and perform pending reblogs.

        Fixes two crashes in the original:
        - `del self.write_buffers[path]` raised KeyError when release()
          arrived without a prior create()/write(); now uses pop().
        - `self.reblog_buffer` was read unconditionally (and outside the
          try), raising AttributeError when nothing had been written; now
          guarded with getattr().
        """
        if path == '/posts/new':
            buf = self.write_buffers.get(path, b"")
            if buf:
                text = buf.decode('utf-8')
                try:
                    newp = self.api.status_post(text)
                    if newp:
                        self.long_term_posts[str(newp.id)] = True
                except Exception:
                    # Best effort: a failed post is silently dropped.
                    pass
            self.write_buffers.pop(path, None)

        # Closing a written /posts/reblogged/<x>/id file triggers the reblog.
        if path.startswith('/posts/reblogged/'):
            pieces = path.split('/')
            if len(pieces) == 5 and pieces[-1] == "id":
                pending = getattr(self, 'reblog_buffer', None)
                if pending:
                    print("CLOSING ID", pending)
                    try:
                        self.api.status_reblog(pending)
                        self.reblog_buffer = None
                    except Exception:
                        # Best effort: a failed reblog is silently dropped.
                        pass
        return 0

    def truncate(self, path, length, fh=None):
        """Support truncation on writable paths; everything else is read-only."""
        if path == '/posts/new':
            # Truncating the compose file discards any buffered text.
            self.write_buffers[path] = b""
        elif not path.startswith('/posts/reblogged'):
            raise FuseOSError(errno.EROFS)
        return 0

    def symlink(self, target, source):
        """Accept symlinks only inside /posts/reblogged (account links)."""
        if not target.startswith('/posts/reblogged'):
            raise FuseOSError(errno.EROFS)
        # Remember the link so getattr() can report it as existing.
        self.reblog_last_account = target
        return 0

    if __name__ == '__main__':
    if len(sys.argv) != 2:
    print("Usage: {} <mountpoint> <url> <token>".format(sys.argv[0]))
    sys.exit(1)
    mountpoint = sys.argv[1]
    url = sys.argv[2]
    token = sys.argv[3]
    FUSE(MastoFS(url, token), mountpoint, nothreads=True, foreground=True)