"""
Base class for client APIs
"""
import abc
import asyncio
import collections
import os
import aiobtclientrpc
from ... import errors, utils
import logging # isort:skip
# Logger for this module
_log = logging.getLogger(__name__)

# Bookkeeping record for one background task: the task object itself and
# whether it is cancelled (detached) or awaited when tasks are waited for
_BackgroundTask = collections.namedtuple(
    typename='_BackgroundTask',
    field_names=['task', 'detached'],
)
[docs]class APIBase(abc.ABC):
"""
Base class for all BitTorrent client APIs
Subclasses are expected to also inherit from a
:class:`aiobtclientrpc.RPCBase` subclass.
:meth:`call`, :meth:`connect` and :meth:`disconnect` catch most low-level
exceptions and translate them into :class:`~.errors.ConnectionError`.
Subclasses must catch :class:`aiobtclientrpc.RPCError` when making RPC calls
(see :meth:`.aiobtclientrpc.RPCError.translate`) and/or populate
:attr:`common_rpc_error_map`.
All asynchronous methods may raise :class:`~.errors.ConnectionError`.
"""
def __init__(self, *args, **kwargs):
    """
    Initialize the parent class and the background task registry

    All positional and keyword arguments are forwarded to the next
    ``__init__`` in the method resolution order.

    :raise errors.ValueError: if the parent ``__init__`` raises
        :class:`ValueError`
    """
    try:
        super().__init__(*args, **kwargs)
    except ValueError as e:
        # Keep the original exception attached as the explicit cause
        raise errors.ValueError(e) from e
    # Records of tasks started by create_background_task()
    self._background_tasks = []
async def __aenter__(self):
    """Enter ``async with`` block and return this instance"""
    return self
async def __aexit__(self, exception_class, exception, traceback):
    """
    Leave ``async with`` block: wait for background tasks, then disconnect

    The arguments describe an exception raised inside the ``async with``
    block. They are not used, and nothing is returned, so such an exception
    is not suppressed.
    """
    _log.debug('%s: Closing API', self.label)
    try:
        await self.wait_for_background_tasks()
    finally:
        # Close the connection even if waiting for a background task raised
        await self.disconnect()
def create_background_task(self, coro, name=None, done_callback=None, detach=False):
    """
    Run coroutine in background

    :param coro: Coroutine or any other argument for :func:`asyncio.create_task`
    :param name: Identifier to help with debugging

        If this is falsy, a name is generated from the qualified name and
        the :func:`id` of `coro`.
    :param done_callback: Synchronous callable that is called with the
        return value of `coro`

        It is not called if `coro` was cancelled or raised an exception.
    :param detach: Whether `coro` may be cancelled instead of being awaited
        until it returns

        If this is set to a truthy value, `coro` is cancelled by
        :meth:`wait_for_background_tasks` and
        :class:`~.asyncio.CancelledError` is ignored.

        If this is set to a falsy value, :meth:`wait_for_background_tasks`
        blocks until `coro` returns.

    If `coro` or `done_callback` raises anything other than
    :class:`asyncio.CancelledError`, the traceback is printed to stderr and
    ``sys.exit(99)`` is called.

    :return: :class:`asyncio.Task` instance
    """
    # Store references to background tasks so they don't get garbage
    # collected.
    # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
    # Also remember if we care about the task to finish or not.
    taskinfo = _BackgroundTask(
        task=asyncio.create_task(coro),
        detached=bool(detach),
    )
    self._background_tasks.append(taskinfo)
    if name:
        taskinfo.task.set_name(name)
    else:
        taskinfo.task.set_name(f'{coro.__qualname__} [{id(coro)}]')

    def done(task):
        # The task is finished (returned, raised or cancelled), so it no
        # longer needs to be tracked.
        self._background_tasks.remove(taskinfo)
        try:
            _log.debug('%s: Background task done: %r', self.label, task.get_name())
            # task.result() re-raises any exception from `coro`, including
            # CancelledError if the task was cancelled
            result = task.result()
            if done_callback:
                # Only reached if `coro` returned normally
                done_callback(result)
        except asyncio.CancelledError:
            _log.debug('%s: Background task was cancelled: %r', self.label, task.get_name())
        except BaseException as e:
            # Exceptions from `coro` or `done_callback` end up nowhere, so
            # we print the traceback manually and beat it.
            import sys, traceback  # noqa: E401, isort:skip
            tb = ''.join(traceback.format_exception(type(e), e, e.__traceback__))
            _log.debug('%s: Background task or done_callback croaked: %r', self.label, task.get_name())
            _log.debug('%s: %s', self.label, tb)
            print(tb, file=sys.stderr)
            sys.exit(99)

    taskinfo.task.add_done_callback(done)
    return taskinfo.task
async def wait_for_background_tasks(self):
    """
    Block until every task from :meth:`create_background_task` is finished

    Detached tasks are not awaited to completion: they are cancelled up
    front and the resulting :class:`asyncio.CancelledError` is swallowed.

    Any other exception from a task is raised to the caller, and so is
    :class:`asyncio.CancelledError` from a task that is not detached.
    """
    # Request cancellation of every task nobody wants to wait for.
    for record in self._background_tasks:
        if not record.detached:
            continue
        _log.debug('%s: Cancelling background task: %r', self.label, record.task.get_name())
        record.task.cancel()

    # Finished tasks remove themselves from self._background_tasks, which
    # would resize the list under our feet. Walk over a snapshot instead.
    snapshot = tuple(self._background_tasks)
    for record in snapshot:
        task = record.task
        _log.debug('%s: Waiting for background task: %r', self.label, task.get_name())
        try:
            await task
        except asyncio.CancelledError:
            if record.detached:
                _log.debug('%s: Ignoring cancelled background task: %r', self.label, task.get_name())
            else:
                raise
# Class-level default polling delay in seconds.
# NOTE(review): nothing in the visible part of this file reads this
# attribute; _verify() passes a literal 0.1 to utils.Monitor instead.
monitor_interval = 0.1
"""Seconds between requests when waiting for an RPC call to take effect"""
# Class-level default that call() hands to RPCError.translate(). It is empty
# here; presumably subclasses replace it with their own mapping (confirm).
common_rpc_error_map = {}
"""
Mapping of regular expressions to exceptions for all :meth:`call` calls
See :meth:`aiobtclientrpc.RPCError.translate`.
"""
async def call(self, *args, **kwargs):
    """
    Wrapper around :meth:`aiobtclientrpc.RPCBase.call` that handles exceptions

    All arguments are passed on and the return value is returned unchanged.

    This is a thin wrapper that translates the following exceptions. The
    URL is appended to the error message and the original exception is
    attached as the cause:

    * :class:`aiobtclientrpc.AuthenticationError` becomes
      :class:`~.errors.AuthenticationError`
    * :class:`aiobtclientrpc.TimeoutError` becomes
      :class:`~.errors.TimeoutError`
    * :class:`aiobtclientrpc.ConnectionError` becomes
      :class:`~.errors.ConnectionError`

    It also converts any common errors by passing
    :attr:`common_rpc_error_map` to
    :meth:`aiobtclientrpc.RPCError.translate`.
    """
    try:
        return await super().call(*args, **kwargs)
    except aiobtclientrpc.AuthenticationError as e:
        raise errors.AuthenticationError(f'{e}: {self.url}') from e
    except aiobtclientrpc.TimeoutError as e:
        raise errors.TimeoutError(f'{e}: {self.url}') from e
    except aiobtclientrpc.ConnectionError as e:
        raise errors.ConnectionError(f'{e}: {self.url}') from e
    except aiobtclientrpc.RPCError as e:
        # No explicit cause here: translate() may hand back `e` itself
        # (TODO confirm), and an exception must not become its own cause.
        raise e.translate(self.common_rpc_error_map)
async def connect(self, *args, **kwargs):
    """
    Wrapper around :meth:`aiobtclientrpc.RPCBase.connect` that handles
    exceptions

    All arguments are passed on and the return value is returned unchanged.

    :class:`aiobtclientrpc.AuthenticationError`,
    :class:`aiobtclientrpc.TimeoutError` and
    :class:`aiobtclientrpc.ConnectionError` are translated into the
    exception of the same name from :mod:`~.errors`. The URL is appended to
    the error message and the original exception is attached as the cause.
    """
    try:
        return await super().connect(*args, **kwargs)
    except aiobtclientrpc.AuthenticationError as e:
        raise errors.AuthenticationError(f'{e}: {self.url}') from e
    except aiobtclientrpc.TimeoutError as e:
        raise errors.TimeoutError(f'{e}: {self.url}') from e
    except aiobtclientrpc.ConnectionError as e:
        raise errors.ConnectionError(f'{e}: {self.url}') from e
async def disconnect(self, *args, **kwargs):
    """
    Wrapper around :meth:`aiobtclientrpc.RPCBase.disconnect` that handles
    exceptions

    All arguments are passed on and the return value is returned unchanged.

    :class:`aiobtclientrpc.AuthenticationError`,
    :class:`aiobtclientrpc.TimeoutError` and
    :class:`aiobtclientrpc.ConnectionError` are translated into the
    exception of the same name from :mod:`~.errors`. The URL is appended to
    the error message and the original exception is attached as the cause.
    """
    try:
        return await super().disconnect(*args, **kwargs)
    except aiobtclientrpc.AuthenticationError as e:
        raise errors.AuthenticationError(f'{e}: {self.url}') from e
    except aiobtclientrpc.TimeoutError as e:
        raise errors.TimeoutError(f'{e}: {self.url}') from e
    except aiobtclientrpc.ConnectionError as e:
        raise errors.ConnectionError(f'{e}: {self.url}') from e
@staticmethod
def _normalize_infohash(infohash):
    """
    Return `infohash` wrapped in :class:`~.utils.Infohash`

    This is a separate hook because concrete clients may need special
    infohash values (e.g. all upper case).
    """
    normalized = utils.Infohash(infohash)
    return normalized
async def get_infohashes(self):
    """
    Return sequence of all known infohashes

    Every item reported by :meth:`_get_infohashes` is passed through
    :meth:`_normalize_infohash`.
    """
    reported = await self._get_infohashes()
    normalized = []
    for raw_infohash in reported:
        normalized.append(self._normalize_infohash(raw_infohash))
    return normalized
@abc.abstractmethod
async def _get_infohashes(self):
    """
    Return iterable of all known infohashes

    :meth:`get_infohashes` normalizes every item of the return value.
    """
@abc.abstractmethod
async def _get_torrent_fields(self, infohash, *fields):
    """
    Return mapping of requested field names to their values

    :param infohash: Infohash of the torrent
    :param fields: Names of the wanted fields

    :meth:`_get_torrent_field` looks up the requested field name in the
    return value.
    """
async def _get_torrent_field(self, infohash, field):
    """
    Return the value of a single torrent field

    Shortcut for calling :meth:`_get_torrent_fields` with the normalized
    `infohash` and one `field` and picking that field from the result.
    """
    values = await self._get_torrent_fields(
        self._normalize_infohash(infohash),
        field,
    )
    return values[field]
async def add(self, torrent, *, location=None, stopped=False, verify=True):
    """
    Add torrent to client

    :param torrent: Path or URL to torrent file, ``magnet:`` URI or infohash
    :param str location: Download directory or `None` to use the default

        This should be an absolute path. If it isn't, it is made absolute
        based on the current working directory, which may be surprising or
        even non-sensical if the client is running in a different
        environment.
    :param bool stopped: Whether the torrent is added in stopped state
        instead of being active right away
    :param bool verify: Whether any existing files from the torrent are
        hashed by the client to make sure they are not corrupt

    :return: :class:`~.Infohash` of the added torrent

    :raise errors.AddTorrentError: if adding fails
    """
    try:
        infohash = await self._add(
            torrent=torrent,
            # A falsy location is passed on as None
            location=str(os.path.abspath(location)) if location else None,
            stopped=stopped,
            verify=verify,
        )
    except errors.Error as e:
        # Keep the original exception attached as the explicit cause
        raise errors.AddTorrentError(e) from e
    else:
        return utils.Infohash(infohash)
@abc.abstractmethod
async def _add(self, torrent, **kwargs):
    """
    Add `torrent` to the client and return its infohash

    :meth:`add` passes ``torrent``, ``location``, ``stopped`` and
    ``verify`` as keyword arguments and wraps the return value in
    :class:`~.utils.Infohash`.
    """
async def start(self, infohash):
    """
    Start torrent

    :param infohash: Infohash of the torrent to start

    :return: `None` when the torrent was started

    :raise errors.StartTorrentError: if starting the torrent failed
    """
    try:
        await self._start(
            self._normalize_infohash(infohash)
        )
    except errors.Error as e:
        # Keep the original exception attached as the explicit cause
        raise errors.StartTorrentError(e) from e
@abc.abstractmethod
async def _start(self, infohash):
    """
    Start the torrent with the given `infohash`

    :meth:`start` passes the normalized infohash and ignores the return
    value.
    """
async def stop(self, infohash):
    """
    Stop torrent

    :param infohash: Infohash of the torrent to stop

    :return: `None` when the torrent was stopped

    :raise errors.StopTorrentError: if stopping the torrent failed
    """
    try:
        await self._stop(
            self._normalize_infohash(infohash)
        )
    except errors.Error as e:
        # Keep the original exception attached as the explicit cause
        raise errors.StopTorrentError(e) from e
@abc.abstractmethod
async def _stop(self, infohash, **kwargs):
    """
    Stop the torrent with the given `infohash`

    :meth:`stop` passes only the normalized infohash and ignores the return
    value.
    """
async def verify(self, infohash):
    """
    Initiate hash check of a torrent's files

    See also :meth:`verify_wait`.

    :param infohash: Infohash of the torrent to check

    :return: `None` when the verification was initiated

    :raise errors.VerifyTorrentError: if initiating the verification failed
    """
    try:
        await self._verify(
            self._normalize_infohash(infohash)
        )
    except errors.Error as e:
        # Keep the original exception attached as the explicit cause
        raise errors.VerifyTorrentError(e) from e
async def _verify(self, infohash):
    """
    Start hash check of a torrent unless one is already running

    After the hash check was requested, poll for up to one second until the
    client reports the torrent as verifying. Running into that timeout is
    not an error.

    :raise errors.TorrentAlreadyVerifying: if the torrent is already being
        verified
    """
    if await self._torrent_is_verifying(infohash):
        raise errors.TorrentAlreadyVerifying(infohash)

    await self._start_verifying(infohash)

    # Give the command a moment to take effect
    try:
        monitor = utils.Monitor(
            call=utils.partial(self._torrent_is_verifying, infohash),
            interval=0.1,
            timeout=1.0,
        )
        await monitor.return_value_equals(True)
    except errors.TimeoutError:
        # The client may never report the torrent as verifying, e.g. if all
        # files are missing or if the torrent is so small that the hash
        # check was over before the first poll.
        _log.debug('Verification finished immediately: %r', infohash)
async def verify_wait(self, *infohashes, interval=(0.3, 3)):
    """
    Asynchronous generator that yields ``(infohash, progress)`` tuples

    ``progress`` is either a number from ``0.0`` to ``100.0`` or
    :class:`~.errors.Error` or :class:`~.errors.Warning`.

    Every infohash is yielded at least once.

    :param infohashes: Infohashes of the torrents
    :param interval: Delay between progress updates

        ``seconds``
            Always use the same delay.

        ``(seconds_min, seconds_max)``
            Dynamically change the delay based on how much time is left.
            ``seconds_min`` is used until ``progress`` gets close to
            ``100.0``. Then, ``seconds_min`` gradually moves to
            ``seconds_max``.

            This allows to spread out update requests until the end, where
            you want to check progress more frequently to know when the
            verification is finished.

    :raise TypeError: if `interval` is neither a number nor a 2-tuple (this
        is an asynchronous generator, so it is raised on first iteration)
    """
    if isinstance(interval, (int, float)):
        # One number means a fixed delay
        shortest = longest = interval
    elif isinstance(interval, tuple) and len(interval) == 2:
        shortest, longest = interval
    else:
        raise TypeError(f'Invalid interval: {interval!r}')

    # One progress generator per torrent
    per_torrent = []
    for requested in infohashes:
        per_torrent.append(
            self._verify_wait(self._normalize_infohash(requested), shortest, longest)
        )

    async for pending in utils.merge_async_generators(*per_torrent):
        reported_infohash, outcome = await pending
        yield (reported_infohash, outcome)
async def _verify_wait(self, infohash, interval_min, interval_max):
    """
    Asynchronous generator of ``(infohash, progress)`` for one torrent

    The current progress is yielded once up front and once more after the
    torrent is no longer verifying. While it is verifying, progress is
    yielded whenever it differs from the previously yielded value.

    If :class:`~.errors.Error` or :class:`~.errors.Warning` is raised, the
    exception is yielded in place of the progress and the generator ends.
    """
    assert isinstance(infohash, utils.Infohash)

    pacer = utils.DynamicInterval(
        name=infohash,
        min=interval_min,
        max=interval_max,
        progress_getter=utils.partial(self._get_verifying_progress, infohash),
    )

    try:
        # Report where we are right now
        yield (infohash, await self._get_verifying_progress(infohash))

        last_yielded = 0
        while await self._torrent_is_verifying(infohash):
            if pacer.progress != last_yielded:
                yield (infohash, pacer.progress)
                last_yielded = pacer.progress
            await pacer.sleep()

        # Report where we ended up
        yield (infohash, await self._get_verifying_progress(infohash))

    except (errors.Error, errors.Warning) as e:
        yield (infohash, e)
@abc.abstractmethod
async def _torrent_is_verifying(self, infohash):
    """
    `True` if torrent is verifying or queued for verification

    :param infohash: Infohash of the torrent
    """
@abc.abstractmethod
async def _start_verifying(self, infohash):
    """
    Initiate hash check

    :meth:`_verify` only calls this if :meth:`_torrent_is_verifying`
    returned a falsy value.

    :param infohash: Infohash of the torrent
    """
@abc.abstractmethod
async def _get_verifying_progress(self, infohash):
    """
    Verifying progress in percent (0 to 100)

    After verification is done (:meth:`_torrent_is_verifying` returns
    `False`), return the combined download progress of the wanted files in
    percent.

    :param infohash: Infohash of the torrent
    """