scripts/comment_fs/comments_fs2.py

118 lines
3.8 KiB
Python

#!/usr/bin/env python3
import os
import stat
import errno
import fuse
from time import time
import json
from collections import defaultdict
fuse.fuse_python_api = (0, 2)
class MyStat(fuse.Stat):
def __init__(self):
self.st_mode = stat.S_IFDIR | 0o755
self.st_ino = 0
self.st_dev = 0
self.st_nlink = 2
self.st_uid = 0
self.st_gid = 0
self.st_size = 4096
self.st_atime = 0
self.st_mtime = 0
self.st_ctime = 0
class CommentFS(fuse.Fuse):
def __init__(self, *args, **kw):
fuse.Fuse.__init__(self, *args, **kw)
with open('comments.jsonl', 'r', encoding='utf-8') as f:
self.comments = [json.loads(line) for line in f]
self.tree = self.build_comment_tree(self.comments)
self.files = {}
self.directories = set()
self.build_file_structure()
def build_comment_tree(self, comments):
tree = defaultdict(list)
for comment in comments:
parent = comment['parent'] if comment['parent'] != 'root' else ''
tree[parent].append(comment)
return tree
def build_file_structure(self):
def add_comment(comment, path):
comment_path = os.path.join(path, comment['id'])
self.files[comment_path] = comment
if comment['id'] in self.tree:
self.directories.add(comment_path)
parent_file_path = os.path.join(comment_path, 'parent')
self.files[parent_file_path] = comment
for reply in self.tree[comment['id']]:
add_comment(reply, comment_path)
for comment in self.tree['']:
add_comment(comment, '/')
def getattr(self, path):
st = MyStat()
st.st_atime = int(time())
st.st_mtime = st.st_atime
st.st_ctime = st.st_atime
if path == '/' or path in self.directories:
st.st_mode = stat.S_IFDIR | 0o755
return st
elif path in self.files:
st.st_mode = stat.S_IFREG | 0o444
st.st_nlink = 1
content = f"ID: {self.files[path]['id']}\nText: {self.files[path]['text']}\nParent: {self.files[path]['parent']}\n"
st.st_size = len(content.encode('utf-8'))
return st
else:
return -errno.ENOENT
def readdir(self, path, offset):
dirents = ['.', '..']
if path == '/':
dirents.extend(comment['id'] for comment in self.tree[''])
elif path in self.directories:
dirents.append('parent')
dirents.extend(reply['id'] for reply in self.tree[path.split('/')[-1]])
for r in dirents:
yield fuse.Direntry(r)
def open(self, path, flags):
if path not in self.files:
return -errno.ENOENT
accmode = os.O_RDONLY | os.O_WRONLY | os.O_RDWR
if (flags & accmode) != os.O_RDONLY:
return -errno.EACCES
return 0
def read(self, path, size, offset):
if path not in self.files:
return -errno.ENOENT
comment = self.files[path]
content = f"ID: {comment['id']}\nText: {comment['text']}\nParent: {comment['parent']}\n"
return content.encode('utf-8')[offset:offset+size]
def main():
usage = "YouTubeCommentFS: A filesystem to browse YouTube comments"
server = CommentFS(version="%prog " + fuse.__version__, usage=usage, dash_s_do='setsingle')
server.parser.add_option(mountopt="uid", metavar="UID", default=os.getuid(),
help="Set the owner of the mounted filesystem")
server.parser.add_option(mountopt="gid", metavar="GID", default=os.getgid(),
help="Set the group of the mounted filesystem")
server.multithreaded = False
server.allow_other = True
server.parse(errex=1)
server.main()
if __name__ == '__main__':
main()