"""
Utilities
"""
import asyncio
import io
import os
import re
import httpx
import torf
from .. import constants, errors
from . import torrent
from .monitor import DynamicInterval, Monitor
# Mockable standard library tools that are used by test runners or upstream
def partial(*args, **kwargs):
    """
    Thin wrapper around :func:`functools.partial`

    All positional and keyword arguments are forwarded unchanged.
    """
    # Imported lazily so the module-level name `partial` stays this wrapper
    import functools

    return functools.partial(*args, **kwargs)
def is_magnet(string):
    """
    Tell whether `string` is a magnet URI

    `string` is converted to :class:`str` and the ``magnet:`` prefix is
    matched case-insensitively.
    """
    lowered = str(string).lower()
    return lowered.startswith('magnet:')
# Anchored pattern: 40 hexadecimal digits from start to end of the string
# (note that "$" also matches right before a single trailing newline)
is_infohash_regex = re.compile(r'^([0-9a-fA-F]{40})$')
# Unanchored pattern: 40 consecutive hexadecimal digits anywhere in a string
find_infohash_regex = re.compile(r'([0-9a-fA-F]{40})')


def is_infohash(string):
    """
    Tell whether `string` looks like a torrent infohash

    `string` is converted to :class:`str` and must consist of exactly 40
    hexadecimal digits (upper or lower case) and nothing else.
    """
    text = str(string)
    found = is_infohash_regex.search(text)
    if found is None:
        return False
    # Reject input with a trailing newline, which "$" lets through
    return found.group(1) == text
class Infohash(str):
    """
    Case-insensitive string of exactly 40 hexadecimal digits

    The value is stored in lower case. Comparison with any other string
    (``==`` and ``!=``) ignores case.

    :param string: Object whose string representation is the infohash

    :raise ValueError: if `string` is not exactly 40 hexadecimal digits (the
        exception class comes from the package's ``errors`` module)
    """

    # Maybe save a few bytes of memory
    __slots__ = ()

    def __new__(cls, string):
        s = str(string)
        # Use is_infohash() instead of a bare regex search: "$" in the pattern
        # also matches before a trailing newline, which would let a 41
        # character string through.
        if not is_infohash(s):
            raise errors.ValueError(f'Invalid infohash: {string!r}')
        return super().__new__(cls, s.lower())

    def __eq__(self, other):
        if isinstance(other, str):
            return self.lower() == other.lower()
        return NotImplemented

    def __ne__(self, other):
        # str provides its own case-sensitive __ne__, so it must be overridden
        # explicitly or `==` and `!=` could both be true for the same operands.
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return NotImplemented
        return not equal

    # Overriding __eq__ implicitly sets __hash__ to None. Restore hashing so
    # instances can be dictionary keys and set members. The stored value is
    # always lower case, so equal Infohash instances hash equally; a plain str
    # in a different case compares equal but still hashes differently.
    __hash__ = str.__hash__
# Scheme (letters, digits, "+", "." and "-") followed by "://" at the very
# beginning of the string
url_regex = re.compile(r'^(?i:[0-9a-zA-Z\+\.-]+)://')


def is_url(string):
    """
    Tell whether `string` looks like an URL

    `string` is converted to :class:`str` and must start with a scheme that is
    followed by ``://``.
    """
    text = str(string)
    return url_regex.search(text) is not None
def read_bytes(path, maxsize=None):
    """
    Read the complete content of a file as :class:`bytes`

    :param path: Path to file
    :param maxsize: Largest acceptable size of `path` in bytes or `None` to
        skip the size check

    :raise ReadError: if `path` is larger than `maxsize` bytes or if getting
        its size, opening it or reading it fails
    """
    check_size = maxsize is not None
    try:
        if check_size:
            nbytes = os.path.getsize(path)
            if nbytes > maxsize:
                raise errors.ReadError(f'Too big ({nbytes} bytes): {path}')

        with open(path, 'rb') as stream:
            content = stream.read()
    except OSError as error:
        # Prefer the short system error message over the full representation
        reason = error.strerror or str(error)
        raise errors.ReadError(f'{reason}: {path}')

    return content
async def download(url, to=None, maxsize=None):
    """
    Download `url` and either store it in a file or return it

    :param url: URL to download
    :param to: Path of the file the downloaded bytes are written to; if this
        is falsy, the downloaded :class:`bytes` are returned instead
    :param maxsize: Maximum allowed ``Content-Length`` or `None` for unlimited
        download size

    :return: Downloaded :class:`bytes` if `to` is falsy, `None` otherwise

    :raise ReadError: if the request fails (HTTP error status, timeout or any
        other HTTP error)
    :raise WriteError: if `to` cannot be opened or an :class:`OSError` occurs
        while downloading
    """
    def reason_of(os_error):
        # Prefer the short system error message over the full representation
        return os_error.strerror or str(os_error)

    if not to:
        # Nowhere to write to: collect the download in memory
        sink = io.BytesIO()
    else:
        try:
            sink = open(str(to), 'wb')
        except OSError as error:
            raise errors.WriteError(f'{reason_of(error)}: {to}')

    with sink:
        try:
            await _download_to_stream(url, sink, maxsize)
        except httpx.HTTPStatusError as error:
            raise errors.ReadError(f'{error.response.reason_phrase}: {url}')
        except httpx.TimeoutException:
            raise errors.ReadError(f'Timeout after {constants.HTTP_REQUEST_TIMEOUT} seconds: {url}')
        except httpx.HTTPError as error:
            raise errors.ReadError(f'{error}: {url}')
        except OSError as error:
            # Writing to `sink` failed
            reason = reason_of(error)
            if not to:
                raise errors.WriteError(f'{reason}')
            raise errors.WriteError(f'{reason}: {to}')

        # Every handler above raises, so this point is only reached on
        # success. Hand back the bytes unless they were written to a file.
        if not to:
            sink.seek(0)
            return sink.read()
async def _download_to_stream(url, file, maxsize):
    """
    Send a GET request for `url` and write the response body to `file`

    Redirects are followed and ``constants.HTTP_REQUEST_TIMEOUT`` is used as
    the request timeout.

    :param url: URL to download
    :param file: Object with a ``write()`` method that accepts :class:`bytes`
    :param maxsize: Maximum allowed download size in bytes or `None` for
        unlimited download size

    :raise ReadError: if `maxsize` is exceeded, either according to the
        ``Content-Length`` header or by the number of bytes actually received
    :raise httpx.HTTPStatusError: if the response status indicates an error
    """
    client = httpx.AsyncClient(
        follow_redirects=True,
        timeout=constants.HTTP_REQUEST_TIMEOUT,
    )
    async with client:
        async with client.stream('GET', url) as response:
            # Raise exception on HTTP error, e.g. 404
            response.raise_for_status()

            if maxsize is not None:
                # Content-Length is optional (e.g. with chunked transfer
                # encoding); a missing or malformed header must not crash
                # the download with KeyError/ValueError.
                try:
                    size = int(response.headers.get('Content-Length', ''))
                except ValueError:
                    size = None
                if size is not None and size > maxsize:
                    raise errors.ReadError(f'Too big ({size} bytes): {url}')

            async for chunk in response.aiter_bytes():
                # The header may be missing or wrong, so also enforce the
                # limit on the raw number of bytes received so far.
                if maxsize is not None and response.num_bytes_downloaded > maxsize:
                    raise errors.ReadError(f'Too big (more than {maxsize} bytes): {url}')
                file.write(chunk)
async def merge_async_generators(*generators):
    """
    Combine multiple asynchronous generators into one

    Every generated value is wrapped in a coroutine that returns it or raises an
    exception raised by the generator.

    Values are yielded in the order in which the generators produce them. A
    generator that raised an exception is not iterated any further. If
    iteration stops early (e.g. the merged generator is closed or cancelled),
    all outstanding requests for next values are cancelled.

    Example:

    >>> async for coro in merge_async_generators(a, b, c):
    >>>     try:
    >>>         print('Good value:', await coro)
    >>>     except ValueError as e:
    >>>         print('Bad value:', e)
    """
    async def return_result(result, exception):
        if exception is not None:
            raise exception
        return result

    # Map each outstanding "give me the next value" task to its iterator
    tasks = {}
    try:
        for generator in generators:
            iterator = generator.__aiter__()
            tasks[asyncio.create_task(iterator.__anext__())] = iterator

        while tasks:
            done, _ = await asyncio.wait(
                tasks.keys(),
                return_when=asyncio.FIRST_COMPLETED,
            )
            for task in done:
                iterator = tasks.pop(task)
                result = exception = None
                try:
                    # This also raises any exceptions from the generator
                    result = task.result()
                except StopAsyncIteration:
                    # This generator is exhausted
                    continue
                except Exception as e:
                    # Deliver the exception to the consumer and drop the
                    # generator by not requesting another value from it
                    exception = e
                else:
                    # Request the next value before handing over this one
                    tasks[asyncio.create_task(iterator.__anext__())] = iterator

                yield return_result(result, exception)
    finally:
        # Don't leave tasks running in the background if we are closed,
        # cancelled or fail before all generators are exhausted. Cancelling
        # a task that is already done is a no-op.
        for task in tasks:
            task.cancel()
def without_None_values(dct):
    """
    Return a new :class:`dict` with the items of `dct` whose value is not `None`

    Other falsy values (e.g. ``0`` or ``''``) are kept. `dct` is not modified.
    """
    kept = {}
    for key, value in dct.items():
        if value is None:
            continue
        kept[key] = value
    return kept