1
0
Fork 0
mirror of synced 2024-05-06 21:32:44 +12:00

pep585 and pathlib updates

This commit is contained in:
OMEGARAZER 2023-01-25 22:23:59 -05:00
parent e96b167b71
commit cf5f7bfd16
No known key found for this signature in database
GPG key ID: D89925310D306E35
14 changed files with 31 additions and 28 deletions

View file

@@ -4,8 +4,10 @@
import json
import logging
import re
from collections.abc import Iterable, Iterator
from pathlib import Path
from time import sleep
from typing import Iterable, Iterator, Union
from typing import Union
import dict2xml
import praw.models
@@ -108,13 +110,13 @@ class Archiver(RedditConnector):
def _write_entry_yaml(self, entry: BaseArchiveEntry):
resource = Resource(entry.source, "", lambda: None, ".yaml")
content = yaml.dump(entry.compile())
content = yaml.safe_dump(entry.compile())
self._write_content_to_disk(resource, content)
def _write_content_to_disk(self, resource: Resource, content: str):
file_path = self.file_name_formatter.format_path(resource, self.download_directory)
file_path.parent.mkdir(exist_ok=True, parents=True)
with open(file_path, "w", encoding="utf-8") as file:
with Path(file_path).open(mode="w", encoding="utf-8") as file:
logger.debug(
f"Writing entry {resource.source_submission.id} to file in {resource.extension[1:].upper()}"
f" format at {file_path}"

View file

@@ -2,8 +2,8 @@
# -*- coding: utf-8 -*-
import logging
from collections.abc import Iterable
from time import sleep
from typing import Iterable
import prawcore

View file

@@ -23,7 +23,7 @@ class Completion:
Path(comp_dir).mkdir(parents=True, exist_ok=True)
for point in self.entry_points:
self.env[f"_{point.upper().replace('-', '_')}_COMPLETE"] = "bash_source"
with open(comp_dir + point, "w") as file:
with Path(comp_dir + point).open(mode="w") as file:
file.write(subprocess.run([point], env=self.env, capture_output=True, text=True).stdout)
print(f"Bash completion for {point} written to {comp_dir}{point}")
if self.shell in ("all", "fish"):
@@ -33,7 +33,7 @@ class Completion:
Path(comp_dir).mkdir(parents=True, exist_ok=True)
for point in self.entry_points:
self.env[f"_{point.upper().replace('-', '_')}_COMPLETE"] = "fish_source"
with open(comp_dir + point + ".fish", "w") as file:
with Path(comp_dir + point + ".fish").open(mode="w") as file:
file.write(subprocess.run([point], env=self.env, capture_output=True, text=True).stdout)
print(f"Fish completion for {point} written to {comp_dir}{point}.fish")
if self.shell in ("all", "zsh"):
@@ -43,7 +43,7 @@ class Completion:
Path(comp_dir).mkdir(parents=True, exist_ok=True)
for point in self.entry_points:
self.env[f"_{point.upper().replace('-', '_')}_COMPLETE"] = "zsh_source"
with open(comp_dir + "_" + point, "w") as file:
with Path(comp_dir + "_" + point).open(mode="w") as file:
file.write(subprocess.run([point], env=self.env, capture_output=True, text=True).stdout)
print(f"Zsh completion for {point} written to {comp_dir}_{point}")

View file

@@ -79,7 +79,7 @@ class Configuration(Namespace):
return
with yaml_file_loc.open() as file:
try:
opts = yaml.load(file, Loader=yaml.FullLoader)
opts = yaml.safe_load(file)
except yaml.YAMLError as e:
logger.error(f"Could not parse YAML options file: {e}")
return

View file

@@ -10,11 +10,11 @@ import re
import shutil
import socket
from abc import ABCMeta, abstractmethod
from collections.abc import Callable, Iterable, Iterator
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from time import sleep
from typing import Callable, Iterable, Iterator
import appdirs
import praw
@@ -119,7 +119,7 @@ class RedditConnector(metaclass=ABCMeta):
)
logger.debug(f"Setting filename restriction scheme to '{self.args.filename_restriction_scheme}'")
# Update config on disk
with open(self.config_location, "w") as file:
with Path(self.config_location).open(mode="w") as file:
self.cfg_parser.write(file)
def parse_disabled_modules(self):
@@ -143,7 +143,7 @@
)
token = oauth2_authenticator.retrieve_new_token()
self.cfg_parser["DEFAULT"]["user_token"] = token
with open(self.config_location, "w") as file:
with Path(self.config_location).open(mode="w") as file:
self.cfg_parser.write(file, True)
token_manager = OAuth2TokenManager(self.cfg_parser, self.config_location)

View file

@@ -5,11 +5,11 @@ import hashlib
import logging.handlers
import os
import time
from collections.abc import Iterable
from datetime import datetime
from multiprocessing import Pool
from pathlib import Path
from time import sleep
from typing import Iterable
import praw
import praw.exceptions

View file

@@ -103,6 +103,6 @@ class OAuth2TokenManager(praw.reddit.BaseTokenManager):
def post_refresh_callback(self, authorizer: praw.reddit.Authorizer):
self.config.set("DEFAULT", "user_token", authorizer.refresh_token)
with open(self.config_location, "w") as file:
with Path(self.config_location).open(mode="w") as file:
self.config.write(file, True)
logger.log(9, f"Written OAuth2 token from authoriser to {self.config_location}")

View file

@@ -6,7 +6,8 @@ import logging
import re
import time
import urllib.parse
from typing import Callable, Optional
from collections.abc import Callable
from typing import Optional
import _hashlib
import requests

View file

@@ -3,7 +3,6 @@
import re
import urllib.parse
from typing import Type
from bdfr.exceptions import NotADownloadableLinkError
from bdfr.site_downloaders.base_downloader import BaseDownloader
@@ -24,7 +23,7 @@ from bdfr.site_downloaders.youtube import Youtube
class DownloadFactory:
@staticmethod
def pull_lever(url: str) -> Type[BaseDownloader]:
def pull_lever(url: str) -> type[BaseDownloader]:
sanitised_url = DownloadFactory.sanitise_url(url)
if re.match(r"(i\.|m\.)?imgur", sanitised_url):
return Imgur

View file

@@ -3,7 +3,8 @@
import logging
import re
from typing import Callable, Optional
from collections.abc import Callable
from typing import Optional
import bs4
from praw.models import Submission

View file

@@ -3,8 +3,9 @@
import logging
import tempfile
from collections.abc import Callable
from pathlib import Path
from typing import Callable, Optional
from typing import Optional
import yt_dlp
from praw.models import Submission

View file

@@ -1,9 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from collections.abc import Iterator
from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterator
from unittest.mock import MagicMock
import praw

View file

@@ -1,7 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import os
import re
from pathlib import Path
from unittest.mock import MagicMock, patch
@@ -118,12 +117,12 @@ def test_file_creation_date(
RedditDownloader._download_submission(downloader_mock, submission)
for file_path in Path(tmp_path).iterdir():
file_stats = os.stat(file_path)
file_stats = Path(file_path).stat()
assert file_stats.st_mtime == test_creation_date
def test_search_existing_files():
results = RedditDownloader.scan_existing_files(Path("."))
results = RedditDownloader.scan_existing_files(Path())
assert len(results.keys()) != 0

View file

@@ -6,7 +6,7 @@ import sys
import unittest.mock
from datetime import datetime
from pathlib import Path
from typing import Optional, Type, Union
from typing import Optional, Union
from unittest.mock import MagicMock
import praw.models
@@ -222,7 +222,7 @@ def test_format_multiple_resources():
new_mock.source_submission.__class__ = praw.models.Submission
mocks.append(new_mock)
test_formatter = FileNameFormatter("{TITLE}", "", "ISO")
results = test_formatter.format_resource_paths(mocks, Path("."))
results = test_formatter.format_resource_paths(mocks, Path())
results = set([str(res[0].name) for res in results])
expected = {"test_1.png", "test_2.png", "test_3.png", "test_4.png"}
assert results == expected
@@ -238,7 +238,7 @@ def test_format_multiple_resources():
),
)
def test_limit_filename_length(test_filename: str, test_ending: str, test_formatter: FileNameFormatter):
result = test_formatter.limit_file_name_length(test_filename, test_ending, Path("."))
result = test_formatter.limit_file_name_length(test_filename, test_ending, Path())
assert len(result.name) <= 255
assert len(result.name.encode("utf-8")) <= 255
assert len(str(result)) <= FileNameFormatter.find_max_path_length()
@@ -262,7 +262,7 @@ def test_limit_filename_length(test_filename: str, test_ending: str, test_format
def test_preserve_id_append_when_shortening(
test_filename: str, test_ending: str, expected_end: str, test_formatter: FileNameFormatter
):
result = test_formatter.limit_file_name_length(test_filename, test_ending, Path("."))
result = test_formatter.limit_file_name_length(test_filename, test_ending, Path())
assert len(result.name) <= 255
assert len(result.name.encode("utf-8")) <= 255
assert result.name.endswith(expected_end)
@@ -509,13 +509,13 @@ def test_windows_max_path(tmp_path: Path):
)
def test_name_submission(
test_reddit_id: str,
test_downloader: Type[BaseDownloader],
test_downloader: type[BaseDownloader],
expected_names: set[str],
reddit_instance: praw.reddit.Reddit,
):
test_submission = reddit_instance.submission(id=test_reddit_id)
test_resources = test_downloader(test_submission).find_resources()
test_formatter = FileNameFormatter("{TITLE}", "", "")
results = test_formatter.format_resource_paths(test_resources, Path("."))
results = test_formatter.format_resource_paths(test_resources, Path())
results = set([r[0].name for r in results])
assert results == expected_names