Archiver is smarter for comments (#242)
* Add comment name generation to file name formatter
* Refactor to reduce duplication
* Refactor archive entry classes
* Refactor archiver class a bit
* Refactor method
* Fix comment retrieval
* Add comment-downloading to archiver
* Update test
* Update test
parent 75d74a5362
commit 32c9d6184c
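
In broad terms, this change splits the old single-purpose ArchiveEntry class into a small hierarchy (BaseArchiveEntry with SubmissionArchiveEntry and CommentArchiveEntry subclasses) so the archiver and the file name formatter can handle comments as well as submissions. A rough usage sketch of the new entry classes, not taken from the commit itself (the Reddit credentials are placeholders; the submission and comment IDs come from the tests in this diff):

import praw
from bulkredditdownloader.archive_entry.comment_archive_entry import CommentArchiveEntry
from bulkredditdownloader.archive_entry.submission_archive_entry import SubmissionArchiveEntry

reddit = praw.Reddit(client_id='...', client_secret='...', user_agent='example')

# A submission entry compiles the post details plus its full comment tree
submission_dict = SubmissionArchiveEntry(reddit.submission(id='m2601g')).compile()

# A comment entry compiles the comment, its replies, and the parent submission title
comment_dict = CommentArchiveEntry(reddit.comment(id='gstd4hk')).compile()
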
bulkredditdownloader/archive_entry.py (deleted)
@@ -1,68 +0,0 @@
#!/usr/bin/env python3
# coding=utf-8

import logging

import praw.models

logger = logging.getLogger(__name__)


class ArchiveEntry:
    def __init__(self, submission: praw.models.Submission):
        self.submission = submission
        self.comments: list[dict] = []
        self.post_details: dict = {}

    def compile(self) -> dict:
        self._fill_entry()
        out = self.post_details
        out['comments'] = self.comments
        return out

    def _fill_entry(self):
        self._get_comments()
        self._get_post_details()

    def _get_post_details(self):
        self.post_details = {
            'title': self.submission.title,
            'name': self.submission.name,
            'url': self.submission.url,
            'selftext': self.submission.selftext,
            'score': self.submission.score,
            'upvote_ratio': self.submission.upvote_ratio,
            'permalink': self.submission.permalink,
            'id': self.submission.id,
            'author': self.submission.author.name if self.submission.author else 'DELETED',
            'link_flair_text': self.submission.link_flair_text,
            'num_comments': self.submission.num_comments,
            'over_18': self.submission.over_18,
            'created_utc': self.submission.created_utc,
        }

    def _get_comments(self):
        logger.debug(f'Retrieving full comment tree for submission {self.submission.id}')
        self.submission.comments.replace_more(0)
        for top_level_comment in self.submission.comments:
            self.comments.append(self._convert_comment_to_dict(top_level_comment))

    @staticmethod
    def _convert_comment_to_dict(in_comment: praw.models.Comment) -> dict:
        out_dict = {
            'author': in_comment.author.name if in_comment.author else 'DELETED',
            'id': in_comment.id,
            'score': in_comment.score,
            'subreddit': in_comment.subreddit.display_name,
            'submission': in_comment.submission.id,
            'stickied': in_comment.stickied,
            'body': in_comment.body,
            'is_submitter': in_comment.is_submitter,
            'created_utc': in_comment.created_utc,
            'parent_id': in_comment.parent_id,
            'replies': [],
        }
        in_comment.replies.replace_more(0)
        for reply in in_comment.replies:
            out_dict['replies'].append(ArchiveEntry._convert_comment_to_dict(reply))
        return out_dict
2 bulkredditdownloader/archive_entry/__init__.py Normal file
@@ -0,0 +1,2 @@
#!/usr/bin/env python3
# coding=utf-8
36 bulkredditdownloader/archive_entry/base_archive_entry.py Normal file
@@ -0,0 +1,36 @@
#!/usr/bin/env python3
# coding=utf-8

from abc import ABC, abstractmethod

from praw.models import Comment, Submission


class BaseArchiveEntry(ABC):
    def __init__(self, source: (Comment, Submission)):
        self.source = source
        self.post_details: dict = {}

    @abstractmethod
    def compile(self) -> dict:
        raise NotImplementedError

    @staticmethod
    def _convert_comment_to_dict(in_comment: Comment) -> dict:
        out_dict = {
            'author': in_comment.author.name if in_comment.author else 'DELETED',
            'id': in_comment.id,
            'score': in_comment.score,
            'subreddit': in_comment.subreddit.display_name,
            'submission': in_comment.submission.id,
            'stickied': in_comment.stickied,
            'body': in_comment.body,
            'is_submitter': in_comment.is_submitter,
            'created_utc': in_comment.created_utc,
            'parent_id': in_comment.parent_id,
            'replies': [],
        }
        in_comment.replies.replace_more(0)
        for reply in in_comment.replies:
            out_dict['replies'].append(BaseArchiveEntry._convert_comment_to_dict(reply))
        return out_dict
21 bulkredditdownloader/archive_entry/comment_archive_entry.py Normal file
@@ -0,0 +1,21 @@
#!/usr/bin/env python3
# coding=utf-8

import logging

import praw.models

from bulkredditdownloader.archive_entry.base_archive_entry import BaseArchiveEntry

logger = logging.getLogger(__name__)


class CommentArchiveEntry(BaseArchiveEntry):
    def __init__(self, comment: praw.models.Comment):
        super(CommentArchiveEntry, self).__init__(comment)

    def compile(self) -> dict:
        self.source.refresh()
        self.post_details = self._convert_comment_to_dict(self.source)
        self.post_details['submission_title'] = self.source.submission.title
        return self.post_details
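
For orientation, the dictionary that CommentArchiveEntry.compile() produces has roughly this shape. The author, IDs, subreddit, and submission title below come from the test added later in this diff; the remaining values are made-up placeholders, not captured output:

example_comment_entry = {
    'author': 'james_pic',
    'id': 'gstd4hk',
    'score': 10,  # placeholder value
    'subreddit': 'Python',
    'submission': 'mgi4op',
    'stickied': False,  # placeholder value
    'body': '...',  # placeholder value
    'is_submitter': False,  # placeholder value
    'created_utc': 1617000000,  # placeholder value
    'parent_id': 't3_mgi4op',  # placeholder value
    'replies': [],  # nested dicts of the same shape
    'submission_title': '76% Faster CPython',
}
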
47 bulkredditdownloader/archive_entry/submission_archive_entry.py Normal file
@@ -0,0 +1,47 @@
#!/usr/bin/env python3
# coding=utf-8

import logging

import praw.models

from bulkredditdownloader.archive_entry.base_archive_entry import BaseArchiveEntry

logger = logging.getLogger(__name__)


class SubmissionArchiveEntry(BaseArchiveEntry):
    def __init__(self, submission: praw.models.Submission):
        super(SubmissionArchiveEntry, self).__init__(submission)

    def compile(self) -> dict:
        comments = self._get_comments()
        self._get_post_details()
        out = self.post_details
        out['comments'] = comments
        return out

    def _get_post_details(self):
        self.post_details = {
            'title': self.source.title,
            'name': self.source.name,
            'url': self.source.url,
            'selftext': self.source.selftext,
            'score': self.source.score,
            'upvote_ratio': self.source.upvote_ratio,
            'permalink': self.source.permalink,
            'id': self.source.id,
            'author': self.source.author.name if self.source.author else 'DELETED',
            'link_flair_text': self.source.link_flair_text,
            'num_comments': self.source.num_comments,
            'over_18': self.source.over_18,
            'created_utc': self.source.created_utc,
        }

    def _get_comments(self) -> list[dict]:
        logger.debug(f'Retrieving full comment tree for submission {self.source.id}')
        comments = []
        self.source.comments.replace_more(0)
        for top_level_comment in self.source.comments:
            comments.append(self._convert_comment_to_dict(top_level_comment))
        return comments
bulkredditdownloader/archiver.py
@@ -3,12 +3,15 @@

import json
import logging
import re

import dict2xml
import praw.models
import yaml

from bulkredditdownloader.archive_entry import ArchiveEntry
from bulkredditdownloader.archive_entry.base_archive_entry import BaseArchiveEntry
from bulkredditdownloader.archive_entry.comment_archive_entry import CommentArchiveEntry
from bulkredditdownloader.archive_entry.submission_archive_entry import SubmissionArchiveEntry
from bulkredditdownloader.configuration import Configuration
from bulkredditdownloader.downloader import RedditDownloader
from bulkredditdownloader.exceptions import ArchiverError
@@ -25,41 +28,60 @@ class Archiver(RedditDownloader):
        for generator in self.reddit_lists:
            for submission in generator:
                logger.debug(f'Attempting to archive submission {submission.id}')
                self._write_submission(submission)
                self._write_entry(submission)

    def _write_submission(self, submission: praw.models.Submission):
        archive_entry = ArchiveEntry(submission)
    def _get_submissions_from_link(self) -> list[list[praw.models.Submission]]:
        supplied_submissions = []
        for sub_id in self.args.link:
            if len(sub_id) == 6:
                supplied_submissions.append(self.reddit_instance.submission(id=sub_id))
            elif re.match(r'^\w{7}$', sub_id):
                supplied_submissions.append(self.reddit_instance.comment(id=sub_id))
            else:
                supplied_submissions.append(self.reddit_instance.submission(url=sub_id))
        return [supplied_submissions]

    @staticmethod
    def _pull_lever_entry_factory(praw_item: (praw.models.Submission, praw.models.Comment)) -> BaseArchiveEntry:
        if isinstance(praw_item, praw.models.Submission):
            return SubmissionArchiveEntry(praw_item)
        elif isinstance(praw_item, praw.models.Comment):
            return CommentArchiveEntry(praw_item)
        else:
            raise ArchiverError(f'Factory failed to classify item of type {type(praw_item).__name__}')

    def _write_entry(self, praw_item: (praw.models.Submission, praw.models.Comment)):
        archive_entry = self._pull_lever_entry_factory(praw_item)
        if self.args.format == 'json':
            self._write_submission_json(archive_entry)
            self._write_entry_json(archive_entry)
        elif self.args.format == 'xml':
            self._write_submission_xml(archive_entry)
            self._write_entry_xml(archive_entry)
        elif self.args.format == 'yaml':
            self._write_submission_yaml(archive_entry)
            self._write_entry_yaml(archive_entry)
        else:
            raise ArchiverError(f'Unknown format {self.args.format} given')
        logger.info(f'Record for submission {submission.id} written to disk')
        logger.info(f'Record for entry item {praw_item.id} written to disk')

    def _write_submission_json(self, entry: ArchiveEntry):
        resource = Resource(entry.submission, '', '.json')
    def _write_entry_json(self, entry: BaseArchiveEntry):
        resource = Resource(entry.source, '', '.json')
        content = json.dumps(entry.compile())
        self._write_content_to_disk(resource, content)

    def _write_entry_xml(self, entry: BaseArchiveEntry):
        resource = Resource(entry.source, '', '.xml')
        content = dict2xml.dict2xml(entry.compile(), wrap='root')
        self._write_content_to_disk(resource, content)

    def _write_entry_yaml(self, entry: BaseArchiveEntry):
        resource = Resource(entry.source, '', '.yaml')
        content = yaml.dump(entry.compile())
        self._write_content_to_disk(resource, content)

    def _write_content_to_disk(self, resource: Resource, content: str):
        file_path = self.file_name_formatter.format_path(resource, self.download_directory)
        file_path.parent.mkdir(exist_ok=True, parents=True)
        with open(file_path, 'w') as file:
            logger.debug(f'Writing submission {entry.submission.id} to file in JSON format at {file_path}')
            json.dump(entry.compile(), file)

    def _write_submission_xml(self, entry: ArchiveEntry):
        resource = Resource(entry.submission, '', '.xml')
        file_path = self.file_name_formatter.format_path(resource, self.download_directory)
        file_path.parent.mkdir(exist_ok=True, parents=True)
        with open(file_path, 'w') as file:
            logger.debug(f'Writing submission {entry.submission.id} to file in XML format at {file_path}')
            xml_entry = dict2xml.dict2xml(entry.compile(), wrap='root')
            file.write(xml_entry)

    def _write_submission_yaml(self, entry: ArchiveEntry):
        resource = Resource(entry.submission, '', '.yaml')
        file_path = self.file_name_formatter.format_path(resource, self.download_directory)
        file_path.parent.mkdir(exist_ok=True, parents=True)
        with open(file_path, 'w') as file:
            logger.debug(f'Writing submission {entry.submission.id} to file in YAML format at {file_path}')
            yaml.dump(entry.compile(), file)
            logger.debug(
                f'Writing entry {resource.source_submission.id} to file in {resource.extension[1:].upper()}'
                f' format at {file_path}')
            file.write(content)
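
Worth noting from the hunk above: the archiver now tells submissions and comments apart purely by ID length when links are supplied on the command line, and a small factory picks the matching entry class. A standalone illustration of that length heuristic (the function name and the example URL are illustrative only; the IDs come from this diff):

import re

def classify_reddit_id(sub_id: str) -> str:
    # Mirrors the checks in Archiver._get_submissions_from_link:
    # six-character IDs are treated as submissions, seven-character IDs
    # as comments, and anything else is assumed to be a URL.
    if len(sub_id) == 6:
        return 'submission'
    elif re.match(r'^\w{7}$', sub_id):
        return 'comment'
    return 'url'

assert classify_reddit_id('m2601g') == 'submission'
assert classify_reddit_id('gstd4hk') == 'comment'
assert classify_reddit_id('https://redd.it/m2601g') == 'url'
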
bulkredditdownloader/file_name_formatter.py
@@ -7,7 +7,7 @@ import re
from pathlib import Path
from typing import Optional

import praw.models
from praw.models import Comment, Submission

from bulkredditdownloader.exceptions import BulkDownloaderException
from bulkredditdownloader.resource import Resource
@@ -25,20 +25,17 @@ class FileNameFormatter:
        self.directory_format_string = directory_format_string

    @staticmethod
    def _format_name(submission: praw.models.Submission, format_string: str) -> str:
        submission_attributes = {
            'title': submission.title,
            'subreddit': submission.subreddit.display_name,
            'redditor': submission.author.name if submission.author else 'DELETED',
            'postid': submission.id,
            'upvotes': submission.score,
            'flair': submission.link_flair_text,
            'date': submission.created_utc
        }
    def _format_name(submission: (Comment, Submission), format_string: str) -> str:
        if isinstance(submission, Submission):
            attributes = FileNameFormatter._generate_name_dict_from_submission(submission)
        elif isinstance(submission, Comment):
            attributes = FileNameFormatter._generate_name_dict_from_comment(submission)
        else:
            raise BulkDownloaderException(f'Cannot name object {type(submission).__name__}')
        result = format_string
        for key in submission_attributes.keys():
        for key in attributes.keys():
            if re.search(r'(?i).*{{{}}}.*'.format(key), result):
                result = re.sub(r'(?i){{{}}}'.format(key), str(submission_attributes.get(key, 'unknown')), result)
                result = re.sub(r'(?i){{{}}}'.format(key), str(attributes.get(key, 'unknown')), result)
                logger.log(9, f'Found key string {key} in name')

        result = result.replace('/', '')
@@ -48,7 +45,37 @@ class FileNameFormatter:

        return result

    def format_path(self, resource: Resource, destination_directory: Path, index: Optional[int] = None) -> Path:
    @staticmethod
    def _generate_name_dict_from_submission(submission: Submission) -> dict:
        submission_attributes = {
            'title': submission.title,
            'subreddit': submission.subreddit.display_name,
            'redditor': submission.author.name if submission.author else 'DELETED',
            'postid': submission.id,
            'upvotes': submission.score,
            'flair': submission.link_flair_text,
            'date': submission.created_utc
        }
        return submission_attributes

    @staticmethod
    def _generate_name_dict_from_comment(comment: Comment) -> dict:
        comment_attributes = {
            'title': comment.submission.title,
            'subreddit': comment.subreddit.display_name,
            'redditor': comment.author.name if comment.author else 'DELETED',
            'postid': comment.id,
            'upvotes': comment.score,
            'flair': '',
            'date': comment.created_utc
        }
        return comment_attributes

    def format_path(
            self,
            resource: Resource,
            destination_directory: Path,
            index: Optional[int] = None) -> Path:
        subfolder = destination_directory / self._format_name(resource.source_submission, self.directory_format_string)
        index = f'_{str(index)}' if index else ''
        if not resource.extension:
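
The new _generate_name_dict_from_comment feeds the same {REDDITOR}/{POSTID}/{FLAIR}-style format strings as submissions, with the parent submission's title standing in for {TITLE} and an always-empty flair. A rough stand-in for the substitution logic, not the real implementation, using values from the tests later in this diff:

comment_attributes = {'redditor': 'DELETED', 'postid': 'gsoubde', 'flair': ''}

def apply_scheme(scheme: str, attributes: dict) -> str:
    # Naive placeholder substitution; the real _format_name uses
    # case-insensitive regex matching with an 'unknown' fallback.
    result = scheme
    for key, value in attributes.items():
        result = result.replace('{' + key.upper() + '}', str(value))
    return result

assert apply_scheme('{REDDITOR}_{POSTID}', comment_attributes) == 'DELETED_gsoubde'
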
2 bulkredditdownloader/tests/archive_entry/__init__.py Normal file
@@ -0,0 +1,2 @@
#!/usr/bin/env python3
# coding=utf-8
@@ -0,0 +1,38 @@
#!/usr/bin/env python3
# coding=utf-8

import praw
import pytest

from bulkredditdownloader.archive_entry.comment_archive_entry import CommentArchiveEntry


@pytest.mark.online
@pytest.mark.reddit
@pytest.mark.parametrize(('test_comment_id', 'expected_dict'), (
    ('gstd4hk', {
        'author': 'james_pic',
        'subreddit': 'Python',
        'submission': 'mgi4op',
        'submission_title': '76% Faster CPython',
    }),
))
def test_get_comment_details(test_comment_id: str, expected_dict: dict, reddit_instance: praw.Reddit):
    comment = reddit_instance.comment(id=test_comment_id)
    test_entry = CommentArchiveEntry(comment)
    result = test_entry.compile()
    assert all([result.get(key) == expected_dict[key] for key in expected_dict.keys()])


@pytest.mark.online
@pytest.mark.reddit
@pytest.mark.parametrize(('test_comment_id', 'expected_min_comments'), (
    ('gstd4hk', 4),
    ('gsvyste', 3),
    ('gsxnvvb', 5),
))
def test_get_comment_replies(test_comment_id: str, expected_min_comments: int, reddit_instance: praw.Reddit):
    comment = reddit_instance.comment(id=test_comment_id)
    test_entry = CommentArchiveEntry(comment)
    result = test_entry.compile()
    assert len(result.get('replies')) >= expected_min_comments
@@ -4,7 +4,7 @@
import praw
import pytest

from bulkredditdownloader.archive_entry import ArchiveEntry
from bulkredditdownloader.archive_entry.submission_archive_entry import SubmissionArchiveEntry


@pytest.mark.online
@@ -14,9 +14,9 @@ from bulkredditdownloader.archive_entry import ArchiveEntry
))
def test_get_comments(test_submission_id: str, min_comments: int, reddit_instance: praw.Reddit):
    test_submission = reddit_instance.submission(id=test_submission_id)
    test_archive_entry = ArchiveEntry(test_submission)
    test_archive_entry._get_comments()
    assert len(test_archive_entry.comments) >= min_comments
    test_archive_entry = SubmissionArchiveEntry(test_submission)
    results = test_archive_entry._get_comments()
    assert len(results) >= min_comments


@pytest.mark.online
@@ -27,6 +27,6 @@ def test_get_comments(test_submission_id: str, min_comments: int, reddit_instanc
))
def test_get_post_details(test_submission_id: str, expected_dict: dict, reddit_instance: praw.Reddit):
    test_submission = reddit_instance.submission(id=test_submission_id)
    test_archive_entry = ArchiveEntry(test_submission)
    test_archive_entry = SubmissionArchiveEntry(test_submission)
    test_archive_entry._get_post_details()
    assert all([test_archive_entry.post_details[key] == expected_dict[key] for key in expected_dict.keys()])
    assert all([test_archive_entry.post_details.get(key) == expected_dict[key] for key in expected_dict.keys()])
@@ -10,7 +10,7 @@ from bulkredditdownloader.site_downloaders.erome import Erome

@pytest.mark.online
@pytest.mark.parametrize(('test_url', 'expected_urls'), (
    ('https://www.erome.com/a/vqtPuLXh', ('https://s6.erome.com/365/vqtPuLXh/KH2qBT99_480p.mp4',)),
    ('https://www.erome.com/a/vqtPuLXh', ('https://s11.erome.com/365/vqtPuLXh/KH2qBT99_480p.mp4',)),
    ('https://www.erome.com/a/ORhX0FZz',
     ('https://s4.erome.com/355/ORhX0FZz/9IYQocM9_480p.mp4',
      'https://s4.erome.com/355/ORhX0FZz/9eEDc8xm_480p.mp4',
@@ -7,7 +7,7 @@ from unittest.mock import MagicMock
import praw
import pytest

from bulkredditdownloader.archive_entry import ArchiveEntry
from bulkredditdownloader.archive_entry.submission_archive_entry import SubmissionArchiveEntry
from bulkredditdownloader.archiver import Archiver

@@ -21,9 +21,9 @@ def test_write_submission_json(test_submission_id: str, tmp_path: Path, reddit_i
    test_path = Path(tmp_path, 'test.json')
    test_submission = reddit_instance.submission(id=test_submission_id)
    archiver_mock.file_name_formatter.format_path.return_value = test_path
    test_entry = ArchiveEntry(test_submission)
    Archiver._write_submission_json(archiver_mock, test_entry)
    assert test_path.exists()
    test_entry = SubmissionArchiveEntry(test_submission)
    Archiver._write_entry_json(archiver_mock, test_entry)
    archiver_mock._write_content_to_disk.assert_called_once()


@pytest.mark.online
@@ -36,9 +36,9 @@ def test_write_submission_xml(test_submission_id: str, tmp_path: Path, reddit_in
    test_path = Path(tmp_path, 'test.xml')
    test_submission = reddit_instance.submission(id=test_submission_id)
    archiver_mock.file_name_formatter.format_path.return_value = test_path
    test_entry = ArchiveEntry(test_submission)
    Archiver._write_submission_xml(archiver_mock, test_entry)
    assert test_path.exists()
    test_entry = SubmissionArchiveEntry(test_submission)
    Archiver._write_entry_xml(archiver_mock, test_entry)
    archiver_mock._write_content_to_disk.assert_called_once()


@pytest.mark.online
@@ -48,9 +48,10 @@ def test_write_submission_xml(test_submission_id: str, tmp_path: Path, reddit_in
))
def test_write_submission_yaml(test_submission_id: str, tmp_path: Path, reddit_instance: praw.Reddit):
    archiver_mock = MagicMock()
    archiver_mock.download_directory = tmp_path
    test_path = Path(tmp_path, 'test.yaml')
    test_submission = reddit_instance.submission(id=test_submission_id)
    archiver_mock.file_name_formatter.format_path.return_value = test_path
    test_entry = ArchiveEntry(test_submission)
    Archiver._write_submission_yaml(archiver_mock, test_entry)
    assert test_path.exists()
    test_entry = SubmissionArchiveEntry(test_submission)
    Archiver._write_entry_yaml(archiver_mock, test_entry)
    archiver_mock._write_content_to_disk.assert_called_once()
@@ -22,11 +22,12 @@ def submission() -> MagicMock:
    test.score = 1000
    test.link_flair_text = 'test_flair'
    test.created_utc = 123456789
    test.__class__ = praw.models.Submission
    return test


@pytest.fixture()
def reddit_submission(reddit_instance) -> praw.models.Submission:
def reddit_submission(reddit_instance: praw.Reddit) -> praw.models.Submission:
    return reddit_instance.submission(id='lgilgt')

@@ -137,6 +138,7 @@ def test_format_multiple_resources():
        new_mock.url = 'https://example.com/test.png'
        new_mock.extension = '.png'
        new_mock.source_submission.title = 'test'
        new_mock.source_submission.__class__ = praw.models.Submission
        mocks.append(new_mock)
    test_formatter = FileNameFormatter('{TITLE}', '')
    results = test_formatter.format_resource_paths(mocks, Path('.'))
@@ -176,13 +178,12 @@ def test_preserve_id_append_when_shortening(test_filename: str, test_ending: str
    assert result.endswith(expected_end)


def test_shorten_filenames(tmp_path: Path):
    test_submission = MagicMock()
    test_submission.title = 'A' * 300
    test_submission.author.name = 'test'
    test_submission.subreddit.display_name = 'test'
    test_submission.id = 'BBBBBB'
    test_resource = Resource(test_submission, 'www.example.com/empty', '.jpeg')
def test_shorten_filenames(submission: MagicMock, tmp_path: Path):
    submission.title = 'A' * 300
    submission.author.name = 'test'
    submission.subreddit.display_name = 'test'
    submission.id = 'BBBBBB'
    test_resource = Resource(submission, 'www.example.com/empty', '.jpeg')
    test_formatter = FileNameFormatter('{REDDITOR}_{TITLE}_{POSTID}', '{SUBREDDIT}')
    result = test_formatter.format_path(test_resource, tmp_path)
    result.parent.mkdir(parents=True)
@@ -212,3 +213,50 @@ def test_format_file_name_for_windows(test_string: str, expected: str):
def test_strip_emojies(test_string: str, expected: str):
    result = FileNameFormatter._strip_emojis(test_string)
    assert result == expected


@pytest.mark.online
@pytest.mark.reddit
@pytest.mark.parametrize(('test_submission_id', 'expected'), (
    ('mfuteh', {'title': 'Why Do Interviewers Ask Linked List Questions?', 'redditor': 'mjgardner'}),
))
def test_generate_dict_for_submission(test_submission_id: str, expected: dict, reddit_instance: praw.Reddit):
    test_submission = reddit_instance.submission(id=test_submission_id)
    result = FileNameFormatter._generate_name_dict_from_submission(test_submission)
    assert all([result.get(key) == expected[key] for key in expected.keys()])


@pytest.mark.online
@pytest.mark.reddit
@pytest.mark.parametrize(('test_comment_id', 'expected'), (
    ('gsq0yuw', {
        'title': 'Why Do Interviewers Ask Linked List Questions?',
        'redditor': 'Doctor-Dapper',
        'postid': 'gsq0yuw',
        'flair': '',
    }),
))
def test_generate_dict_for_comment(test_comment_id: str, expected: dict, reddit_instance: praw.Reddit):
    test_comment = reddit_instance.comment(id=test_comment_id)
    result = FileNameFormatter._generate_name_dict_from_comment(test_comment)
    assert all([result.get(key) == expected[key] for key in expected.keys()])


@pytest.mark.online
@pytest.mark.reddit
@pytest.mark.parametrize(('test_file_scheme', 'test_folder_scheme', 'test_comment_id', 'expected_name'), (
    ('{POSTID}', '', 'gsoubde', 'gsoubde.json'),
    ('{REDDITOR}_{POSTID}', '', 'gsoubde', 'DELETED_gsoubde.json'),
))
def test_format_archive_entry_comment(
        test_file_scheme: str,
        test_folder_scheme: str,
        test_comment_id: str,
        expected_name: str,
        tmp_path: Path,
        reddit_instance: praw.Reddit):
    test_comment = reddit_instance.comment(id=test_comment_id)
    test_formatter = FileNameFormatter(test_file_scheme, test_folder_scheme)
    test_entry = Resource(test_comment, '', '.json')
    result = test_formatter.format_path(test_entry, tmp_path)
    assert result.name == expected_name
@@ -168,6 +168,21 @@ def test_cli_download_long(test_args: list[str], tmp_path: Path):
    assert result.exit_code == 0


@pytest.mark.online
@pytest.mark.reddit
@pytest.mark.skipif(Path('test_config.cfg') is False, reason='A test config file is required for integration tests')
@pytest.mark.parametrize('test_args', (
    ['-l', 'gstd4hk'],
    ['-l', 'm2601g'],
))
def test_cli_archive_single(test_args: list[str], tmp_path: Path):
    runner = CliRunner()
    test_args = ['archive', str(tmp_path), '-v', '--config', 'test_config.cfg'] + test_args
    result = runner.invoke(cli, test_args)
    assert result.exit_code == 0
    assert re.search(r'Writing entry .*? to file in .*? format', result.output)


@pytest.mark.online
@pytest.mark.reddit
@pytest.mark.skipif(Path('test_config.cfg') is False, reason='A test config file is required for integration tests')
@@ -184,7 +199,7 @@ def test_cli_archive_subreddit(test_args: list[str], tmp_path: Path):
    test_args = ['archive', str(tmp_path), '-v', '--config', 'test_config.cfg'] + test_args
    result = runner.invoke(cli, test_args)
    assert result.exit_code == 0
    assert re.search(r'Writing submission .*? to file in .*? format', result.output)
    assert re.search(r'Writing entry .*? to file in .*? format', result.output)


@pytest.mark.online
@@ -200,7 +215,7 @@ def test_cli_archive_long(test_args: list[str], tmp_path: Path):
    test_args = ['archive', str(tmp_path), '-v', '--config', 'test_config.cfg'] + test_args
    result = runner.invoke(cli, test_args)
    assert result.exit_code == 0
    assert re.search(r'Writing submission .*? to file in .*? format', result.output)
    assert re.search(r'Writing entry .*? to file in .*? format', result.output)


@pytest.mark.online