1
0
Fork 0
mirror of synced 2024-06-29 11:30:30 +12:00
bulk-downloader-for-reddit/bulkredditdownloader/site_downloaders/imgur.py

143 lines
5.2 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import json
2021-02-07 01:29:13 +13:00
import pathlib
import logging
2021-02-07 01:29:13 +13:00
import requests
2021-02-07 20:08:24 +13:00
from bulkredditdownloader.site_downloaders.base_downloader import BaseDownloader
from bulkredditdownloader.site_downloaders.direct import Direct
2021-02-07 14:33:19 +13:00
from bulkredditdownloader.errors import (AlbumNotDownloadedCompletely, ExtensionError, FileAlreadyExistsError,
ImageNotFound, NotADownloadableLinkError, TypeInSkip)
2021-02-07 14:05:18 +13:00
from bulkredditdownloader.utils import GLOBAL, nameCorrector
logger = logging.getLogger(__name__)
2021-02-07 14:33:19 +13:00
class Imgur(BaseDownloader):
imgur_image_domain = "https://i.imgur.com/"
2021-02-07 01:29:13 +13:00
def __init__(self, directory: pathlib.Path, post: dict):
2021-02-07 14:33:19 +13:00
super().__init__(directory, post)
self.raw_data = {}
self.download()
def download(self):
link = self.post['CONTENTURL']
if link.endswith(".gifv"):
link = link.replace(".gifv", ".mp4")
Direct(self.directory, {**self.post, 'CONTENTURL': link})
2021-02-07 01:29:13 +13:00
return
self.raw_data = self._get_data(link)
if self._is_album:
if self.raw_data["album_images"]["count"] != 1:
self._download_album(self.raw_data["album_images"])
else:
self._download_image(self.raw_data["album_images"]["images"][0])
else:
self._download_image(self.raw_data)
def _download_album(self, images: dict):
folder_name = GLOBAL.config['filename'].format(**self.post)
folder_dir = self.directory / folder_name
images_length = images["count"]
how_many_downloaded = 0
duplicates = 0
folder_dir.mkdir(exist_ok=True)
logger.info(folder_name)
for i in range(images_length):
extension = self._validate_extension(images["images"][i]["ext"])
image_url = self.imgur_image_domain + images["images"][i]["hash"] + extension
filename = pathlib.Path("_".join([str(i + 1),
nameCorrector(images["images"][i]['title']),
images["images"][i]['hash']]) + extension)
logger.info("\n ({}/{})".format(i + 1, images_length))
try:
self._download_resource(filename, folder_dir, image_url, indent=2)
how_many_downloaded += 1
except FileAlreadyExistsError:
logger.info(" The file already exists" + " " * 10, end="\n\n")
duplicates += 1
except TypeInSkip:
logger.info(" Skipping...")
how_many_downloaded += 1
except Exception as exception:
logger.info("\n Could not get the file")
logger.info(
" "
+ "{class_name}: {info}\nSee CONSOLE_LOG.txt for more information".format(
class_name=exception.__class__.__name__,
info=str(exception)
)
+ "\n"
)
logger.info(GLOBAL.log_stream.getvalue(), no_print=True)
if duplicates == images_length:
raise FileAlreadyExistsError
elif how_many_downloaded + duplicates < images_length:
raise AlbumNotDownloadedCompletely("Album Not Downloaded Completely")
def _download_image(self, image: dict):
extension = self._validate_extension(image["ext"])
image_url = self.imgur_image_domain + image["hash"] + extension
filename = GLOBAL.config['filename'].format(**self.post) + extension
self._download_resource(filename, self.directory, image_url)
def _is_album(self) -> bool:
return "album_images" in self.raw_data
@staticmethod
def _get_data(link: str) -> dict:
cookies = {"over18": "1", "postpagebeta": "0"}
res = requests.get(link, cookies=cookies)
if res.status_code != 200:
raise ImageNotFound(f"Server responded with {res.status_code} to {link}")
page_source = requests.get(link, cookies=cookies).text
starting_string = "image : "
ending_string = "group :"
starting_string_lenght = len(starting_string)
try:
start_index = page_source.index(starting_string) + starting_string_lenght
end_index = page_source.index(ending_string, start_index)
except ValueError:
raise NotADownloadableLinkError(
f"Could not read the page source on {link}")
while page_source[end_index] != "}":
end_index -= 1
try:
data = page_source[start_index:end_index + 2].strip()[:-1]
except IndexError:
page_source[end_index + 1] = '}'
data = page_source[start_index:end_index + 3].strip()[:-1]
return json.loads(data)
@staticmethod
def _validate_extension(extension_suffix: str) -> str:
possible_extensions = [".jpg", ".png", ".mp4", ".gif"]
for extension in possible_extensions:
if extension in extension_suffix:
return extension
else:
raise ExtensionError(f"\"{extension_suffix}\" is not recognized as a valid extension.")