"""
API for Deluge
"""
import base64
import os
import re
import aiobtclientrpc
from ... import errors, utils
from .. import base
import logging # isort:skip
_log = logging.getLogger(__name__)
[docs]class DelugeAPI(base.APIBase, aiobtclientrpc.DelugeRPC):
"""
Deluge API
"""
common_rpc_error_map = {
rf"^'?({utils.find_infohash_regex.pattern})'?$": (errors.NoSuchTorrentError, r'\1'),
}
async def _get_infohashes(self):
return await self.call('core.get_session_state')
def _normalize_infohash(self, infohash):
# Deluge doesn't seem to understand uppercase infohashes
return super()._normalize_infohash(infohash.lower())
async def _get_torrent_fields(self, infohash, *fields):
response = await self.call('core.get_torrent_status', infohash, keys=fields)
if response:
# Return only requested fields
try:
return {f: response[f] for f in fields}
except KeyError as e:
field = e.args[0]
raise errors.ValueError(f'Unknown field: {field!r}')
else:
# Deluge returns an empty `dict` if:
# 1. No torrent with `infohash` exists
# 2. None of the `fields` are valid
infohashes = await self.get_infohashes()
if infohash not in infohashes:
raise errors.NoSuchTorrentError(infohash)
else:
raise RuntimeError(f'Unexpected response: {response!r}')
async def _make_add_args(self, *, torrent, location, stopped, verify):
rpc_args = {
'options': utils.without_None_values({
'add_paused': bool(stopped),
'seed_mode': not bool(verify),
'download_location': location,
}),
}
if utils.is_magnet(torrent):
rpc_function = 'core.add_torrent_magnet'
rpc_args['uri'] = str(torrent)
elif utils.is_infohash(torrent):
rpc_function = 'core.add_torrent_magnet'
rpc_args['uri'] = f'magnet:?xt=urn:btih:{torrent}'
else:
if utils.is_url(torrent):
torrent_bytes = await utils.torrent.download_bytes(torrent)
else:
# Assume `torrent` is local file
torrent_bytes = utils.torrent.read_bytes(torrent)
rpc_args['filedump'] = str(
base64.b64encode(torrent_bytes),
encoding='ascii',
)
rpc_args['filename'] = os.path.basename(torrent)
rpc_function = 'core.add_torrent_file'
return rpc_function, rpc_args
async def _add(self, torrent, *, location, stopped, verify):
rpc_function, rpc_args = await self._make_add_args(
torrent=torrent,
location=location,
stopped=stopped,
verify=verify,
)
try:
infohash = await self.call(rpc_function, **rpc_args)
return infohash
except aiobtclientrpc.RPCError as e:
dupe_regex = re.compile(r'Torrent already (?:in session|being added) \(([0-9a-zA-Z]+)\)')
match = dupe_regex.search(str(e))
if match:
infohash = match.group(1)
raise errors.TorrentAlreadyAdded(infohash, torrent)
else:
raise e.translate({
# Invalid torrent
r'decoding filedump failed': errors.InvalidTorrentError(torrent),
# Invalid magnet URI
r'invalid magnet info': errors.InvalidTorrentError(torrent),
r'non-hexadecimal number found': errors.InvalidTorrentError(torrent),
})
_timeout_start = 10.0
async def _start(self, infohash):
# Check current state
state = await self._get_torrent_field(infohash, 'state')
if state == 'Error':
raise errors.Error('Cannot start torrent in error state')
elif state != 'Paused':
raise errors.TorrentAlreadyStarted(infohash)
else:
await self.call('core.resume_torrent', infohash)
# Wait for command to take effect
await utils.Monitor(
call=utils.partial(self._get_torrent_field, infohash, 'state'),
interval=self.monitor_interval,
timeout=self._timeout_start,
).return_value_equals('Paused', negate=True)
_timeout_stop = 10.0
async def _stop(self, infohash):
# Check current state
state = await self._get_torrent_field(infohash, 'state')
if state == 'Error':
raise errors.Error('Cannot stop torrent in error state')
elif state == 'Paused':
raise errors.TorrentAlreadyStopped(infohash)
else:
await self.call('core.pause_torrent', infohash)
# Wait for command to take effect
await utils.Monitor(
call=utils.partial(self._get_torrent_field, infohash, 'state'),
interval=self.monitor_interval,
timeout=self._timeout_stop,
).return_value_equals('Paused')
async def _start_verifying(self, infohash):
await self.call('core.force_recheck', [infohash])
async def _torrent_is_verifying(self, infohash):
state = await self._get_torrent_field(infohash, 'state')
return state in ('Checking', 'Queued')
async def _get_verifying_progress(self, infohash):
progress = await self._get_torrent_field(infohash, 'progress')
return progress