Source code for aiobtclientapi.utils.monitor

import asyncio
import time

import async_timeout

from .. import errors

import logging  # isort:skip
_log = logging.getLogger(__name__)


[docs]class Monitor: """ Continuously call awaitable until it returns a predefined value :param call: Any awaitable that doesn't take any arguments :param attribute: Attribute from the return value of `call` to act on, e.g. if `call` returns a :class:`~.response.Response` object and you want to monitor its ``success`` attribute :param interval: Seconds to sleep before calling `call` again :type interval: :class:`DynamicInterval` instance or :class:`int` :param timeout: Maximum number of seconds overall before :class:`~.errors.TimeoutError` is raised """ def __init__(self, call, attribute=None, interval=1, timeout=None): self._call = call self._attribute = attribute self._interval = interval self._timeout = timeout
[docs] async def return_value_equals(self, value, negate=False): """ Block until ``await call() == value`` :param negate: Invert the result of the comparison """ def condition(return_value): _log.debug('Monitoring: %r %s %r', value, '!=' if negate else '==', return_value) return (value == return_value) is (not negate) return await self._block_until(condition)
[docs] async def return_value_is(self, value, negate=False): """ Block until ``await call() is value`` :param negate: Invert the result of the comparison """ def condition(return_value): _log.debug('Monitoring: %r is%s %r', value, ' not' if negate else '', return_value) return (value is return_value) is (not negate) return await self._block_until(condition)
[docs] async def return_value_contains(self, value, negate=False): """ Block until ``value in (await call())`` :param negate: Invert the result of the comparison """ def condition(return_value): _log.debug('Monitoring: %r %sin %r', value, 'not ' if negate else '', return_value) return (value in return_value) is (not negate) return await self._block_until(condition)
[docs] async def return_value_validates(self, validator, negate=False): """ Block until ``validator(await call())`` is truthy :param negate: Invert the result of the comparison """ def condition(return_value): _log.debug('Monitoring: %s%s(%r)', 'not ' if negate else '', validator.__qualname__, return_value) return bool(validator(return_value)) is (not negate) return await self._block_until(condition)
async def _block_until(self, condition): try: async with async_timeout.timeout(self._timeout): while True: return_value = await self._call() if self._attribute: return_value = getattr(return_value, self._attribute) if condition(return_value): return return_value else: await self._sleep() except asyncio.TimeoutError: raise errors.TimeoutError(f'Timeout after {self._timeout} seconds') async def _sleep(self): if isinstance(self._interval, (int, float)): await asyncio.sleep(self._interval) elif isinstance(self._interval, DynamicInterval): interval = await self._interval.next() await asyncio.sleep(interval) else: raise RuntimeError(f'Invalid interval: {self._interval!r}')
[docs]class DynamicInterval: """ Generate intervals from `min` to `max` depending on some ongoing operation :param min: Minimum interval :param max: Maximum interval :param progress_getter: Callable that takes no arguments and returns a number from 0 to 100 As progress approaches 100, the interval returned by :meth:`next` gets closer to `min`. :param name: Any object with a descriptive string representation (only used for debugging) """ def __init__(self, min, max, progress_getter, name=None): self._min = min self._max = max self._progress_getter = progress_getter self._progress = 0 self._samples = [] self._name = name or id(self) @property def progress(self): """Most recent return value from `progress_getter`""" return self._progress
[docs] async def sleep(self): """:func:`~.asyncio.sleep` if :meth:`next` returns positive interval""" interval = await self.next() if interval > 0: await asyncio.sleep(interval)
[docs] async def next(self): """Return next delay""" await self._maintain_samples() seconds_left = await self._get_seconds_left() _log.debug('%s: SECONDS LEFT: %r', self._name, seconds_left) # `threshold_max` is the `seconds_left` value where we start moving from # `self._max` to `self._min`. threshold_max = self._max * 3 # `threshold_min` is the `seconds_left` value where interval is # always `self._min`. threshold_min = self._min * 3 if seconds_left > threshold_max: interval = self._max else: # `factor` is a number between 1.0 and 0.0 that indicates how # close we are to `threshold_min`. factor = max(0, seconds_left - threshold_min) / (threshold_max - threshold_min) # Maintain interval between `self._max` and `self._min` # according to `factor`. interval = (self._max * factor) + (self._min * (1 - factor)) _log.debug('%s: DYNAMIC INTERVAL: %r', self._name, interval) return interval
async def _get_seconds_left(self): progress_diff = self._samples[-1][1] - self._samples[0][1] if progress_diff <= 0: # Not enough samples to estimate return -1 else: seconds_diff = self._samples[-1][0] - self._samples[0][0] progress_per_second = progress_diff / seconds_diff progress_left = 100 - self._samples[-1][1] seconds_left = progress_left / progress_per_second return seconds_left async def _maintain_samples(self): self._progress = progress = await self._progress_getter() assert 0 <= progress <= 100, progress self._samples.append((time.monotonic(), progress)) # Only keep the 3 newest samples del self._samples[:-3]