"""
API for qBittorrent
"""
import os
import aiobtclientrpc
from ... import errors, utils
from .. import base
import logging # isort:skip
_log = logging.getLogger(__name__)
[docs]class QbittorrentAPI(base.APIBase, aiobtclientrpc.QbittorrentRPC):
"""
qBittorrent API
"""
async def _get_infohashes(self):
torrents = await self.call('torrents/info')
return (t['hash'] for t in torrents)
async def _get_torrent_fields(self, infohash, *fields):
torrents = await self.call('torrents/info', hashes=[infohash])
if torrents and len(torrents) == 1:
torrent = torrents[0]
# Only return wanted information
try:
return {field: torrent[field] for field in fields}
except KeyError as e:
field = e.args[0]
raise errors.ValueError(f'Unknown field: {field!r}')
else:
# qBittorrent returns all torrents if it can't find `infohash`
known_infohashes = await self.get_infohashes()
if infohash not in known_infohashes:
raise errors.NoSuchTorrentError(infohash)
else:
raise RuntimeError(f'Unexpected response: {torrents!r}')
async def _make_add_torrent_args(self, *, torrent, location, stopped, verify):
rpc_args = utils.without_None_values({
'paused': 'true' if stopped else 'false',
'skip_checking': 'false' if verify else 'true',
'savepath': location,
})
if utils.is_magnet(torrent):
rpc_args['urls'] = [str(torrent)]
infohash = utils.torrent.get_infohash(torrent)
elif utils.is_infohash(torrent):
rpc_args['urls'] = [f'magnet:?xt=urn:btih:{torrent}']
infohash = utils.torrent.get_infohash(torrent)
else:
if utils.is_url(torrent):
torrent_bytes = await utils.torrent.download_bytes(torrent)
else:
# Assume `torrent` is local file
torrent_bytes = utils.torrent.read_bytes(torrent)
infohash = utils.torrent.get_infohash(torrent_bytes)
rpc_args['files'] = (
('filename', (
os.path.basename(torrent), # File name
torrent_bytes, # File content
'application/x-bittorrent', # MIME type
)),
)
return rpc_args, infohash
_timeout_add_torrent = 30.0
async def _add_torrent(self, torrent, *, location, stopped, verify):
rpc_args, infohash = await self._make_add_torrent_args(
torrent=torrent,
location=location,
stopped=stopped,
verify=verify,
)
try:
response = await self.call('torrents/add', **rpc_args)
except aiobtclientrpc.RPCError as e:
raise e.translate({
r'not a valid torrent': errors.InvalidTorrentError(torrent),
})
else:
if response == 'Fails.':
# qBittorrent doesn't report duplicate torrents, it just "Fails."
# with HTTP status "200 OK"
if infohash in await self.get_infohashes():
yield ('already_added', infohash)
yield errors.TorrentAlreadyAdded(infohash, name=torrent)
else:
raise errors.InvalidTorrentError(torrent)
else:
# Wait for command to take effect
await utils.Monitor(
call=self.get_infohashes,
interval=self.monitor_interval,
timeout=self._timeout_add_torrent,
).return_value_contains(infohash)
yield ('added', infohash)
_timeout_start_torrent = 10.0
async def _start_torrent(self, infohash):
# Check current state
state = await self._get_torrent_field(infohash, 'state')
if not state.startswith('paused'):
yield ('already_started', infohash)
yield errors.TorrentAlreadyStarted(infohash)
else:
await self.call('torrents/resume', hashes=[infohash])
# Wait for command to take effect
await utils.Monitor(
call=utils.partial(self._get_torrent_field, infohash, 'state'),
interval=self.monitor_interval,
timeout=self._timeout_start_torrent,
).return_value_contains('paused', negate=True)
yield ('started', infohash)
_timeout_stop_torrent = 10.0
async def _stop_torrent(self, infohash):
# Check current state
state = await self._get_torrent_field(infohash, 'state')
if state.startswith('paused'):
yield ('already_stopped', infohash)
yield errors.TorrentAlreadyStopped(infohash)
else:
await self.call('torrents/pause', hashes=[infohash])
# Wait for command to take effect
await utils.Monitor(
call=utils.partial(self._get_torrent_field, infohash, 'state'),
interval=self.monitor_interval,
timeout=self._timeout_stop_torrent,
).return_value_contains('paused')
yield ('stopped', infohash)
async def _start_verifying(self, infohash):
await self.call('torrents/recheck', hashes=[infohash])
async def _torrent_is_verifying(self, infohash):
state = await self._get_torrent_field(infohash, 'state')
return state.startswith('checking')
async def _get_verifying_progress(self, infohash):
progress = await self._get_torrent_field(infohash, 'progress')
return progress * 100