1
0
Fork 0
mirror of synced 2024-05-19 19:52:41 +12:00
bulk-downloader-for-reddit/bdfr/file_name_formatter.py

228 lines
8.8 KiB
Python
Raw Normal View History

2021-02-11 12:08:47 +13:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
2021-04-22 12:38:32 +12:00
import datetime
2021-03-11 16:20:39 +13:00
import logging
import platform
2021-02-11 12:08:47 +13:00
import re
2021-05-18 14:39:08 +12:00
import subprocess
2021-02-11 12:08:47 +13:00
from pathlib import Path
2022-12-01 15:48:10 +13:00
from typing import Optional, Union
2021-02-11 12:08:47 +13:00
from praw.models import Comment, Submission
2021-02-11 12:08:47 +13:00
2021-04-12 19:58:32 +12:00
from bdfr.exceptions import BulkDownloaderException
from bdfr.resource import Resource
2021-02-11 12:08:47 +13:00
2021-03-11 16:20:39 +13:00
# Module-level logger named after this module; FileNameFormatter uses it for its debug, warning and error output.
logger = logging.getLogger(__name__)
2021-02-11 12:08:47 +13:00
class FileNameFormatter:
    """Build output paths for downloaded resources from user-supplied format strings.

    Format strings contain ``{key}`` placeholders (see ``key_terms``) that are matched
    case-insensitively and replaced with attributes of the source submission or comment.
    """

    # Placeholder names accepted inside format strings, written as ``{name}``.
    key_terms = (
        "date",
        "flair",
        "postid",
        "redditor",
        "subreddit",
        "title",
        "upvotes",
    )
    # Fallback/forced limits for the total path length.
    WINDOWS_MAX_PATH_LENGTH = 260
    LINUX_MAX_PATH_LENGTH = 4096

    def __init__(
        self,
        file_format_string: str,
        directory_format_string: str,
        time_format_string: str,
        restriction_scheme: Optional[str] = None,
    ):
        """Store the format strings and work out the maximum path length.

        Args:
            file_format_string: Template for the file name; must pass ``validate_string``.
            directory_format_string: Template for the sub-directories, split on ``/`` into parts.
            time_format_string: ``strftime`` format for ``{date}``, or ``ISO`` for ISO-8601 output.
            restriction_scheme: Optional naming scheme; ``"windows"`` (case-insensitive) forces
                Windows-compatible names and the Windows path limit.

        Raises:
            BulkDownloaderException: If ``file_format_string`` is not a valid format string.
        """
        if not self.validate_string(file_format_string):
            raise BulkDownloaderException(f'"{file_format_string}" is not a valid format string')
        self.file_format_string = file_format_string
        self.directory_format_string: list[str] = directory_format_string.split("/")
        self.time_format_string = time_format_string
        # NOTE: the attribute name is misspelled ("restiction"); it is kept as-is so that
        # any external code reading it keeps working.
        self.restiction_scheme = restriction_scheme.lower().strip() if restriction_scheme else None
        if self.restiction_scheme == "windows":
            self.max_path = self.WINDOWS_MAX_PATH_LENGTH
        else:
            self.max_path = self.find_max_path_length()

    def _format_name(self, submission: Union[Comment, Submission], format_string: str) -> str:
        """Fill the placeholders of ``format_string`` from a submission or comment.

        Forward slashes are removed from the result, and Windows-incompatible characters are
        stripped when running on Windows (no scheme given) or when the scheme is ``"windows"``.

        Raises:
            BulkDownloaderException: If ``submission`` is neither a Submission nor a Comment.
        """
        if isinstance(submission, Submission):
            attributes = self._generate_name_dict_from_submission(submission)
        elif isinstance(submission, Comment):
            attributes = self._generate_name_dict_from_comment(submission)
        else:
            raise BulkDownloaderException(f"Cannot name object {type(submission).__name__}")

        result = FileNameFormatter._substitute_keys(format_string, attributes)
        # A slash would otherwise introduce an unintended directory level.
        result = result.replace("/", "")

        if self.restiction_scheme is None:
            if platform.system() == "Windows":
                result = FileNameFormatter._format_for_windows(result)
        elif self.restiction_scheme == "windows":
            logger.debug("Forcing Windows-compatible filenames")
            result = FileNameFormatter._format_for_windows(result)
        return result

    @staticmethod
    def _substitute_keys(format_string: str, attributes: dict) -> str:
        """Replace every ``{key}`` placeholder (case-insensitive) with its attribute value.

        All placeholders are replaced in a single pass, so text that comes from an attribute
        value (e.g. a title containing ``{upvotes}``) is never itself treated as a placeholder.
        Values are converted with ``str`` and have ``\\uNNNN`` escapes decoded.
        """
        if not attributes:
            return format_string
        values = {str(key).lower(): value for key, value in attributes.items()}
        placeholder = re.compile(
            r"\{(" + "|".join(re.escape(str(key)) for key in attributes) + r")\}",
            re.IGNORECASE,
        )

        def fill(match: re.Match) -> str:
            # A callable replacement is inserted literally, so backslashes need no escaping.
            value = str(values[match.group(1).lower()])
            return FileNameFormatter._convert_unicode_escapes(value)

        return placeholder.sub(fill, format_string)

    @staticmethod
    def _convert_unicode_escapes(in_string: str) -> str:
        """Replace every literal ``\\uNNNN`` sequence (four decimal digits) with its character.

        The four digits are read as a hexadecimal code point, as in a Python ``\\u`` escape.
        """
        return re.sub(r"\\u([0-9]{4})", lambda match: chr(int(match.group(1), 16)), in_string)

    def _generate_name_dict_from_submission(self, submission: Submission) -> dict:
        """Collect the placeholder values for a submission, keyed by placeholder name."""
        submission_attributes = {
            "title": submission.title,
            "subreddit": submission.subreddit.display_name,
            # A missing author is reported as "DELETED".
            "redditor": submission.author.name if submission.author else "DELETED",
            "postid": submission.id,
            "upvotes": submission.score,
            "flair": submission.link_flair_text,
            "date": self._convert_timestamp(submission.created_utc),
        }
        return submission_attributes

    def _convert_timestamp(self, timestamp: float) -> str:
        """Format a POSIX timestamp using ``time_format_string`` (``ISO`` selects ``isoformat``).

        NOTE(review): ``datetime.fromtimestamp`` without a tz yields naive local time, so the
        rendered date depends on the machine's timezone - confirm this is intended.
        """
        input_time = datetime.datetime.fromtimestamp(timestamp)
        if self.time_format_string.upper().strip() == "ISO":
            return input_time.isoformat()
        return input_time.strftime(self.time_format_string)

    def _generate_name_dict_from_comment(self, comment: Comment) -> dict:
        """Collect the placeholder values for a comment, keyed by placeholder name.

        The title comes from the comment's parent submission and the flair is always empty.
        """
        comment_attributes = {
            "title": comment.submission.title,
            "subreddit": comment.subreddit.display_name,
            "redditor": comment.author.name if comment.author else "DELETED",
            "postid": comment.id,
            "upvotes": comment.score,
            "flair": "",
            "date": self._convert_timestamp(comment.created_utc),
        }
        return comment_attributes

    def format_path(
        self,
        resource: Resource,
        destination_directory: Path,
        index: Optional[int] = None,
    ) -> Path:
        """Build the full output path for a single resource.

        Args:
            resource: Resource to name; its ``source_submission``, ``extension`` and ``url`` are read.
            destination_directory: Base directory under which the formatted sub-directories go.
            index: Optional position of the resource; a truthy value adds an ``_<index>`` suffix.

        Returns:
            The path, shortened to respect file-name and path-length limits.

        Raises:
            BulkDownloaderException: If the resource has no extension or no path can be determined.
        """
        subfolder = Path(
            destination_directory,
            *[self._format_name(resource.source_submission, part) for part in self.directory_format_string],
        )
        index_suffix = f"_{index}" if index else ""
        if not resource.extension:
            raise BulkDownloaderException(f"Resource from {resource.url} has no extension")

        file_name = str(self._format_name(resource.source_submission, self.file_format_string))
        file_name = file_name.replace("\n", " ")

        # Insert a "." separator only when neither side already provides one.
        if file_name.endswith(".") or resource.extension.startswith("."):
            ending = index_suffix + resource.extension
        else:
            ending = index_suffix + "." + resource.extension

        try:
            file_path = self.limit_file_name_length(file_name, ending, subfolder)
        except TypeError as e:
            raise BulkDownloaderException(
                f"Could not determine path name: {subfolder}, {index_suffix}, {resource.extension}"
            ) from e
        return file_path

    def limit_file_name_length(self, filename: str, ending: str, root: Path) -> Path:
        """Shorten ``filename`` until ``root/filename+ending`` respects all length limits.

        The final path component is limited to 255 characters and 255 UTF-8 bytes, and the
        whole path to ``self.max_path`` characters. A trailing ``_`` plus six word characters
        (presumably the post id) is moved into the ending so it survives truncation.

        Raises:
            BulkDownloaderException: If the limits cannot be met even with an empty file name.
        """
        # Expand "~" before resolving: resolve() makes the path absolute, after which a
        # leading "~" can no longer be recognised by expanduser().
        root = root.expanduser().resolve()

        possible_id = re.search(r"((?:_\w{6})?$)", filename)
        if possible_id:
            ending = possible_id.group(1) + ending
            filename = filename[: possible_id.start()]

        max_file_part_length_chars = 255 - len(ending)
        max_file_part_length_bytes = 255 - len(ending.encode("utf-8"))
        # Characters left for the file part once root, separator and ending are accounted for.
        max_file_part_length_path = self.max_path - len(ending) - len(str(root)) - 1

        def too_long(candidate: str) -> bool:
            return (
                len(candidate) > max_file_part_length_chars
                or len(candidate.encode("utf-8")) > max_file_part_length_bytes
                or len(candidate) > max_file_part_length_path
            )

        while filename and too_long(filename):
            filename = filename[:-1]
        if too_long(filename):
            # Nothing left to trim: root and ending alone already exceed the limits.
            raise BulkDownloaderException(f"Could not shorten file name to fit path limits: {root}, {ending}")
        return Path(root, filename + ending)

    @staticmethod
    def find_max_path_length() -> int:
        """Return the maximum path length reported by ``getconf PATH_MAX /``.

        Falls back to the Windows or Linux class constant when ``getconf`` is unavailable,
        fails, or prints something that is not an integer.
        """
        try:
            return int(subprocess.check_output(["getconf", "PATH_MAX", "/"]))
        except (ValueError, subprocess.CalledProcessError, OSError):
            if platform.system() == "Windows":
                return FileNameFormatter.WINDOWS_MAX_PATH_LENGTH
            return FileNameFormatter.LINUX_MAX_PATH_LENGTH

    def format_resource_paths(
        self,
        resources: list[Resource],
        destination_directory: Path,
    ) -> list[tuple[Path, Resource]]:
        """Build ``(path, resource)`` pairs for all resources of one submission.

        A single resource gets no index suffix; several resources are numbered from 1.
        Resources whose path cannot be generated are logged and left out of the result.
        """
        if len(resources) == 1:
            indexed_resources = [(None, resources[0])]
        else:
            indexed_resources = list(enumerate(resources, start=1))

        out = []
        for i, res in indexed_resources:
            if i is not None:
                logger.log(9, f"Formatting filename with index {i}")
            try:
                out.append((self.format_path(res, destination_directory, i), res))
            except BulkDownloaderException as e:
                logger.error(f"Could not generate file path for resource {res.url}: {e}")
                logger.exception("Could not generate file path")
        return out

    @staticmethod
    def validate_string(test_string: str) -> bool:
        """Return True if ``test_string`` contains at least one known ``{key}`` placeholder.

        A warning is logged when the string is valid but lacks ``{postid}`` (in any case),
        since file names may then collide.
        """
        if not test_string:
            return False
        lowered = test_string.lower()
        if not any(f"{{{key}}}" in lowered for key in FileNameFormatter.key_terms):
            return False
        # Placeholders are case-insensitive, so the uniqueness check must be too.
        if "{postid}" not in lowered:
            logger.warning(
                "Some files might not be downloaded due to name conflicts as filenames are"
                " not guaranteed to be be unique without {POSTID}"
            )
        return True

    @staticmethod
    def _format_for_windows(input_string: str) -> str:
        """Remove the characters ``<>:"\\/|?*`` and all non-ASCII characters from the string."""
        invalid_characters = r'<>:"\/|?*'
        input_string = input_string.translate(str.maketrans("", "", invalid_characters))
        return FileNameFormatter._strip_emojis(input_string)

    @staticmethod
    def _strip_emojis(input_string: str) -> str:
        """Drop every non-ASCII character (emojis included) from the string."""
        result = input_string.encode("ascii", errors="ignore").decode("utf-8")
        return result