Source code for aiobtclientapi.clients.base.api

"""
Base class for client APIs
"""

import abc
import asyncio
import collections
import os

import aiobtclientrpc

from ... import errors, utils

import logging  # isort:skip
_log = logging.getLogger(__name__)


_BackgroundTask = collections.namedtuple('_BackgroundTask', ('task', 'detached'))


[docs]class APIBase(abc.ABC): """ Base class for all BitTorrent client APIs Subclasses are expected to also inherit from a :class:`aiobtclientrpc.RPCBase` subclass. :meth:`call`, :meth:`connect` and :meth:`disconnect` catch most low-level exceptions and translate them into :class:`~.errors.ConnectionError`. Subclasses must catch :class:`aiobtclientrpc.RPCError` when making RPC calls (see :meth:`.aiobtclientrpc.RPCError.translate`) and/or populate :attr:`common_rpc_error_map`. All asynchronous methods may raise :class:`ConnectionError`. """ def __init__(self, *args, **kwargs): try: super().__init__(*args, **kwargs) except ValueError as e: raise errors.ValueError(e) self._background_tasks = [] async def __aenter__(self): return self async def __aexit__(self, exception_class, exception, traceback): _log.debug('%s: Closing API', self.label) await self.wait_for_background_tasks() await self.disconnect()
[docs] def create_background_task(self, coro, name=None, done_callback=None, detach=False): """ Run coroutine in background :param coro: Coroutine or any other argument for :func:`asyncio.create_task` :param name: Identifier to help with debugging :param done_callback: Synchronous callable that is called with the return value of `coro` :param detach: Whether to always wait for `coro` to return If this is set to a truthy value, `coro` is cancelled by :meth:`wait_for_background_tasks` and :class:`~.asyncio.CancelledError` is ignored. If this is set to a falsy value, :meth:`wait_for_background_tasks` blocks until `coro` returns. :return: :class:`asyncio.Task` instance """ # Store references to background tasks so they don't get garbage # collected. # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task # Also remember if we care about the task to finish or not. taskinfo = _BackgroundTask( task=asyncio.create_task(coro), detached=bool(detach), ) self._background_tasks.append(taskinfo) if name: taskinfo.task.set_name(name) else: taskinfo.task.set_name(f'{coro.__qualname__} [{id(coro)}]') def done(task): self._background_tasks.remove(taskinfo) try: _log.debug('%s: Background task done: %r', self.label, task.get_name()) # Raise exception from task result = task.result() if done_callback: # task.result() also raises any exception from `coro` done_callback(result) except asyncio.CancelledError: _log.debug('%s: Background task was cancelled: %r', self.label, task.get_name()) except BaseException as e: # Exceptions from `coro` or `done_callback` end up nowhere, so # we print the traceback manually and beat it. import sys, traceback # noqa: E401, isort:skip tb = ''.join(traceback.format_exception(type(e), e, e.__traceback__)) _log.debug('%s: Background task or done_callback croaked: %r', self.label, task.get_name()) _log.debug('%s: %s', self.label, tb) print(tb, file=sys.stderr) sys.exit(99) taskinfo.task.add_done_callback(done) return taskinfo.task
[docs] async def wait_for_background_tasks(self): """ Wait for all tasks created by :meth:`create_background_task` Detached tasks are cancelled first. Exceptions from tasks are raised here, except for :class:`asyncio.CancelledError` from a detached task. """ for taskinfo in self._background_tasks: if taskinfo.detached: _log.debug('%s: Cancelling background task: %r', self.label, taskinfo.task.get_name()) taskinfo.task.cancel() # Tasks are removed from self._background_tasks when they are done, # i.e. self._background_tasks changes size while we're iterating over # it. Wrapping it in a tuple should avoid any related issues. for taskinfo in tuple(self._background_tasks): _log.debug('%s: Waiting for background task: %r', self.label, taskinfo.task.get_name()) try: await taskinfo.task except asyncio.CancelledError: if not taskinfo.detached: raise else: _log.debug('%s: Ignoring cancelled background task: %r', self.label, taskinfo.task.get_name())
monitor_interval = 0.1 """Seconds between requests when waiting for an RPC call to take effect""" common_rpc_error_map = {} """ Mapping of regular expressions to exceptions for all :meth:`call` calls See :meth:`aiobtclientrpc.RPCError.translate`. """
[docs] async def call(self, *args, **kwargs): """ Wrapper around :meth:`aiobtclientrpc.RPCBase.call` that handles exceptions This is a thin wrapper that translates the following exceptions into :class:`~.errors.ConnectionError`: * :class:`aiobtclientrpc.ConnectionError` * :class:`aiobtclientrpc.TimeoutError` * :class:`aiobtclientrpc.AuthenticationError` It also converts any common errors by passing :attr:`common_rpc_error_map` to :meth:`aiobtclientrpc.RPCError.translate`. """ try: return await super().call(*args, **kwargs) except aiobtclientrpc.AuthenticationError as e: raise errors.AuthenticationError(f'{e}: {self.url}') except aiobtclientrpc.TimeoutError as e: raise errors.TimeoutError(f'{e}: {self.url}') except aiobtclientrpc.ConnectionError as e: raise errors.ConnectionError(f'{e}: {self.url}') except aiobtclientrpc.RPCError as e: raise e.translate(self.common_rpc_error_map)
[docs] async def connect(self, *args, **kwargs): """ Wrapper around :meth:`aiobtclientrpc.RPCBase.connect` that handles exceptions See :meth:`call`. """ try: return await super().connect(*args, **kwargs) except aiobtclientrpc.AuthenticationError as e: raise errors.AuthenticationError(f'{e}: {self.url}') except aiobtclientrpc.TimeoutError as e: raise errors.TimeoutError(f'{e}: {self.url}') except aiobtclientrpc.ConnectionError as e: raise errors.ConnectionError(f'{e}: {self.url}')
[docs] async def disconnect(self, *args, **kwargs): """ Wrapper around :meth:`aiobtclientrpc.RPCBase.disconnect` that handles exceptions See :meth:`call`. """ try: return await super().disconnect(*args, **kwargs) except aiobtclientrpc.AuthenticationError as e: raise errors.AuthenticationError(f'{e}: {self.url}') except aiobtclientrpc.TimeoutError as e: raise errors.TimeoutError(f'{e}: {self.url}') except aiobtclientrpc.ConnectionError as e: raise errors.ConnectionError(f'{e}: {self.url}')
@staticmethod def _normalize_infohash(infohash): # Concrete clients may need special infohash values (e.g. all upper case) return utils.Infohash(infohash)
[docs] async def get_infohashes(self): """Return sequence of all known infohashes""" return [ self._normalize_infohash(infohash) for infohash in await self._get_infohashes() ]
@abc.abstractmethod async def _get_infohashes(self): pass @abc.abstractmethod async def _get_torrent_fields(self, infohash, *fields): pass async def _get_torrent_field(self, infohash, field): """ Convenience wrapper around :meth:`_get_torrent_fields` for getting a single field """ infohash = self._normalize_infohash(infohash) fields = await self._get_torrent_fields(infohash, field) return fields[field]
[docs] async def add(self, torrent, *, location=None, stopped=False, verify=True): """ Add torrent to client :param torrent: Path or URL to torrent file, ``magnet:`` URI or infohash :param str location: Download directory or `None` to use the default This should be an absolute path. If it isn't, it is made absolute based on the current working directory, which may be surprising or even non-sensical if the client is running in a different environment. :param bool stopped: Whether the torrent is active right away :param bool verify: Whether any existing files from the torrent are hashed by the client to make sure they are not corrupt :return: :class:`~.Infohash` of the added torrent :raise errors.AddTorrentError: if adding fails """ try: infohash = await self._add( torrent=torrent, location=str(os.path.abspath(location)) if location else None, stopped=stopped, verify=verify, ) except errors.Error as e: raise errors.AddTorrentError(e) else: return utils.Infohash(infohash)
@abc.abstractmethod async def _add(self, torrent, **kwargs): pass
[docs] async def start(self, infohash): """ Start torrent :param infohashes: Infohash of the torrent to start :return: `None` when the torrent was started :raise errors.StartTorrentError: if starting the torrent failed """ try: await self._start( self._normalize_infohash(infohash) ) except errors.Error as e: raise errors.StartTorrentError(e)
@abc.abstractmethod async def _start(self, infohash): pass
[docs] async def stop(self, infohash): """ Stop torrent :param infohash: Infohash of the torrent to stop :return: `None` when the torrent was stopped :raise errors.StopTorrentError: if stopping the torrent failed """ try: await self._stop( self._normalize_infohash(infohash) ) except errors.Error as e: raise errors.StopTorrentError(e)
@abc.abstractmethod async def _stop(self, infohash, **kwargs): pass
[docs] async def verify(self, infohash): """ Initiate hash check of a torrent's files See also :meth:`verify_wait`. :param infohash: Infohash of the torrent to check :return: `None` when the verification was initiated :raise errors.VerifyTorrentError: if initiating the verification failed """ try: await self._verify( self._normalize_infohash(infohash) ) except errors.Error as e: raise errors.VerifyTorrentError(e)
async def _verify(self, infohash): is_verifying = await self._torrent_is_verifying(infohash) if is_verifying: raise errors.TorrentAlreadyVerifying(infohash) else: await self._start_verifying(infohash) # Wait for command to take effect try: await utils.Monitor( call=utils.partial(self._torrent_is_verifying, infohash), interval=0.1, timeout=1.0, ).return_value_equals(True) except errors.TimeoutError: # Verifying may never start because all files are missing or # because the torrent is very small and verification was # finished before Monitor's first call. _log.debug('Verification finished immediately: %r', infohash)
[docs] async def verify_wait(self, *infohashes, interval=(0.3, 3)): """ Asynchronous generator that yields ``(infohash, progress)`` tuples ``progress`` is either a number from ``0.0`` to ``100.0`` or :class:`~.errors.Error` or :class:`~.errors.Warning`. Every infohash is yielded at least once. :param infohashes: Infohashes of the torrents :param interval: Delay between progress updates ``seconds`` Always use the same delay. ``(seconds_min, seconds_max)`` Dynamically change the delay based on how much time is left. ``seconds_min`` is used until ``progress`` gets close to ``100.0``. Then, ``seconds_min`` gradually moves to ``seconds_max``. This allows to spread out update requests until the end, where you want to check progress more frequently to know when the verification is finished. """ if isinstance(interval, (int, float)): interval_min = interval_max = interval elif isinstance(interval, tuple) and len(interval) == 2: interval_min, interval_max = interval else: raise TypeError(f'Invalid interval: {interval!r}') waiters = [ self._verify_wait( self._normalize_infohash(infohash), interval_min, interval_max, ) for infohash in infohashes ] async for coro in utils.merge_async_generators(*waiters): infohash, progress_or_exception = await coro yield (infohash, progress_or_exception)
async def _verify_wait(self, infohash, interval_min, interval_max): assert isinstance(infohash, utils.Infohash) interval = utils.DynamicInterval( name=infohash, min=interval_min, max=interval_max, progress_getter=utils.partial(self._get_verifying_progress, infohash), ) try: # Initial progress yield (infohash, await self._get_verifying_progress(infohash)) progress = 0 while await self._torrent_is_verifying(infohash): if interval.progress != progress: yield (infohash, interval.progress) progress = interval.progress await interval.sleep() # Final progress yield (infohash, await self._get_verifying_progress(infohash)) except (errors.Error, errors.Warning) as e: yield (infohash, e) @abc.abstractmethod async def _torrent_is_verifying(self, infohash): """`True` if torrent is verifying or queued for verification""" @abc.abstractmethod async def _start_verifying(self, infohash): """Initiate hash check""" @abc.abstractmethod async def _get_verifying_progress(self, infohash): """ Verifying progress in percent (0 to 100) After verification is done (:meth:`_torrent_is_verifying` returns `False`), return the combined download progress of the wanted files in percent. """