226 lines
8.7 KiB
Python
226 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import datetime
|
|
import logging
|
|
import platform
|
|
import re
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Optional, Union
|
|
|
|
from praw.models import Comment, Submission
|
|
|
|
from bdfr.exceptions import BulkDownloaderException
|
|
from bdfr.resource import Resource
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FileNameFormatter:
|
|
key_terms = (
|
|
"date",
|
|
"flair",
|
|
"postid",
|
|
"redditor",
|
|
"subreddit",
|
|
"title",
|
|
"upvotes",
|
|
)
|
|
WINDOWS_MAX_PATH_LENGTH = 260
|
|
LINUX_MAX_PATH_LENGTH = 4096
|
|
|
|
def __init__(
|
|
self,
|
|
file_format_string: str,
|
|
directory_format_string: str,
|
|
time_format_string: str,
|
|
restriction_scheme: Optional[str] = None,
|
|
):
|
|
if not self.validate_string(file_format_string):
|
|
raise BulkDownloaderException(f'"{file_format_string}" is not a valid format string')
|
|
self.file_format_string = file_format_string
|
|
self.directory_format_string: list[str] = directory_format_string.split("/")
|
|
self.time_format_string = time_format_string
|
|
self.restiction_scheme = restriction_scheme.lower().strip() if restriction_scheme else None
|
|
if self.restiction_scheme == "windows":
|
|
self.max_path = self.WINDOWS_MAX_PATH_LENGTH
|
|
else:
|
|
self.max_path = self.find_max_path_length()
|
|
|
|
def _format_name(self, submission: Union[Comment, Submission], format_string: str) -> str:
|
|
if isinstance(submission, Submission):
|
|
attributes = self._generate_name_dict_from_submission(submission)
|
|
elif isinstance(submission, Comment):
|
|
attributes = self._generate_name_dict_from_comment(submission)
|
|
else:
|
|
raise BulkDownloaderException(f"Cannot name object {type(submission).__name__}")
|
|
result = format_string
|
|
for key in attributes.keys():
|
|
if re.search(rf"(?i).*{{{key}}}.*", result):
|
|
key_value = str(attributes.get(key, "unknown"))
|
|
key_value = FileNameFormatter._convert_unicode_escapes(key_value)
|
|
key_value = key_value.replace("\\", "\\\\")
|
|
result = re.sub(rf"(?i){{{key}}}", key_value, result)
|
|
|
|
result = result.replace("/", "")
|
|
|
|
if self.restiction_scheme is None:
|
|
if platform.system() == "Windows":
|
|
result = FileNameFormatter._format_for_windows(result)
|
|
elif self.restiction_scheme == "windows":
|
|
logger.debug("Forcing Windows-compatible filenames")
|
|
result = FileNameFormatter._format_for_windows(result)
|
|
return result
|
|
|
|
@staticmethod
|
|
def _convert_unicode_escapes(in_string: str) -> str:
|
|
pattern = re.compile(r"(\\u\d{4})")
|
|
matches = re.search(pattern, in_string)
|
|
if matches:
|
|
for match in matches.groups():
|
|
converted_match = bytes(match, "utf-8").decode("unicode-escape")
|
|
in_string = in_string.replace(match, converted_match)
|
|
return in_string
|
|
|
|
def _generate_name_dict_from_submission(self, submission: Submission) -> dict:
|
|
submission_attributes = {
|
|
"title": submission.title,
|
|
"subreddit": submission.subreddit.display_name,
|
|
"redditor": submission.author.name if submission.author else "DELETED",
|
|
"postid": submission.id,
|
|
"upvotes": submission.score,
|
|
"flair": submission.link_flair_text,
|
|
"date": self._convert_timestamp(submission.created_utc),
|
|
}
|
|
return submission_attributes
|
|
|
|
def _convert_timestamp(self, timestamp: float) -> str:
|
|
input_time = datetime.datetime.fromtimestamp(timestamp)
|
|
if self.time_format_string.upper().strip() == "ISO":
|
|
return input_time.isoformat()
|
|
else:
|
|
return input_time.strftime(self.time_format_string)
|
|
|
|
def _generate_name_dict_from_comment(self, comment: Comment) -> dict:
|
|
comment_attributes = {
|
|
"title": comment.submission.title,
|
|
"subreddit": comment.subreddit.display_name,
|
|
"redditor": comment.author.name if comment.author else "DELETED",
|
|
"postid": comment.id,
|
|
"upvotes": comment.score,
|
|
"flair": "",
|
|
"date": self._convert_timestamp(comment.created_utc),
|
|
}
|
|
return comment_attributes
|
|
|
|
def format_path(
|
|
self,
|
|
resource: Resource,
|
|
destination_directory: Path,
|
|
index: Optional[int] = None,
|
|
) -> Path:
|
|
subfolder = Path(
|
|
destination_directory,
|
|
*[self._format_name(resource.source_submission, part) for part in self.directory_format_string],
|
|
)
|
|
index = f"_{index}" if index else ""
|
|
if not resource.extension:
|
|
raise BulkDownloaderException(f"Resource from {resource.url} has no extension")
|
|
file_name = str(self._format_name(resource.source_submission, self.file_format_string))
|
|
|
|
file_name = re.sub(r"\n", " ", file_name)
|
|
|
|
if not re.match(r".*\.$", file_name) and not re.match(r"^\..*", resource.extension):
|
|
ending = index + "." + resource.extension
|
|
else:
|
|
ending = index + resource.extension
|
|
|
|
try:
|
|
file_path = self.limit_file_name_length(file_name, ending, subfolder)
|
|
except TypeError:
|
|
raise BulkDownloaderException(f"Could not determine path name: {subfolder}, {index}, {resource.extension}")
|
|
return file_path
|
|
|
|
def limit_file_name_length(self, filename: str, ending: str, root: Path) -> Path:
|
|
root = root.resolve().expanduser()
|
|
possible_id = re.search(r"((?:_\w{6})?$)", filename)
|
|
if possible_id:
|
|
ending = possible_id.group(1) + ending
|
|
filename = filename[: possible_id.start()]
|
|
max_path = self.max_path
|
|
max_file_part_length_chars = 255 - len(ending)
|
|
max_file_part_length_bytes = 255 - len(ending.encode("utf-8"))
|
|
max_path_length = max_path - len(ending) - len(str(root)) - 1
|
|
|
|
out = Path(root, filename + ending)
|
|
while any(
|
|
[
|
|
len(filename) > max_file_part_length_chars,
|
|
len(filename.encode("utf-8")) > max_file_part_length_bytes,
|
|
len(str(out)) > max_path_length,
|
|
]
|
|
):
|
|
filename = filename[:-1]
|
|
out = Path(root, filename + ending)
|
|
|
|
return out
|
|
|
|
@staticmethod
|
|
def find_max_path_length() -> int:
|
|
try:
|
|
return int(subprocess.check_output(["getconf", "PATH_MAX", "/"]))
|
|
except (ValueError, subprocess.CalledProcessError, OSError):
|
|
if platform.system() == "Windows":
|
|
return FileNameFormatter.WINDOWS_MAX_PATH_LENGTH
|
|
else:
|
|
return FileNameFormatter.LINUX_MAX_PATH_LENGTH
|
|
|
|
def format_resource_paths(
|
|
self,
|
|
resources: list[Resource],
|
|
destination_directory: Path,
|
|
) -> list[tuple[Path, Resource]]:
|
|
out = []
|
|
if len(resources) == 1:
|
|
try:
|
|
out.append((self.format_path(resources[0], destination_directory, None), resources[0]))
|
|
except BulkDownloaderException as e:
|
|
logger.error(f"Could not generate file path for resource {resources[0].url}: {e}")
|
|
logger.exception("Could not generate file path")
|
|
else:
|
|
for i, res in enumerate(resources, start=1):
|
|
logger.log(9, f"Formatting filename with index {i}")
|
|
try:
|
|
out.append((self.format_path(res, destination_directory, i), res))
|
|
except BulkDownloaderException as e:
|
|
logger.error(f"Could not generate file path for resource {res.url}: {e}")
|
|
logger.exception("Could not generate file path")
|
|
return out
|
|
|
|
@staticmethod
|
|
def validate_string(test_string: str) -> bool:
|
|
if not test_string:
|
|
return False
|
|
result = any([f"{{{key}}}" in test_string.lower() for key in FileNameFormatter.key_terms])
|
|
if result:
|
|
if "POSTID" not in test_string:
|
|
logger.warning(
|
|
"Some files might not be downloaded due to name conflicts as filenames are"
|
|
" not guaranteed to be be unique without {POSTID}"
|
|
)
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
@staticmethod
|
|
def _format_for_windows(input_string: str) -> str:
|
|
invalid_characters = r'<>:"\/|?*'
|
|
for char in invalid_characters:
|
|
input_string = input_string.replace(char, "")
|
|
input_string = FileNameFormatter._strip_emojis(input_string)
|
|
return input_string
|
|
|
|
@staticmethod
|
|
def _strip_emojis(input_string: str) -> str:
|
|
result = input_string.encode("ascii", errors="ignore").decode("utf-8")
|
|
return result
|