1
0
Fork 0
mirror of synced 2024-10-01 09:41:03 +13:00
ANN201 fixes
This commit is contained in:
OMEGARAZER 2023-02-18 20:16:28 -05:00
parent caa4421c78
commit a5f5df7ab3
No known key found for this signature in database
GPG key ID: D89925310D306E35
9 changed files with 25 additions and 25 deletions

View file

@ -100,7 +100,7 @@ def _check_version(context, _param, value):
callback=_check_version,
help="Check version and exit.",
)
def cli():
def cli() -> None:
"""BDFR is used to download and archive content from Reddit."""
pass
@ -110,7 +110,7 @@ def cli():
@_add_options(_downloader_options)
@click.help_option("-h", "--help")
@click.pass_context
def cli_download(context: click.Context, **_):
def cli_download(context: click.Context, **_) -> None:
"""Used to download content posted to Reddit."""
config = Configuration()
config.process_click_arguments(context)
@ -131,7 +131,7 @@ def cli_download(context: click.Context, **_):
@_add_options(_archiver_options)
@click.help_option("-h", "--help")
@click.pass_context
def cli_archive(context: click.Context, **_):
def cli_archive(context: click.Context, **_) -> None:
"""Used to archive post data from Reddit."""
config = Configuration()
config.process_click_arguments(context)
@ -153,7 +153,7 @@ def cli_archive(context: click.Context, **_):
@_add_options(_downloader_options)
@click.help_option("-h", "--help")
@click.pass_context
def cli_clone(context: click.Context, **_):
def cli_clone(context: click.Context, **_) -> None:
"""Combines archive and download commands."""
config = Configuration()
config.process_click_arguments(context)
@ -173,7 +173,7 @@ def cli_clone(context: click.Context, **_):
@click.argument("shell", type=click.Choice(("all", "bash", "fish", "zsh"), case_sensitive=False), default="all")
@click.help_option("-h", "--help")
@click.option("-u", "--uninstall", is_flag=True, default=False, help="Uninstall completion")
def cli_completion(shell: str, uninstall: bool):
def cli_completion(shell: str, uninstall: bool) -> None:
"""\b
Installs shell completions for BDFR.
Options: all, bash, fish, zsh
@ -215,7 +215,7 @@ def make_console_logging_handler(verbosity: int) -> logging.StreamHandler:
return stream
def silence_module_loggers():
def silence_module_loggers() -> None:
logging.getLogger("praw").setLevel(logging.CRITICAL)
logging.getLogger("prawcore").setLevel(logging.CRITICAL)
logging.getLogger("urllib3").setLevel(logging.CRITICAL)

View file

@ -28,7 +28,7 @@ class Archiver(RedditConnector):
def __init__(self, args: Configuration, logging_handlers: Iterable[logging.Handler] = ()) -> None:
super().__init__(args, logging_handlers)
def download(self):
def download(self) -> None:
for generator in self.reddit_lists:
try:
for submission in generator:
@ -82,7 +82,7 @@ class Archiver(RedditConnector):
else:
raise ArchiverError(f"Factory failed to classify item of type {type(praw_item).__name__}")
def write_entry(self, praw_item: Union[praw.models.Submission, praw.models.Comment]):
def write_entry(self, praw_item: Union[praw.models.Submission, praw.models.Comment]) -> None:
if self.args.comment_context and isinstance(praw_item, praw.models.Comment):
logger.debug(f"Converting comment {praw_item.id} to submission {praw_item.submission.id}")
praw_item = praw_item.submission

View file

@ -17,7 +17,7 @@ class RedditCloner(RedditDownloader, Archiver):
def __init__(self, args: Configuration, logging_handlers: Iterable[logging.Handler] = ()) -> None:
super().__init__(args, logging_handlers)
def download(self):
def download(self) -> None:
for generator in self.reddit_lists:
try:
for submission in generator:

View file

@ -14,7 +14,7 @@ class Completion:
self.share_dir = appdirs.user_data_dir()
self.entry_points = ["bdfr", "bdfr-archive", "bdfr-clone", "bdfr-download"]
def install(self):
def install(self) -> None:
if self.shell in ("all", "bash"):
comp_dir = self.share_dir + "/bash-completion/completions/"
if not Path(comp_dir).exists():
@ -46,7 +46,7 @@ class Completion:
file.write(subprocess.run([point], env=self.env, capture_output=True, text=True).stdout)
print(f"Zsh completion for {point} written to {comp_dir}_{point}")
def uninstall(self):
def uninstall(self) -> None:
if self.shell in ("all", "bash"):
comp_dir = self.share_dir + "/bash-completion/completions/"
for point in self.entry_points:

View file

@ -58,7 +58,7 @@ class Configuration(Namespace):
self.format = "json"
self.comment_context: bool = False
def process_click_arguments(self, context: click.Context):
def process_click_arguments(self, context: click.Context) -> None:
if context.params.get("opts") is not None:
self.parse_yaml_options(context.params["opts"])
for arg_key in context.params.keys():
@ -71,7 +71,7 @@ class Configuration(Namespace):
continue
setattr(self, arg_key, val)
def parse_yaml_options(self, file_path: str):
def parse_yaml_options(self, file_path: str) -> None:
yaml_file_loc = Path(file_path)
if not yaml_file_loc.exists():
logger.error(f"No YAML file found at {yaml_file_loc}")

View file

@ -100,7 +100,7 @@ class RedditConnector(metaclass=ABCMeta):
for handler in handlers:
main_logger.addHandler(handler)
def read_config(self):
def read_config(self) -> None:
"""Read any cfg values that need to be processed"""
if self.args.max_wait_time is None:
self.args.max_wait_time = self.cfg_parser.getint("DEFAULT", "max_wait_time", fallback=120)
@ -122,14 +122,14 @@ class RedditConnector(metaclass=ABCMeta):
with Path(self.config_location).open(mode="w") as file:
self.cfg_parser.write(file)
def parse_disabled_modules(self):
def parse_disabled_modules(self) -> None:
disabled_modules = self.args.disable_module
disabled_modules = self.split_args_input(disabled_modules)
disabled_modules = {name.strip().lower() for name in disabled_modules}
self.args.disable_module = disabled_modules
logger.debug(f"Disabling the following modules: {', '.join(self.args.disable_module)}")
def create_reddit_instance(self):
def create_reddit_instance(self) -> None:
if self.args.authenticate:
logger.debug("Using authenticated Reddit instance")
if not self.cfg_parser.has_option("DEFAULT", "user_token"):
@ -176,14 +176,14 @@ class RedditConnector(metaclass=ABCMeta):
logger.log(9, "Retrieved submissions for given links")
return master_list
def determine_directories(self):
def determine_directories(self) -> None:
self.download_directory = Path(self.args.directory).resolve().expanduser()
self.config_directory = Path(self.config_directories.user_config_dir)
self.download_directory.mkdir(exist_ok=True, parents=True)
self.config_directory.mkdir(exist_ok=True, parents=True)
def load_config(self):
def load_config(self) -> None:
self.cfg_parser = configparser.ConfigParser()
if self.args.config:
if (cfg_path := Path(self.args.config)).exists():
@ -393,7 +393,7 @@ class RedditConnector(metaclass=ABCMeta):
else:
return []
def check_user_existence(self, name: str):
def check_user_existence(self, name: str) -> None:
user = self.reddit_instance.redditor(name=name)
try:
if user.id:
@ -428,7 +428,7 @@ class RedditConnector(metaclass=ABCMeta):
return SiteAuthenticator(self.cfg_parser)
@abstractmethod
def download(self):
def download(self) -> None:
pass
@staticmethod

View file

@ -41,7 +41,7 @@ class RedditDownloader(RedditConnector):
if self.args.search_existing:
self.master_hash_list = self.scan_existing_files(self.download_directory)
def download(self):
def download(self) -> None:
for generator in self.reddit_lists:
try:
for submission in generator:

View file

@ -97,7 +97,7 @@ class OAuth2TokenManager(praw.reddit.BaseTokenManager):
self.config = config
self.config_location = config_location
def pre_refresh_callback(self, authorizer: praw.reddit.Authorizer):
def pre_refresh_callback(self, authorizer: praw.reddit.Authorizer) -> None:
if authorizer.refresh_token is None:
if self.config.has_option("DEFAULT", "user_token"):
authorizer.refresh_token = self.config.get("DEFAULT", "user_token")
@ -105,7 +105,7 @@ class OAuth2TokenManager(praw.reddit.BaseTokenManager):
else:
raise RedditAuthenticationError("No auth token loaded in configuration")
def post_refresh_callback(self, authorizer: praw.reddit.Authorizer):
def post_refresh_callback(self, authorizer: praw.reddit.Authorizer) -> None:
self.config.set("DEFAULT", "user_token", authorizer.refresh_token)
with Path(self.config_location).open(mode="w") as file:
self.config.write(file, True)

View file

@ -34,7 +34,7 @@ class Resource:
def retry_download(url: str) -> Callable:
return lambda global_params: Resource.http_download(url, global_params)
def download(self, download_parameters: Optional[dict] = None):
def download(self, download_parameters: Optional[dict] = None) -> None:
if download_parameters is None:
download_parameters = {}
if not self.content:
@ -49,7 +49,7 @@ class Resource:
if not self.hash and self.content:
self.create_hash()
def create_hash(self):
def create_hash(self) -> None:
self.hash = hashlib.md5(self.content, usedforsecurity=False)
def _determine_extension(self) -> Optional[str]: