1
0
Fork 0
mirror of synced 2024-10-01 01:30:52 +13:00

Add Flickr

This commit is contained in:
OMEGARAZER 2023-06-08 23:08:37 -04:00
parent 56ecc06f10
commit 42264f0872
No known key found for this signature in database
GPG key ID: D89925310D306E35
3 changed files with 205 additions and 0 deletions

View file

@ -10,6 +10,7 @@ from bdfr.site_downloaders.delay_for_reddit import DelayForReddit
from bdfr.site_downloaders.direct import Direct
from bdfr.site_downloaders.erome import Erome
from bdfr.site_downloaders.fallback_downloaders.ytdlp_fallback import YtdlpFallback
from bdfr.site_downloaders.flickr import Flickr
from bdfr.site_downloaders.gallery import Gallery
from bdfr.site_downloaders.gfycat import Gfycat
from bdfr.site_downloaders.imgchest import Imgchest
@ -42,6 +43,8 @@ class DownloadFactory:
return Catbox
elif re.match(r"delayforreddit\.com", sanitised_url):
return DelayForReddit
elif re.match(r"flickr\.com", sanitised_url) or re.match(r"flic\.kr", sanitised_url):
return Flickr
elif re.match(r"reddit\.com/gallery/.*", sanitised_url):
return Gallery
elif re.match(r"patreon\.com.*", sanitised_url):

View file

@ -0,0 +1,111 @@
import json
import re
from typing import Optional
from bs4 import BeautifulSoup
from cachetools import TTLCache, cached
from praw.models import Submission
from bdfr.exceptions import SiteDownloaderError
from bdfr.resource import Resource
from bdfr.site_authenticator import SiteAuthenticator
from bdfr.site_downloaders.base_downloader import BaseDownloader
class Flickr(BaseDownloader):
    """Site downloader that resolves Flickr photo and album URLs to direct image links.

    Supported inputs are ``/photos/<user>/<photo_id>`` and
    ``/photos/<user>/albums/<album_id>`` URLs, plus ``/gp/`` and ``flic.kr``
    short links, which are first expanded by following the redirect.

    Every failure to interpret a URL or a Flickr API payload is reported as
    ``SiteDownloaderError`` so that callers only need to handle one exception
    type for this downloader.
    """

    def __init__(self, post: Submission) -> None:
        super().__init__(post)
        # Not read anywhere in this class; kept for interface compatibility.
        self.raw_data = {}

    def find_resources(self, authenticator: Optional[SiteAuthenticator] = None) -> list[Resource]:
        """Return one ``Resource`` per image found for the post's URL.

        Args:
            authenticator: Accepted for interface compatibility; not used here.

        Raises:
            SiteDownloaderError: If no image link could be resolved.
        """
        links = self._get_data(self.post.url)
        if not links:
            raise SiteDownloaderError("Flickr could not find any images to download")
        return [Resource(self.post, link, Resource.retry_download(link)) for link in links]

    @staticmethod
    @cached(cache=TTLCache(maxsize=5, ttl=10260))
    def _get_api_key() -> str:
        """Scrape an API key from Flickr's public API documentation page.

        The key is read from the ``api_key=`` query parameter of the anchor
        whose text is "here". The result is cached for 10260 seconds (171
        minutes); exceptions are not cached, so a failed scrape is retried on
        the next call.

        Raises:
            SiteDownloaderError: If the page contains no matching anchor or the
                anchor carries no API key.
        """
        key_regex = re.compile(r".*api_key=(\w*)(&.*)?")
        req = Flickr.retrieve_url("https://www.flickr.com/services/api/response.json.html").text
        elements = BeautifulSoup(req, "html.parser")
        links = elements.find_all("a", href=True, string="here")
        if not links:
            raise SiteDownloaderError("Could not find the Flickr API key link")
        match = key_regex.search(str(links[0]))
        # An empty capture would produce requests that can never succeed.
        if match is None or not match.group(1):
            raise SiteDownloaderError("Could not extract a Flickr API key")
        return match.group(1)

    @staticmethod
    def _get_ids(link: str) -> tuple[str, str]:
        """Split a Flickr URL into ``(user, id)``.

        ``id`` is the photo ID for photo URLs and the album ID for
        ``/albums/`` URLs. ``user`` is either an NSID (``123@N01``) or a
        path alias, exactly as it appears in the URL.

        Raises:
            SiteDownloaderError: If the URL does not match the expected layout
                or contains no numeric photo/album ID.
        """
        flickr_regex = re.compile(r".*/photos/(?P<user>\d*@\D\d*|\w*)/(?:albums/(?P<album>\d*)|(?P<photo>\d*))")
        match = flickr_regex.search(link)
        if match is None:
            raise SiteDownloaderError(f"Could not extract Flickr ID from {link}")
        # Both ID groups may match the empty string (or not participate at
        # all), so an explicit emptiness check is required.
        flickr_id = match.group("photo") or match.group("album")
        if not flickr_id:
            raise SiteDownloaderError(f"Could not extract Flickr ID from {link}")
        return match.group("user"), flickr_id

    @staticmethod
    def _construct_direct_link(image_dict: dict) -> str:
        """Build a direct image URL from a decoded ``photos.getInfo`` payload.

        The original-size file is used only when the payload exposes both
        ``originalsecret`` and ``originalformat``; otherwise the ``_b.jpg``
        rendition is returned.

        Raises:
            SiteDownloaderError: If the payload lacks the ``photo`` entry or
                its ``id``/``secret``/``server`` fields (for example when the
                API answered with an error object).
        """
        try:
            photo = image_dict["photo"]
            image_id = photo["id"]
            secret = photo["secret"]
            server = photo["server"]
            originalsecret = photo.get("originalsecret")
            originalformat = photo.get("originalformat")
        except (AttributeError, KeyError, TypeError) as e:
            raise SiteDownloaderError(f"Flickr API response is missing photo details: {e}") from e
        # Require both fields: the original URL cannot be formed without the
        # file extension.
        if originalsecret and originalformat:
            return f"https://live.staticflickr.com/{server}/{image_id}_{originalsecret}_o.{originalformat}"
        return f"https://live.staticflickr.com/{server}/{image_id}_{secret}_b.jpg"

    @staticmethod
    def _get_album_links(album_dict: dict, api_string: str) -> list:
        """Resolve every photo listed in a decoded ``photosets.getPhotos`` payload.

        One ``photos.getInfo`` request is issued per photo, in listing order.

        NOTE(review): only the photos present in ``album_dict`` are handled;
        if the API paginates large albums, later pages are not fetched here.

        Args:
            album_dict: Decoded JSON of the album listing.
            api_string: REST URL prefix ending in ``&`` to which the method
                and its arguments are appended.

        Raises:
            SiteDownloaderError: If the listing or any per-photo response is
                malformed.
        """
        try:
            photo_ids = [photo["id"] for photo in album_dict["photoset"]["photo"]]
        except (KeyError, TypeError) as e:
            raise SiteDownloaderError(f"Flickr API response is missing album details: {e}") from e
        out = []
        for photo_id in photo_ids:
            res = Flickr.retrieve_url(f"{api_string}method=flickr.photos.getInfo&photo_id={photo_id}")
            try:
                image_dict = json.loads(res.text)
            except json.JSONDecodeError as e:
                raise SiteDownloaderError(f"Could not parse received response as JSON: {e}") from e
            out.append(Flickr._construct_direct_link(image_dict))
        return out

    @staticmethod
    def _get_user_id(user: str, api_string: str) -> str:
        """Look up the ID of the user whose path alias is ``user``.

        Args:
            user: The user segment of a ``/photos/<user>/`` URL.
            api_string: REST URL prefix ending in ``&``.

        Raises:
            SiteDownloaderError: If the response is not JSON or carries no
                ``user.id`` entry.
        """
        req = Flickr.retrieve_url(
            f"{api_string}method=flickr.urls.lookupUser&url=https://flickr.com/photos/{user}",
        ).text
        try:
            return json.loads(req)["user"]["id"]
        except json.JSONDecodeError as e:
            raise SiteDownloaderError(f"Could not parse flickr user ID from API: {e}") from e
        except (KeyError, TypeError) as e:
            raise SiteDownloaderError(f"Could not find flickr user ID in API response: {e}") from e

    @staticmethod
    def _expand_link(link: str) -> str:
        """Return the final URL reported by the response after fetching ``link``."""
        return Flickr.retrieve_url(link).url

    @staticmethod
    def _get_data(link: str) -> list:
        """Return the direct image links for a Flickr photo or album URL.

        Short links (``/gp/`` and ``flic.kr``) are expanded first. Album URLs
        yield one link per listed photo; photo URLs yield a single-item list.

        Raises:
            SiteDownloaderError: If the URL cannot be parsed or an API
                response is malformed.
        """
        if ("/gp/" in link) or ("flic.kr" in link):
            link = Flickr._expand_link(link)
        user, flickr_id = Flickr._get_ids(link)
        api_key = Flickr._get_api_key()
        api_string = f"https://www.flickr.com/services/rest/?api_key={api_key}&format=json&nojsoncallback=1&"
        is_album = "/albums/" in link
        if is_album:
            # The album request is sent with an "@"-style user ID, so a plain
            # path alias is resolved through the API first.
            if "@" not in user:
                user = Flickr._get_user_id(user, api_string)
            api = f"{api_string}method=flickr.photosets.getPhotos&photoset_id={flickr_id}&user_id={user}"
        else:
            api = f"{api_string}method=flickr.photos.getInfo&photo_id={flickr_id}"
        res = Flickr.retrieve_url(api)
        try:
            response = json.loads(res.text)
        except json.JSONDecodeError as e:
            raise SiteDownloaderError(f"Could not parse received response as JSON: {e}") from e
        if is_album:
            return Flickr._get_album_links(response, api_string)
        return [Flickr._construct_direct_link(response)]

View file

@ -0,0 +1,91 @@
from unittest.mock import Mock
import pytest
from bdfr.resource import Resource
from bdfr.site_downloaders.flickr import Flickr
@pytest.mark.online
def test_key_cache():
    """Two consecutive API-key lookups must return equal keys."""
    first_key, second_key = Flickr._get_api_key(), Flickr._get_api_key()
    assert first_key == second_key
@pytest.mark.parametrize(
    ("test_url", "expected_user", "expected_id"),
    [
        # Single photo
        ("https://www.flickr.com/photos/137434519@N08/33635695603", "137434519@N08", "33635695603"),
        # Album
        (
            "https://www.flickr.com/photos/63215229@N04/albums/72157644975251416",
            "63215229@N04",
            "72157644975251416",
        ),
    ],
)
def test_get_ids(test_url: str, expected_user: str, expected_id: str):
    """The user and the photo/album ID are both extracted from the URL."""
    found_user, found_id = Flickr._get_ids(test_url)
    assert (found_user, found_id) == (expected_user, expected_id)
@pytest.mark.online
@pytest.mark.parametrize(
    ("test_url", "expected_url"),
    [
        # /gp/ link
        (
            "https://www.flickr.com/gp/137434519@N08/83Q029",
            "https://www.flickr.com/photos/137434519@N08/33635695603/",
        ),
        # flic.kr link
        ("https://flic.kr/p/2k5E4mv", "https://www.flickr.com/photos/129756120@N03/50592162657/"),
    ],
)
def test_expand_url(test_url: str, expected_url: str):
    """Short links expand to the full photo page URL."""
    assert Flickr._expand_link(test_url) == expected_url
@pytest.mark.online
@pytest.mark.parametrize(
    ("test_id", "expected_user"),
    [
        # username to user ID
        ("buta_suneo", "63215229@N04"),
    ],
)
def test_get_user_id(test_id: str, expected_user: str):
    """A path alias is resolved to the expected user ID through the API."""
    api_key = Flickr._get_api_key()
    rest_prefix = f"https://www.flickr.com/services/rest/?api_key={api_key}&format=json&nojsoncallback=1&"
    assert Flickr._get_user_id(test_id, rest_prefix) == expected_user
@pytest.mark.online
@pytest.mark.parametrize(
    ("test_url", "expected_hashes"),
    (
        ("https://www.flickr.com/gp/137434519@N08/83Q029", {"b3f4e6fca1cc0ffca55368e4f94f9b5f"}),  # Single photo
        ("https://flic.kr/p/2k5E4mv", {"75ae4f5e70b9b7525041b1dcc852d144"}),  # Single photo
        (
            "http://www.flickr.com/photos/thekog/6886709962/",  # Single photo
            {"a4a64e606368f7b5a1995c84e15463e9"},
        ),
        (
            "https://www.flickr.com/photos/ochre_jelly/albums/72157708743730852",  # Album
            {
                "3c442ffdadff7b02cb7a133865339a26",
                "8023fc0e76f891d585871ddd64edac23",
                "9bbedad97b59ec51cb967da507351912",
                "a86fcd3458620eec4cb3606882d11e9a",
                "addb62d788c542383d1ad47914bbefb3",
            },
        ),
    ),
)
def test_download_resource(test_url: str, expected_hashes: set[str]):
    """Resources found for each URL download to content with the expected hashes."""
    mock_submission = Mock()
    mock_submission.url = test_url
    test_site = Flickr(mock_submission)
    results = test_site.find_resources()
    assert all(isinstance(res, Resource) for res in results)
    # Plain loop: download() is called for its side effect only, so no
    # throwaway list of return values is built.
    for res in results:
        res.download()
    hashes = {res.hash.hexdigest() for res in results}
    assert hashes == expected_hashes