experimental: youtube comment fs
This commit is contained in:
parent
e547a58026
commit
0a9617626c
|
@ -0,0 +1,104 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import os
|
||||
import stat
|
||||
import errno
|
||||
import fuse
|
||||
from time import time
|
||||
import json
|
||||
from collections import defaultdict
|
||||
|
||||
# Select revision (0, 2) of the fuse binding's Python API.
# NOTE(review): presumably this must be set before any fuse.Fuse subclass is
# instantiated -- confirm against the python-fuse documentation.
fuse.fuse_python_api = (0, 2)
|
||||
|
||||
class MyStat(fuse.Stat):
    """Stat record whose defaults describe a 4096-byte directory (mode 0o755).

    Inode, device, owner, group and all three timestamps start at zero;
    callers overwrite whichever fields differ for a particular path.
    """

    def __init__(self):
        # Fields that simply start out as zero.
        for zeroed in ('st_ino', 'st_dev', 'st_uid', 'st_gid',
                       'st_atime', 'st_mtime', 'st_ctime'):
            setattr(self, zeroed, 0)

        # Directory defaults.
        self.st_mode = stat.S_IFDIR | 0o755
        self.st_nlink = 2
        self.st_size = 4096
|
||||
|
||||
class CommentFS(fuse.Fuse):
    """Read-only FUSE filesystem exposing the comments in ``comments.jsonl``.

    Each comment is registered in ``self.files`` under a path built from the
    chain of its ancestors' ids (``/<id>`` for top-level comments,
    ``/<parent id>/<id>`` for replies) and is reported as a regular file whose
    content lists the comment's id, text and parent.

    NOTE(review): comments that have replies are still reported as regular
    files, so reply paths are presumably unreachable through a mounted
    filesystem -- confirm whether directories were intended here.
    """

    def __init__(self, *args, **kw):
        """Initialise the FUSE base class and load ``comments.jsonl``.

        The file is opened relative to the current working directory and must
        hold one JSON object per line.
        """
        fuse.Fuse.__init__(self, *args, **kw)

        with open('comments.jsonl', 'r', encoding='utf-8') as f:
            # Skip blank lines (e.g. a trailing newline) instead of letting
            # json.loads fail on an empty string.
            self.comments = [json.loads(line) for line in f if line.strip()]

        self.tree = self.build_comment_tree(self.comments)
        self.files = {}
        self.build_file_structure()

    @staticmethod
    def _render_comment(comment):
        """Return the UTF-8 encoded file content shown for *comment*."""
        return (
            f"ID: {comment['id']}\n"
            f"Text: {comment['text']}\n"
            f"Parent: {comment['parent']}\n"
        ).encode('utf-8')

    def build_comment_tree(self, comments):
        """Group *comments* by parent id.

        Returns a ``defaultdict(list)`` mapping a parent id to its direct
        replies; comments whose parent is ``'root'`` are stored under ``''``.
        """
        tree = defaultdict(list)
        for comment in comments:
            parent = comment['parent'] if comment['parent'] != 'root' else ''
            tree[parent].append(comment)
        return tree

    def build_file_structure(self):
        """Fill ``self.files`` with one path per comment reachable from the root."""
        def add_comment(comment, path):
            comment_path = os.path.join(path, comment['id'])
            self.files[comment_path] = comment
            # .get() avoids inserting empty lists into the defaultdict.
            for reply in self.tree.get(comment['id'], []):
                add_comment(reply, comment_path)

        for comment in self.tree['']:
            add_comment(comment, '/')

    def getattr(self, path):
        """Return a stat record for *path*, or ``-errno.ENOENT`` if unknown.

        The root is a directory; every known comment path is a read-only
        regular file sized to its rendered content. All timestamps are set to
        the current time.
        """
        st = MyStat()
        st.st_atime = int(time())
        st.st_mtime = st.st_atime
        st.st_ctime = st.st_atime

        if path == '/':
            return st

        comment = self.files.get(path)
        if comment is None:
            return -errno.ENOENT

        st.st_mode = stat.S_IFREG | 0o444
        st.st_nlink = 1
        st.st_size = len(self._render_comment(comment))
        return st

    def readdir(self, path, offset):
        """Yield directory entries for *path*.

        The root lists the top-level comment ids; a known comment path lists
        the ids of that comment's direct replies. ``.`` and ``..`` are always
        included.
        """
        dirents = ['.', '..']
        if path == '/':
            dirents.extend(comment['id'] for comment in self.tree[''])
        elif path in self.files:
            # Resolve the comment through the path table so nested paths work
            # and no per-call list of every id has to be built.
            comment_id = self.files[path]['id']
            dirents.extend(reply['id'] for reply in self.tree.get(comment_id, []))

        for name in dirents:
            yield fuse.Direntry(name)

    def open(self, path, flags):
        """Allow read-only opens of known paths.

        Returns ``-errno.ENOENT`` for unknown paths, ``-errno.EACCES`` for any
        access mode other than ``O_RDONLY`` and ``0`` on success.
        """
        if path not in self.files:
            return -errno.ENOENT
        accmode = os.O_RDONLY | os.O_WRONLY | os.O_RDWR
        if (flags & accmode) != os.O_RDONLY:
            return -errno.EACCES
        return 0

    def read(self, path, size, offset):
        """Return up to *size* bytes of the rendered comment starting at *offset*.

        Returns ``-errno.ENOENT`` when *path* is unknown.
        """
        if path not in self.files:
            return -errno.ENOENT
        content = self._render_comment(self.files[path])
        return content[offset:offset + size]
|
||||
|
||||
def main():
    """Create the CommentFS server, call ``parse(errex=1)`` and run ``main()``."""
    version_text = "%prog " + fuse.__version__
    usage_text = "YouTubeCommentFS: A filesystem to browse YouTube comments"

    server = CommentFS(
        version=version_text,
        usage=usage_text,
        dash_s_do='setsingle',
    )
    server.parse(errex=1)
    server.main()
|
||||
|
||||
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
|
||||
|
|
@ -0,0 +1,117 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import os
|
||||
import stat
|
||||
import errno
|
||||
import fuse
|
||||
from time import time
|
||||
import json
|
||||
from collections import defaultdict
|
||||
|
||||
# Select revision (0, 2) of the fuse binding's Python API.
# NOTE(review): presumably this must be set before any fuse.Fuse subclass is
# instantiated -- confirm against the python-fuse documentation.
fuse.fuse_python_api = (0, 2)
|
||||
|
||||
class MyStat(fuse.Stat):
    """Stat record pre-filled with the values of a 4096-byte 0o755 directory.

    Inode, device, uid, gid and the three timestamps are initialised to zero;
    the caller adjusts mode, link count, size and times per path.
    """

    def __init__(self):
        # Directory defaults first ...
        self.st_mode = stat.S_IFDIR | 0o755
        self.st_nlink = 2
        self.st_size = 4096

        # ... then identity and ownership, all zero ...
        self.st_ino = self.st_dev = 0
        self.st_uid = self.st_gid = 0

        # ... and finally the timestamps, also zero until overwritten.
        self.st_atime = self.st_mtime = self.st_ctime = 0
|
||||
|
||||
class CommentFS(fuse.Fuse):
    """Read-only FUSE filesystem exposing ``comments.jsonl`` as a tree.

    A comment without replies is a regular file named after its id. A comment
    with replies is a directory named after its id; it contains a ``parent``
    file holding that comment's own content, plus one entry per direct reply.
    """

    def __init__(self, *args, **kw):
        """Initialise the FUSE base class and load ``comments.jsonl``.

        The file is opened relative to the current working directory and must
        hold one JSON object per line.
        """
        fuse.Fuse.__init__(self, *args, **kw)

        with open('comments.jsonl', 'r', encoding='utf-8') as f:
            # Skip blank lines (e.g. a trailing newline) instead of letting
            # json.loads fail on an empty string.
            self.comments = [json.loads(line) for line in f if line.strip()]

        self.tree = self.build_comment_tree(self.comments)
        self.files = {}
        self.directories = set()
        self.build_file_structure()

    @staticmethod
    def _render_comment(comment):
        """Return the UTF-8 encoded file content shown for *comment*."""
        return (
            f"ID: {comment['id']}\n"
            f"Text: {comment['text']}\n"
            f"Parent: {comment['parent']}\n"
        ).encode('utf-8')

    def build_comment_tree(self, comments):
        """Group *comments* by parent id.

        Returns a ``defaultdict(list)`` mapping a parent id to its direct
        replies; comments whose parent is ``'root'`` are stored under ``''``.
        """
        tree = defaultdict(list)
        for comment in comments:
            parent = comment['parent'] if comment['parent'] != 'root' else ''
            tree[parent].append(comment)
        return tree

    def build_file_structure(self):
        """Fill ``self.files`` and ``self.directories`` from ``self.tree``."""
        def add_comment(comment, path):
            comment_path = os.path.join(path, comment['id'])
            # The path is recorded in self.files even when it turns out to be
            # a directory; getattr checks self.directories first, so such a
            # path is still reported as a directory.
            self.files[comment_path] = comment

            if comment['id'] in self.tree:
                self.directories.add(comment_path)
                # Expose the comment's own content inside its directory.
                self.files[os.path.join(comment_path, 'parent')] = comment
                for reply in self.tree[comment['id']]:
                    add_comment(reply, comment_path)

        for comment in self.tree['']:
            add_comment(comment, '/')

    def getattr(self, path):
        """Return a stat record for *path*, or ``-errno.ENOENT`` if unknown.

        The root and every path in ``self.directories`` are directories; any
        other path in ``self.files`` is a read-only regular file sized to its
        rendered content. All timestamps are set to the current time.
        """
        st = MyStat()
        st.st_atime = int(time())
        st.st_mtime = st.st_atime
        st.st_ctime = st.st_atime

        if path == '/' or path in self.directories:
            st.st_mode = stat.S_IFDIR | 0o755
            return st

        comment = self.files.get(path)
        if comment is None:
            return -errno.ENOENT

        st.st_mode = stat.S_IFREG | 0o444
        st.st_nlink = 1
        st.st_size = len(self._render_comment(comment))
        return st

    def readdir(self, path, offset):
        """Yield directory entries for *path*.

        The root lists the top-level comment ids; a comment directory lists
        ``parent`` followed by the ids of its direct replies. ``.`` and ``..``
        are always included.
        """
        dirents = ['.', '..']
        if path == '/':
            dirents.extend(comment['id'] for comment in self.tree[''])
        elif path in self.directories:
            dirents.append('parent')
            # The last path component is the id of the directory's comment.
            comment_id = path.split('/')[-1]
            dirents.extend(reply['id'] for reply in self.tree[comment_id])

        for name in dirents:
            yield fuse.Direntry(name)

    def open(self, path, flags):
        """Allow read-only opens of known paths.

        Returns ``-errno.ENOENT`` for unknown paths, ``-errno.EACCES`` for any
        access mode other than ``O_RDONLY`` and ``0`` on success.
        """
        if path not in self.files:
            return -errno.ENOENT
        accmode = os.O_RDONLY | os.O_WRONLY | os.O_RDWR
        if (flags & accmode) != os.O_RDONLY:
            return -errno.EACCES
        return 0

    def read(self, path, size, offset):
        """Return up to *size* bytes of the rendered comment starting at *offset*.

        Returns ``-errno.ENOENT`` when *path* is unknown.
        """
        if path not in self.files:
            return -errno.ENOENT
        content = self._render_comment(self.files[path])
        return content[offset:offset + size]
|
||||
|
||||
def main():
    """Create the CommentFS server, register mount options and run it.

    Adds ``uid`` and ``gid`` mount options that default to the ids of the
    calling process, then calls ``parse(errex=1)`` and ``main()`` on the server.
    """
    version_text = "%prog " + fuse.__version__
    usage_text = "YouTubeCommentFS: A filesystem to browse YouTube comments"
    server = CommentFS(version=version_text, usage=usage_text, dash_s_do='setsingle')

    # NOTE(review): nothing in the visible code reads these option values;
    # MyStat sets st_uid and st_gid to 0 -- confirm whether they should be used.
    mount_options = (
        ("uid", "UID", os.getuid(), "Set the owner of the mounted filesystem"),
        ("gid", "GID", os.getgid(), "Set the group of the mounted filesystem"),
    )
    for opt_name, meta, fallback, description in mount_options:
        server.parser.add_option(mountopt=opt_name, metavar=meta,
                                 default=fallback, help=description)

    server.multithreaded = False
    server.allow_other = True
    server.parse(errex=1)
    server.main()
|
||||
|
||||
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
|
||||
|
|
@ -0,0 +1,37 @@
|
|||
#!/usr/bin/env python3
|
||||
import json
|
||||
from collections import defaultdict
|
||||
|
||||
def build_comment_tree(comments):
    """Turn a flat list of comment dicts into a nested list of reply trees.

    Each comment needs the keys ``id``, ``parent`` and ``text``. Comments whose
    ``parent`` is ``"root"`` become the top-level nodes; every node is a dict
    ``{"text": ..., "replies": [...]}`` with replies nested recursively.
    Comments whose parent chain never reaches a root comment are left out.
    Progress counts are printed to stdout.
    """
    print(f"Total comments: {len(comments)}")  # Debug info

    replies_by_parent = defaultdict(list)
    top_level = []
    for entry in comments:
        parent_id = entry['parent']
        if parent_id != "root":
            replies_by_parent[parent_id].append(entry)
            continue
        top_level.append(entry)

    print(f"Root comments: {len(top_level)}")  # Debug info

    def to_node(entry):
        # Read the text first, then descend into the direct replies.
        node = {"text": entry['text']}
        children = []
        for child in replies_by_parent[entry['id']]:
            children.append(to_node(child))
        node["replies"] = children
        return node

    return list(map(to_node, top_level))
|
||||
|
||||
# Load the flat comment list: one JSON object per line.
with open('comments.jsonl', 'r', encoding='utf-8') as f:
    comments = [json.loads(line) for line in f]

comment_tree = build_comment_tree(comments)

print(f"Final tree length: {len(comment_tree)}")  # Debug info

# ensure_ascii=False writes non-ASCII characters verbatim, so the output file
# needs an explicit UTF-8 encoding rather than the platform default.
with open('comment_tree.json', 'w', encoding='utf-8') as f:
    json.dump(comment_tree, f, ensure_ascii=False, indent=2)
|
||||
|
Loading…
Reference in New Issue