Merge pull request #769 from OMEGARAZER/gfycat-api

Commit 0051877e01 by Serene, 2023-02-12 11:45:25 +10:00, committed by GitHub
14 changed files with 101 additions and 33 deletions

.pre-commit-config.yaml

@@ -2,13 +2,18 @@
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/abravalheri/validate-pyproject
rev: v0.12.1
hooks:
- id: validate-pyproject
- repo: https://github.com/psf/black
rev: 22.12.0
rev: 23.1.0
hooks:
- id: black
- repo: https://github.com/pycqa/isort
rev: 5.11.4
rev: 5.12.0
hooks:
- id: isort
name: isort (python)
@@ -23,3 +28,9 @@ repos:
rev: v0.12.0
hooks:
- id: markdownlint
- repo: https://github.com/adamchainz/blacken-docs
rev: 1.13.0
hooks:
- id: blacken-docs
additional_dependencies: [black>=23.1.0]

bdfr/__main__.py

@@ -81,7 +81,7 @@ def _check_version(context, param, value):
if not value or context.resilient_parsing:
return
current = __version__
latest = requests.get("https://pypi.org/pypi/bdfr/json").json()["info"]["version"]
latest = requests.get("https://pypi.org/pypi/bdfr/json", timeout=10).json()["info"]["version"]
print(f"You are currently using v{current} the latest is v{latest}")
context.exit()

bdfr/downloader.py

@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
def _calc_hash(existing_file: Path):
chunk_size = 1024 * 1024
md5_hash = hashlib.md5()
md5_hash = hashlib.md5(usedforsecurity=False)
with existing_file.open("rb") as file:
chunk = file.read(chunk_size)
while chunk:
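A note on the flag added above: usedforsecurity tells hashlib that the digest is not used for any security purpose (which matters on FIPS-restricted builds), and the parameter is only accepted on Python 3.9 and later. A minimal, purely illustrative sketch of a version-guarded call, not part of this changeset:

import hashlib
import sys

# usedforsecurity was added to the hashlib constructors in Python 3.9;
# fall back to the plain constructor on older interpreters.
if sys.version_info >= (3, 9):
    md5_hash = hashlib.md5(usedforsecurity=False)
else:
    md5_hash = hashlib.md5()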

bdfr/oauth2.py

@@ -25,7 +25,9 @@ class OAuth2Authenticator:
@staticmethod
def _check_scopes(wanted_scopes: set[str]):
response = requests.get(
"https://www.reddit.com/api/v1/scopes.json", headers={"User-Agent": "fetch-scopes test"}
"https://www.reddit.com/api/v1/scopes.json",
headers={"User-Agent": "fetch-scopes test"},
timeout=10,
)
known_scopes = [scope for scope, data in response.json().items()]
known_scopes.append("*")

bdfr/resource.py

@@ -48,7 +48,7 @@ class Resource:
self.create_hash()
def create_hash(self):
self.hash = hashlib.md5(self.content)
self.hash = hashlib.md5(self.content, usedforsecurity=False)
def _determine_extension(self) -> Optional[str]:
extension_pattern = re.compile(r".*(\..{3,5})$")
@@ -67,7 +67,7 @@ class Resource:
max_wait_time = 300
while True:
try:
response = requests.get(url, headers=headers)
response = requests.get(url, headers=headers, timeout=10)
if re.match(r"^2\d{2}", str(response.status_code)) and response.content:
return response.content
elif response.status_code in (408, 429):

bdfr/site_downloaders/base_downloader.py

@@ -27,10 +27,21 @@ class BaseDownloader(ABC):
@staticmethod
def retrieve_url(url: str, cookies: dict = None, headers: dict = None) -> requests.Response:
try:
res = requests.get(url, cookies=cookies, headers=headers)
res = requests.get(url, cookies=cookies, headers=headers, timeout=10)
except requests.exceptions.RequestException as e:
logger.exception(e)
raise SiteDownloaderError(f"Failed to get page {url}")
if res.status_code != 200:
raise ResourceNotFound(f"Server responded with {res.status_code} to {url}")
return res
@staticmethod
def post_url(url: str, cookies: dict = None, headers: dict = None, payload: dict = None) -> requests.Response:
try:
res = requests.post(url, cookies=cookies, headers=headers, json=payload, timeout=10)
except requests.exceptions.RequestException as e:
logger.exception(e)
raise SiteDownloaderError(f"Failed to post to {url}")
if res.status_code != 200:
raise ResourceNotFound(f"Server responded with {res.status_code} to {url}")
return res
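The new post_url helper mirrors retrieve_url but sends a JSON body and raises the same SiteDownloaderError/ResourceNotFound pair on failure. A minimal usage sketch, assuming the module path bdfr.site_downloaders.base_downloader and using a made-up endpoint and payload (the real call site is the Gfycat token fetch further down in this diff):

import json

from bdfr.site_downloaders.base_downloader import BaseDownloader

# Hypothetical endpoint and payload, for illustration only.
res = BaseDownloader.post_url(
    "https://example.com/api/token",
    headers={"content-type": "application/json"},
    payload={"client": "demo"},
)
token = json.loads(res.text).get("token")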

bdfr/site_downloaders/gallery.py

@@ -41,7 +41,7 @@ class Gallery(BaseDownloader):
possible_extensions = (".jpg", ".png", ".gif", ".gifv", ".jpeg")
for extension in possible_extensions:
test_url = f"https://i.redd.it/{image_id}{extension}"
response = requests.head(test_url)
response = requests.head(test_url, timeout=10)
if response.status_code == 200:
out.append(test_url)
break

bdfr/site_downloaders/gfycat.py

@@ -4,7 +4,7 @@ import json
import re
from typing import Optional
from bs4 import BeautifulSoup
from cachetools import TTLCache, cached
from praw.models import Submission
from bdfr.exceptions import SiteDownloaderError
@@ -20,6 +20,20 @@ class Gfycat(Redgifs):
def find_resources(self, authenticator: Optional[SiteAuthenticator] = None) -> list[Resource]:
return super().find_resources(authenticator)
@staticmethod
@cached(cache=TTLCache(maxsize=5, ttl=3420))
def _get_auth_token() -> str:
headers = {
"content-type": "text/plain;charset=UTF-8",
"host": "weblogin.gfycat.com",
"origin": "https://gfycat.com",
}
payload = {"access_key": "Anr96uuqt9EdamSCwK4txKPjMsf2M95Rfa5FLLhPFucu8H5HTzeutyAa"}
token = json.loads(
Gfycat.post_url("https://weblogin.gfycat.com/oauth/webtoken", headers=headers, payload=payload).text
)["access_token"]
return token
@staticmethod
def _get_link(url: str) -> set[str]:
gfycat_id = re.match(r".*/(.*?)(?:/?|-.*|\..{3,4})$", url).group(1)
@@ -27,18 +41,33 @@ class Gfycat(Redgifs):
response = Gfycat.retrieve_url(url)
if re.search(r"(redgifs|gifdeliverynetwork)", response.url):
url = url.lower() # Fixes error with old gfycat/redgifs links
url = url.lower()
return Redgifs._get_link(url)
soup = BeautifulSoup(response.text, "html.parser")
content = soup.find("script", attrs={"data-react-helmet": "true", "type": "application/ld+json"})
auth_token = Gfycat._get_auth_token()
if not auth_token:
raise SiteDownloaderError("Unable to retrieve Gfycat API token")
headers = {
"referer": "https://gfycat.com/",
"origin": "https://gfycat.com",
"content-type": "application/json",
"Authorization": f"Bearer {auth_token}",
}
content = Gfycat.retrieve_url(f"https://api.gfycat.com/v1/gfycats/{gfycat_id}", headers=headers)
if content is None:
raise SiteDownloaderError("Could not read the API source")
try:
out = json.loads(content.contents[0])["video"]["contentUrl"]
response_json = json.loads(content.text)
except json.JSONDecodeError as e:
raise SiteDownloaderError(f"Received data was not valid JSON: {e}")
try:
out = response_json["gfyItem"]["mp4Url"]
except (IndexError, KeyError, AttributeError) as e:
raise SiteDownloaderError(f"Failed to download Gfycat link {url}: {e}")
except json.JSONDecodeError as e:
raise SiteDownloaderError(f"Did not receive valid JSON data: {e}")
return {
out,
}
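Taken together, _get_link now fetches a bearer token from weblogin.gfycat.com (memoised by the TTLCache, presumably for slightly less than the token's lifetime), queries api.gfycat.com/v1/gfycats with that token, and returns the gfyItem mp4Url. A rough usage sketch with a placeholder gfycat ID, assuming the module path bdfr.site_downloaders.gfycat:

from bdfr.site_downloaders.gfycat import Gfycat

# Placeholder URL; any canonical gfycat.com link of this shape goes through
# the cached token fetch and the v1/gfycats API call shown above.
links = Gfycat._get_link("https://gfycat.com/someexamplegfycatid")
# Expected shape of the result: a one-element set holding the direct mp4 URL.
print(links)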

bdfr/site_downloaders/redgifs.py

@@ -5,6 +5,7 @@ import re
from typing import Optional
import requests
from cachetools import TTLCache, cached
from praw.models import Submission
from bdfr.exceptions import SiteDownloaderError
@@ -21,6 +22,12 @@ class Redgifs(BaseDownloader):
media_urls = self._get_link(self.post.url)
return [Resource(self.post, m, Resource.retry_download(m), None) for m in media_urls]
@staticmethod
@cached(cache=TTLCache(maxsize=5, ttl=82080))
def _get_auth_token() -> str:
token = json.loads(Redgifs.retrieve_url("https://api.redgifs.com/v2/auth/temporary").text)["token"]
return token
@staticmethod
def _get_id(url: str) -> str:
try:
@@ -37,7 +44,7 @@ class Redgifs(BaseDownloader):
def _get_link(url: str) -> set[str]:
redgif_id = Redgifs._get_id(url)
auth_token = json.loads(Redgifs.retrieve_url("https://api.redgifs.com/v2/auth/temporary").text)["token"]
auth_token = Redgifs._get_auth_token()
if not auth_token:
raise SiteDownloaderError("Unable to retrieve Redgifs API token")
@@ -47,7 +54,6 @@ class Redgifs(BaseDownloader):
"content-type": "application/json",
"Authorization": f"Bearer {auth_token}",
}
content = Redgifs.retrieve_url(f"https://api.redgifs.com/v2/gifs/{redgif_id}", headers=headers)
if content is None:
@@ -61,7 +67,7 @@ class Redgifs(BaseDownloader):
out = set()
try:
if response_json["gif"]["type"] == 1: # type 1 is a video
if requests.get(response_json["gif"]["urls"]["hd"], headers=headers).ok:
if requests.head(response_json["gif"]["urls"]["hd"], headers=headers, timeout=10).ok:
out.add(response_json["gif"]["urls"]["hd"])
else:
out.add(response_json["gif"]["urls"]["sd"])
@@ -79,7 +85,4 @@ class Redgifs(BaseDownloader):
except (KeyError, AttributeError):
raise SiteDownloaderError("Failed to find JSON data in page")
# Update subdomain if old one is returned
out = {re.sub("thumbs2", "thumbs3", link) for link in out}
out = {re.sub("thumbs3", "thumbs4", link) for link in out}
return out
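Both downloaders now memoise their auth token with cachetools, which is exactly what the new test_auth_cache tests below assert: two successive calls return the identical token because the second call never reaches the network. A self-contained sketch of that caching pattern, with a toy TTL and a counter standing in for the real HTTP request:

from cachetools import TTLCache, cached

calls = 0

@cached(cache=TTLCache(maxsize=5, ttl=60))  # toy 60-second TTL for the demo
def get_token() -> str:
    global calls
    calls += 1  # stands in for the real request to the auth endpoint
    return f"token-{calls}"

assert get_token() == get_token()  # the second call is served from the cache
assert calls == 1  # the endpoint was only hit once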

bdfr/site_downloaders/vidble.py

@@ -36,7 +36,7 @@ class Vidble(BaseDownloader):
if not re.search(r"vidble.com/(show/|album/|watch\?v)", url):
url = re.sub(r"/(\w*?)$", r"/show/\1", url)
page = requests.get(url)
page = requests.get(url, timeout=10)
soup = bs4.BeautifulSoup(page.text, "html.parser")
content_div = soup.find("div", attrs={"id": "ContentPlaceHolder1_divContent"})
images = content_div.find_all("img")

pyproject.toml

@@ -25,12 +25,13 @@ classifiers = [
dependencies = [
"appdirs>=1.4.4",
"beautifulsoup4>=4.10.0",
"cachetools>=5.3.0",
"click>=8.0.0",
"dict2xml>=1.7.0",
"praw>=7.2.0",
"pyyaml>=5.4.1",
"requests>=2.25.1",
"yt-dlp>=2022.11.11",
"requests>=2.28.2",
"yt-dlp>=2023.1.6",
]
dynamic = ["version"]
@@ -41,11 +42,11 @@ data-files = {"config" = ["bdfr/default_config.cfg",]}
[project.optional-dependencies]
dev = [
"black>=22.12.0",
"black>=23.1.0",
"Flake8-pyproject>=1.2.2",
"isort>=5.11.4",
"pre-commit>=2.20.0",
"pytest>=7.1.0",
"isort>=5.12.0",
"pre-commit>=3.0.4",
"pytest>=7.2.1",
"tox>=3.27.1",
]

tests/site_downloaders/test_direct.py

@@ -13,10 +13,7 @@ from bdfr.site_downloaders.direct import Direct
("test_url", "expected_hash"),
(
("https://i.redd.it/q6ebualjxzea1.jpg", "6ec154859c777cb401132bb991cb3635"),
(
"https://file-examples.com/wp-content/uploads/2017/11/file_example_MP3_700KB.mp3",
"35257826e20227a8a57d0e5a410e03c7",
),
("https://filesamples.com/samples/audio/mp3/sample3.mp3", "d30a2308f188cbb11d74cf20c357891c"),
),
)
def test_download_resource(test_url: str, expected_hash: str):

tests/site_downloaders/test_gfycat.py

@@ -8,6 +8,13 @@ from bdfr.resource import Resource
from bdfr.site_downloaders.gfycat import Gfycat
@pytest.mark.online
def test_auth_cache():
auth1 = Gfycat._get_auth_token()
auth2 = Gfycat._get_auth_token()
assert auth1 == auth2
@pytest.mark.online
@pytest.mark.parametrize(
("test_url", "expected_url"),

tests/site_downloaders/test_redgifs.py

@@ -9,6 +9,13 @@ from bdfr.resource import Resource
from bdfr.site_downloaders.redgifs import Redgifs
@pytest.mark.online
def test_auth_cache():
auth1 = Redgifs._get_auth_token()
auth2 = Redgifs._get_auth_token()
assert auth1 == auth2
@pytest.mark.parametrize(
("test_url", "expected"),
(