"""
API for the Transmission daemon
"""
import base64
import aiobtclientrpc
from ... import errors, utils
from .. import base
from . import enums
import logging # isort:skip
_log = logging.getLogger(__name__)
class TransmissionAPI(base.APIBase, aiobtclientrpc.TransmissionRPC):
    """
    Transmission daemon API

    Every method talks to the daemon through ``self.call`` using
    Transmission's RPC method names (``torrent-get``, ``torrent-add``,
    ``torrent-start``, ``torrent-stop``, ``torrent-verify``).

    NOTE(review): ``self.call``, ``self._get_torrent_field`` and
    ``self.monitor_interval`` are not defined in this class; presumably they
    are inherited from the base classes -- confirm.
    """

    async def _get_infohashes(self):
        """
        Return an iterable of the ``hashString`` of every torrent listed in
        the ``torrent-get`` response
        """
        response = await self.call('torrent-get', fields=['hashString'])
        return (t['hashString'] for t in response['arguments']['torrents'])

    async def _get_torrent_fields(self, infohash, *fields):
        """
        Return a `dict` that maps each name in `fields` to its value for the
        torrent identified by `infohash`

        :raise NoSuchTorrentError: if the daemon returns no torrent for
            `infohash`
        :raise errors.ValueError: if the returned torrent lacks one of the
            requested `fields`
        :raise RuntimeError: if the daemon returns more than one torrent
        """
        response = await self.call('torrent-get', ids=[infohash], fields=fields)
        torrents = response['arguments']['torrents']

        if not torrents:
            raise errors.NoSuchTorrentError(infohash)
        if len(torrents) > 1:
            # We asked for exactly one ID, so anything else is unexpected.
            raise RuntimeError(f'Unexpected torrent list: {torrents!r}')

        torrent = torrents[0]
        try:
            return {field: torrent[field] for field in fields}
        except KeyError as e:
            field = e.args[0]
            # Chain explicitly so the traceback shows the missing key as the
            # direct cause.
            raise errors.ValueError(f'Unknown field: {field!r}') from e

    async def _make_add_args(self, *, torrent, location, stopped, verify):
        """
        Build the argument `dict` for a ``torrent-add`` RPC call

        :param torrent: Magnet link, infohash, URL or (assumed) path to a
            local torrent file
        :param location: Value for ``download-dir``; it is passed through
            ``utils.without_None_values``, so (going by that helper's name)
            `None` is presumably omitted from the request
        :param stopped: Truthy to add the torrent in paused state
        :param verify: Accepted for signature compatibility; not used here
        """
        rpc_args = utils.without_None_values({
            'paused': bool(stopped),
            'download-dir': location,
        })

        if utils.is_magnet(torrent):
            rpc_args['filename'] = str(torrent)
        elif utils.is_infohash(torrent):
            # A bare infohash is submitted as a minimal magnet link.
            rpc_args['filename'] = f'magnet:?xt=urn:btih:{torrent}'
        else:
            if utils.is_url(torrent):
                torrent_bytes = await utils.torrent.download_bytes(torrent)
            else:
                # Assume `torrent` is local file
                torrent_bytes = utils.torrent.read_bytes(torrent)
            # The torrent file content is sent base64-encoded as ASCII text.
            rpc_args['metainfo'] = base64.b64encode(torrent_bytes).decode('ascii')

        return rpc_args

    async def _add(self, torrent, *, location, stopped, verify):
        """
        Add `torrent` via ``torrent-add`` and return its infohash

        :param torrent: See :meth:`_make_add_args`
        :param location: See :meth:`_make_add_args`
        :param stopped: See :meth:`_make_add_args`
        :param verify: Currently not acted on (see TODO below)

        :raise TorrentAlreadyAdded: if the daemon reports a duplicate, either
            as an RPC error or inside a successful response
        :raise InvalidTorrentError: if the RPC error message matches
            "Unrecognized info" or "Invalid or corrupt"
        :raise RuntimeError: if a successful response contains neither
            ``torrent-added`` nor ``torrent-duplicate``
        """
        rpc_args = await self._make_add_args(
            torrent=torrent,
            location=location,
            stopped=stopped,
            verify=verify,
        )

        try:
            result = await self.call('torrent-add', rpc_args)
        except aiobtclientrpc.RPCError as e:
            # Transmission 4.* sends
            # {"result": "duplicate torrent", "arguments": {"torrent-duplicate": ...}}
            # This is fixed in 4.0.3: https://github.com/transmission/transmission/pull/5370
            if e.info and 'torrent-duplicate' in e.info:
                infohash = e.info['torrent-duplicate']['hashString']
                raise errors.TorrentAlreadyAdded(infohash, name=torrent) from e
            # Not chained with `from e`: translate() may hand back `e`
            # itself, and an exception must not be its own cause.
            raise e.translate({
                # Transmission 4.*
                r'Unrecognized info': errors.InvalidTorrentError(torrent),
                # Transmission 3.*
                r'Invalid or corrupt': errors.InvalidTorrentError(torrent),
            })
        else:
            arguments = result['arguments']

            # Transmission 3.* sends
            # {"result": "success", "arguments": {"torrent-duplicate": ...}}
            if 'torrent-duplicate' in arguments:
                infohash = arguments['torrent-duplicate']['hashString']
                raise errors.TorrentAlreadyAdded(infohash, name=torrent)

            if 'torrent-added' in arguments:
                # TODO: Verify torrent if `verify` is truthy
                return arguments['torrent-added']['hashString']

            raise RuntimeError(f'Unexpected response: {result}')

    # Timeout handed to utils.Monitor while waiting for a torrent to leave
    # the STOPPED state (presumably seconds -- confirm against utils.Monitor).
    _timeout_start = 10.0

    async def _start(self, infohash):
        """
        Start the torrent identified by `infohash` and wait until its status
        is no longer ``STOPPED``

        :raise TorrentAlreadyStarted: if the torrent's status is anything
            other than ``STOPPED`` before the command is sent
        """
        # Check current state
        status = await self._get_torrent_field(infohash, 'status')
        if status != enums.TR_STATUS.STOPPED:
            raise errors.TorrentAlreadyStarted(infohash)

        await self.call('torrent-start', ids=[infohash])

        # Wait for command to take effect
        await utils.Monitor(
            call=utils.partial(self._get_torrent_field, infohash, 'status'),
            interval=self.monitor_interval,
            timeout=self._timeout_start,
        ).return_value_equals(enums.TR_STATUS.STOPPED, negate=True)

    # Timeout handed to utils.Monitor while waiting for a torrent to reach
    # the STOPPED state (presumably seconds -- confirm against utils.Monitor).
    _timeout_stop = 10.0

    async def _stop(self, infohash):
        """
        Stop the torrent identified by `infohash` and wait until its status
        is ``STOPPED``

        :raise TorrentAlreadyStopped: if the torrent's status is already
            ``STOPPED`` before the command is sent
        """
        # Check current state
        status = await self._get_torrent_field(infohash, 'status')
        if status == enums.TR_STATUS.STOPPED:
            raise errors.TorrentAlreadyStopped(infohash)

        await self.call('torrent-stop', ids=[infohash])

        # Wait for command to take effect
        await utils.Monitor(
            call=utils.partial(self._get_torrent_field, infohash, 'status'),
            interval=self.monitor_interval,
            timeout=self._timeout_stop,
        ).return_value_equals(enums.TR_STATUS.STOPPED)

    async def _start_verifying(self, infohash):
        """Send ``torrent-verify`` for the torrent identified by `infohash`"""
        await self.call('torrent-verify', ids=[infohash])

    async def _torrent_is_verifying(self, infohash):
        """
        Return whether the torrent's status is ``CHECK`` or ``CHECK_WAIT``
        """
        status = await self._get_torrent_field(infohash, 'status')
        return status in (enums.TR_STATUS.CHECK, enums.TR_STATUS.CHECK_WAIT)

    async def _get_verifying_progress(self, infohash):
        """
        Return a progress value scaled by 100

        While the status is ``CHECK`` or ``CHECK_WAIT``, this is
        ``recheckProgress * 100``; otherwise it is ``percentDone * 100``.
        """
        # Fetch all needed fields in a single RPC call.
        torrent = await self._get_torrent_fields(
            infohash,
            'status',
            'recheckProgress',
            'percentDone',
        )
        if torrent['status'] in (enums.TR_STATUS.CHECK, enums.TR_STATUS.CHECK_WAIT):
            return torrent['recheckProgress'] * 100

        # NOTE: Despite the name, "percentDone" is also a fraction from 0.0
        # to 1.0.
        return torrent['percentDone'] * 100