Commit a5a271b8 authored by echel0n's avatar echel0n
Browse files

renamed newznab `key` param to `api_key`

removed newznab `key` column from search providers newznab database table
refactored AMQP service
fixed issue with Synology download station for DSM7 and auth
added `subliminal.refiners.tvdb` to list of sentry ignored loggers
moved `check_web_socket_queue` function to websocket module
fixed issue with NZBget and downloading
parent cd034f33
......@@ -20,13 +20,10 @@
# ##############################################################################
import os
import re
from urllib.parse import urljoin
from requests import RequestException
import sickrage
from sickrage.clients import TorrentClient
......@@ -37,8 +34,9 @@ class DownloadStationAPI(TorrentClient):
super(DownloadStationAPI, self).__init__('DownloadStation', host, username, password)
self.urls = {
'auth': urljoin(self.host, 'webapi/auth.cgi'),
'task': urljoin(self.host, 'webapi/DownloadStation/task.cgi'),
'auth': urljoin(self.host, '/webapi/auth.cgi'),
'query': urljoin(self.host, '/webapi/query.cgi'),
'task': urljoin(self.host, '/webapi/DownloadStation/task.cgi'),
'info': urljoin(self.host, '/webapi/DownloadStation/info.cgi'),
}
......@@ -49,7 +47,6 @@ class DownloadStationAPI(TorrentClient):
self.post_task = {
'method': 'create',
'version': '1',
'api': 'SYNO.DownloadStation.Task',
'session': 'DownloadStation',
}
......@@ -115,7 +112,6 @@ class DownloadStationAPI(TorrentClient):
params = {
'api': 'SYNO.API.Auth',
'version': 2,
'method': 'login',
'account': self.username,
'passwd': self.password,
......@@ -123,6 +119,9 @@ class DownloadStationAPI(TorrentClient):
'format': 'cookie'
}
api_info = self._get_api_info(params['api'])
params['version'] = api_info.get('maxVersion')
self.response = self.session.get(self.urls['auth'], params=params, verify=bool(sickrage.app.config.torrent.verify_cert))
if not self.response:
self.session.cookies.clear()
......@@ -134,14 +133,44 @@ class DownloadStationAPI(TorrentClient):
return self._check_response()
def _get_api_info(self, method):
json_data = {}
params = {
'api': 'SYNO.API.Info',
'version': 1,
'method': 'query',
'query': method
}
resp = self.session.get(self.urls['query'], params=params, verify=bool(sickrage.app.config.torrent.verify_cert))
if not resp:
return json_data
try:
json_resp = resp.json()
except (ValueError, AttributeError):
return json_data
else:
success = json_resp.get('success')
if success:
json_data = json_resp.get('data')
return json_data.get(method, {})
def _add_torrent_uri(self, result):
data = self.post_task
api_info = self._get_api_info(data['api'])
data['version'] = api_info.get('maxVersion')
data['uri'] = result.url
return self._send_dsm_request(method='post', data=data)
def _add_torrent_file(self, result):
data = self.post_task
api_info = self._get_api_info(data['api'])
data['version'] = api_info.get('maxVersion')
files = {'file': ('{}.torrent'.format(result.name), result.content)}
return self._send_dsm_request(method='post', data=data, files=files)
......@@ -158,11 +187,13 @@ class DownloadStationAPI(TorrentClient):
params = {
'api': 'SYNO.DownloadStation.Info',
'version': 2,
'method': 'getinfo',
'session': 'DownloadStation',
}
api_info = self._get_api_info(params['api'])
params['version'] = api_info.get('maxVersion')
self.response = self.session.get(self.urls['info'], params=params, verify=False, timeout=120)
if not self.response or not self.response.content:
self.session.cookies.clear()
......@@ -185,8 +216,7 @@ class DownloadStationAPI(TorrentClient):
# lets make sure the default is relative,
# or forcefully set the location setting
params.update({
'method': 'getconfig',
'version': 2,
'method': 'getconfig'
})
self.response = self.session.get(self.urls['info'], params=params, verify=False, timeout=120)
......@@ -219,4 +249,3 @@ class DownloadStationAPI(TorrentClient):
self._request(method=method, data=data, **kwargs)
return self._check_response()
......@@ -45,7 +45,7 @@ from sentry_sdk.integrations.logging import LoggingIntegration, ignore_logger
from tornado.ioloop import IOLoop, PeriodicCallback
import sickrage
from sickrage.core.amqp import AMQPClient
from sickrage.core.amqp.consumer import AMQPConsumer
from sickrage.core.announcements import Announcements
from sickrage.core.api import API
from sickrage.core.auth import AuthServer
......@@ -80,6 +80,7 @@ from sickrage.core.updaters.tz_updater import TimeZoneUpdater
from sickrage.core.upnp import UPNPClient
from sickrage.core.version_updater import VersionUpdater, SourceUpdateManager
from sickrage.core.webserver import WebServer
from sickrage.core.websocket import check_web_socket_queue
from sickrage.metadata_providers import MetadataProviders
from sickrage.notification_providers import NotificationProviders
from sickrage.search_providers import SearchProviders
......@@ -230,7 +231,7 @@ class Core(object):
self.auth_server = None
self.announcements = None
self.api = None
self.amqp_client = None
self.amqp_consumer = None
def start(self):
self.started = True
......@@ -272,7 +273,7 @@ class Core(object):
self.auto_postprocessor = AutoPostProcessor()
self.upnp_client = UPNPClient()
self.announcements = Announcements()
self.amqp_client = AMQPClient()
self.amqp_consumer = AMQPConsumer()
# authorization sso client
self.auth_server = AuthServer()
......@@ -559,6 +560,9 @@ class Core(object):
# launch browser
IOLoop.current().add_callback(self.launch_browser)
# watch websocket message queue
PeriodicCallback(check_web_socket_queue, 100).start()
# perform server checkups every hour
PeriodicCallback(self.server_checkup, 1 * 60 * 60 * 1000).start()
......@@ -606,6 +610,7 @@ class Core(object):
'enzyme.parsers.ebml.core',
'subliminal.core',
'subliminal.utils',
'subliminal.refiners.tvdb',
'subliminal.refiners.metadata',
'subliminal.providers.tvsubtitles',
'pika.connection',
......@@ -711,7 +716,7 @@ class Core(object):
self.postprocessor_queue.shutdown()
# stop amqp consumer
self.amqp_client.stop()
self.amqp_consumer.stop()
# log out of ADBA
if self.adba_connection:
......
......@@ -22,21 +22,15 @@ import ssl
from ssl import SSLCertVerificationError
import pika
from google.protobuf.json_format import MessageToDict
from pika.adapters.tornado_connection import TornadoConnection
from pika.adapters.utils.connection_workflow import AMQPConnectorException
from pika.exceptions import StreamLostError, AMQPConnectionError
from tornado.ioloop import IOLoop
import sickrage
from sickrage.protos.announcement_v1_pb2 import CreatedAnnouncementResponse, DeletedAnnouncementResponse
from sickrage.protos.network_timezone_v1_pb2 import SavedNetworkTimezoneResponse, DeletedNetworkTimezoneResponse
from sickrage.protos.search_provider_url_v1_pb2 import SavedSearchProviderUrlResponse
from sickrage.protos.server_certificate_v1_pb2 import SavedServerCertificateResponse
from sickrage.protos.updates_v1_pb2 import UpdatedAppResponse
class AMQPClient(object):
class AMQPBase(object):
def __init__(self):
self._name = 'AMQP'
self._amqp_host = 'rmq.sickrage.ca'
......@@ -50,39 +44,6 @@ class AMQPClient(object):
IOLoop.current().add_callback(self.connect)
@property
def events(self):
return {
'server_ssl_certificate.saved': {
'event_type': SavedServerCertificateResponse(),
'event_cmd': sickrage.app.wserver.load_ssl_certificate,
},
'network_timezone.saved': {
'event_type': SavedNetworkTimezoneResponse(),
'event_cmd': sickrage.app.tz_updater.update_network_timezone,
},
'network_timezone.deleted': {
'event_type': DeletedNetworkTimezoneResponse(),
'event_cmd': sickrage.app.tz_updater.delete_network_timezone,
},
'search_provider_url.saved': {
'event_type': SavedSearchProviderUrlResponse(),
'event_cmd': sickrage.app.search_providers.update_url,
},
'app.updated': {
'event_type': UpdatedAppResponse(),
'event_cmd': sickrage.app.version_updater.task,
},
'announcement.created': {
'event_type': CreatedAnnouncementResponse(),
'event_cmd': sickrage.app.announcements.add,
},
'announcement.deleted': {
'event_type': DeletedAnnouncementResponse(),
'event_cmd': sickrage.app.announcements.clear,
},
}
def connect(self):
if not sickrage.app.api.token or not sickrage.app.config.general.server_id:
IOLoop.current().call_later(5, self.reconnect)
......@@ -155,41 +116,6 @@ class AMQPClient(object):
def on_channel_open(self, channel):
self._channel = channel
self._channel.basic_qos(callback=self.on_qos_applied, prefetch_count=self._prefetch_count)
def on_qos_applied(self, method):
self.start_consuming()
def on_message(self, unused_channel, basic_deliver, properties, body):
try:
if basic_deliver.exchange in self.events:
event = self.events[basic_deliver.exchange]
message = event['event_type']
message.ParseFromString(body)
message_kwargs = MessageToDict(message, including_default_value_fields=True, preserving_proto_field_name=True)
sickrage.app.log.debug(
f"Received AMQP response: {basic_deliver.exchange} :: {message_kwargs!r}"
)
IOLoop.current().spawn_callback(event['event_cmd'], **message_kwargs)
except Exception as e:
sickrage.app.log.debug(f"AMQP exchange: {basic_deliver.exchange} message caused an exception: {e!r}")
finally:
self._channel.basic_ack(basic_deliver.delivery_tag)
def start_consuming(self):
sickrage.app.log.info('Connected to SiCKRAGE AMQP server')
try:
self._consumer_tag = self._channel.basic_consume(
on_message_callback=self.on_message,
queue=f'{sickrage.app.config.user.sub_id}.{sickrage.app.config.general.server_id}',
)
except Exception as e:
sickrage.app.log.debug(f'Exception happened during consuming AMQP messages: {e!r}')
IOLoop.current().call_later(5, self.reconnect)
def stop(self):
self._closing = True
......
# ##############################################################################
# Author: echel0n <[email protected]>
# URL: https://sickrage.ca/
# Git: https://git.sickrage.ca/SiCKRAGE/sickrage.git
# -
# This file is part of SiCKRAGE.
# -
# SiCKRAGE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# -
# SiCKRAGE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# -
# You should have received a copy of the GNU General Public License
# along with SiCKRAGE. If not, see <http://www.gnu.org/licenses/>.
# ##############################################################################
from google.protobuf.json_format import MessageToDict
from tornado.ioloop import IOLoop
import sickrage
from sickrage.core.amqp import AMQPBase
from sickrage.protos.announcement_v1_pb2 import CreatedAnnouncementResponse, DeletedAnnouncementResponse
from sickrage.protos.network_timezone_v1_pb2 import SavedNetworkTimezoneResponse, DeletedNetworkTimezoneResponse
from sickrage.protos.search_provider_url_v1_pb2 import SavedSearchProviderUrlResponse
from sickrage.protos.server_certificate_v1_pb2 import SavedServerCertificateResponse
from sickrage.protos.updates_v1_pb2 import UpdatedAppResponse
class AMQPConsumer(AMQPBase):
def __init__(self):
super(AMQPConsumer, self).__init__()
@property
def events(self):
return {
'server_ssl_certificate.saved': {
'event_msg': SavedServerCertificateResponse(),
'event_cmd': sickrage.app.wserver.load_ssl_certificate,
},
'network_timezone.saved': {
'event_msg': SavedNetworkTimezoneResponse(),
'event_cmd': sickrage.app.tz_updater.update_network_timezone,
},
'network_timezone.deleted': {
'event_msg': DeletedNetworkTimezoneResponse(),
'event_cmd': sickrage.app.tz_updater.delete_network_timezone,
},
'search_provider_url.saved': {
'event_msg': SavedSearchProviderUrlResponse(),
'event_cmd': sickrage.app.search_providers.update_url,
},
'app.updated': {
'event_msg': UpdatedAppResponse(),
'event_cmd': sickrage.app.version_updater.task,
},
'announcement.created': {
'event_msg': CreatedAnnouncementResponse(),
'event_cmd': sickrage.app.announcements.add,
},
'announcement.deleted': {
'event_msg': DeletedAnnouncementResponse(),
'event_cmd': sickrage.app.announcements.clear,
},
}
def on_channel_open(self, channel):
self._channel = channel
self._channel.basic_qos(callback=self.on_qos_applied, prefetch_count=self._prefetch_count)
def on_qos_applied(self, method):
self.start_consuming()
def on_message(self, unused_channel, basic_deliver, properties, body):
try:
if basic_deliver.exchange in self.events:
event = self.events[basic_deliver.exchange]
message = event['event_msg']
message.ParseFromString(body)
message_kwargs = MessageToDict(message, including_default_value_fields=True, preserving_proto_field_name=True)
sickrage.app.log.debug(
f"Received AMQP event: {basic_deliver.exchange} :: {message_kwargs!r}"
)
IOLoop.current().spawn_callback(event['event_cmd'], **message_kwargs)
except Exception as e:
sickrage.app.log.debug(f"AMQP exchange: {basic_deliver.exchange} message caused an exception: {e!r}")
finally:
self._channel.basic_ack(basic_deliver.delivery_tag)
def start_consuming(self):
sickrage.app.log.info('Connected to SiCKRAGE AMQP server')
try:
self._consumer_tag = self._channel.basic_consume(
on_message_callback=self.on_message,
queue=f'{sickrage.app.config.user.sub_id}.{sickrage.app.config.general.server_id}',
)
except Exception as e:
sickrage.app.log.debug(f'Exception happened during consuming AMQP messages: {e!r}')
IOLoop.current().call_later(5, self.reconnect)
......@@ -415,7 +415,7 @@ class Config(object):
sickrage.app.search_providers[search_provider.provider_type.name][search_provider.provider_id] = NewznabProvider(**{
'name': search_provider.name,
'url': search_provider.url,
'key': search_provider.key,
'api_key': search_provider.api_key,
'catIDs': search_provider.cat_ids
})
......@@ -437,8 +437,8 @@ class Config(object):
if search_provider.provider_type in [SearchProviderType.TORRENT, SearchProviderType.TORRENT_RSS]:
sickrage.app.search_providers.all()[search_provider.provider_id].ratio = search_provider.ratio
elif search_provider.provider_type in [SearchProviderType.NZB, SearchProviderType.NEWZNAB]:
sickrage.app.search_providers.all()[search_provider.provider_id].api_key = search_provider.api_key
sickrage.app.search_providers.all()[search_provider.provider_id].username = search_provider.username
sickrage.app.search_providers.all()[search_provider.provider_id].api_key = search_provider.api_key
sickrage.app.search_providers.all()[search_provider.provider_id].search_mode = search_provider.search_mode
sickrage.app.search_providers.all()[search_provider.provider_id].search_separator = search_provider.search_separator
......@@ -546,14 +546,13 @@ class Config(object):
search_provider.name = sickrage.app.search_providers.all()[search_provider.provider_id].name
search_provider.url = sickrage.app.search_providers.all()[search_provider.provider_id].urls['base_url']
search_provider.key = sickrage.app.search_providers.all()[search_provider.provider_id].key
search_provider.api_key = sickrage.app.search_providers.all()[search_provider.provider_id].api_key
search_provider.cat_ids = sickrage.app.search_providers.all()[search_provider.provider_id].catIDs
if search_provider:
if search_provider.provider_type in [SearchProviderType.TORRENT, SearchProviderType.TORRENT_RSS]:
search_provider.ratio = sickrage.app.search_providers.all()[search_provider.provider_id].ratio
elif search_provider.provider_type in [SearchProviderType.NZB, SearchProviderType.NEWZNAB]:
search_provider.api_key = sickrage.app.search_providers.all()[search_provider.provider_id].api_key
search_provider.username = sickrage.app.search_providers.all()[search_provider.provider_id].username
search_provider.search_mode = sickrage.app.search_providers.all()[search_provider.provider_id].search_mode
......@@ -1333,6 +1332,7 @@ class Config(object):
elif provider_obj.provider_type in [SearchProviderType.NZB, SearchProviderType.NEWZNAB]:
provider_obj.username = auto_type(provider_settings.get('username', ''))
provider_obj.api_key = auto_type(provider_settings.get('api_key', ''))
provider_obj.api_key = auto_type(provider_settings.get('key', provider_obj.api_key))
custom_settings = {
'minseed': auto_type(provider_settings.get('minseed', 0)),
......
......@@ -708,7 +708,6 @@ class ConfigDB(SRDatabase):
provider_type = Column(Enum(SearchProviderType), default=SearchProviderType.NEWZNAB)
name = Column(Text, default='')
url = Column(Text, default='')
key = Column(Text, default='')
cat_ids = Column(Text, default='')
api_key = Column(CustomStringEncryptedType(Text, key=encryption_key), default='')
username = Column(Text, default='')
......
"""Initial migration
Revision ID: 3
Revises:
Create Date: 2017-12-29 14:39:27.854291
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '3'
down_revision = '2'
def upgrade():
conn = op.get_bind()
meta = sa.MetaData(bind=conn)
search_providers_newznab = sa.Table('search_providers_newznab', meta, autoload=True)
with op.get_context().begin_transaction():
for row in conn.execute(search_providers_newznab.select()):
if row.key:
conn.execute(f'UPDATE search_providers_newznab SET api_key = "{row.key}" WHERE search_providers_newznab.id = {row.id}')
with op.batch_alter_table('search_providers_newznab') as batch_op:
batch_op.drop_column('key')
def downgrade():
pass
......@@ -83,7 +83,7 @@ def snatch_episode(result, end_status=EpisodeStatus.SNATCHED):
elif sickrage.app.config.general.nzb_method == NzbMethod.SABNZBD:
dlResult = SabNZBd.sendNZB(result)
elif sickrage.app.config.general.nzb_method == NzbMethod.NZBGET:
is_proper = True if end_status == EpisodeStatus.NATCHED_PROPER else False
is_proper = True if end_status == EpisodeStatus.SNATCHED_PROPER else False
dlResult = NZBGet.sendNZB(result, is_proper)
elif sickrage.app.config.general.nzb_method == NzbMethod.DOWNLOAD_STATION:
client = get_client_instance(sickrage.app.config.general.nzb_method.value, client_type='nzb')()
......
......@@ -139,9 +139,6 @@ class WebServer(object):
def start(self):
self.started = True
# watch websocket message queue
PeriodicCallback(self.check_web_socket_queue, 100).start()
# load languages
tornado.locale.load_gettext_translations(sickrage.LOCALE_DIR, 'messages')
......@@ -496,11 +493,6 @@ class WebServer(object):
sickrage.app.log.warning(e.strerror)
raise SystemExit
def check_web_socket_queue(self):
if not WebSocketUIHandler.message_queue.empty():
message = WebSocketUIHandler.message_queue.get()
WebSocketUIHandler.broadcast(message)
def load_ssl_certificate(self, certificate=None, private_key=None):
if certificate and private_key:
with open(sickrage.app.https_cert_file, 'w') as cert_out:
......
......@@ -136,7 +136,6 @@ class SaveProvidersHandler(BaseHandler):
elif provider_obj.provider_type in [SearchProviderType.NZB, SearchProviderType.NEWZNAB]:
provider_obj.username = self.get_argument(providerID + '_username', '').strip()
provider_obj.api_key = self.get_argument(providerID + '_api_key', '').strip()
provider_obj.key = self.get_argument(providerID + '_key', '').strip()
custom_settings = {
'minseed': int(self.get_argument(providerID + '_minseed', None) or 0),
......
......@@ -37,7 +37,7 @@
'|'.join([providerID,
providerObj.name,
providerObj.urls["base_url"],
str(providerObj.key),
str(providerObj.api_key),
providerObj.catIDs,
("false", "true")[bool(providerObj.default)],
("false", "true")[bool(sickrage.app.config.general.use_nzbs)]]))
......@@ -189,10 +189,10 @@
<div class="input-group-prepend">
<span class="input-group-text"><span class="fas fa-cloud"></span></span>
</div>
<input id="${providerID}_key"
name="${providerID}_key"
value="${providerObj.key}"
newznab_name="${providerID}_key"
<input id="${providerID}_api_key"
name="${providerID}_api_key"
value="${providerObj.api_key}"
newznab_name="${providerID}_api_key"
class="newznab_key form-control"
title="Provider API key"
autocapitalize="off"/>
......
......@@ -7,6 +7,12 @@ from tornado.websocket import WebSocketHandler
import sickrage
def check_web_socket_queue():
if not WebSocketUIHandler.message_queue.empty():
message = WebSocketUIHandler.message_queue.get()
WebSocketUIHandler.broadcast(message)
class WebSocketUIHandler(WebSocketHandler):
"""WebSocket handler to send and receive data to and from a web client."""
......
......@@ -1119,11 +1119,11 @@ class TorrentRssProvider(TorrentProvider):
class NewznabProvider(NZBProvider):
provider_type = SearchProviderType.NEWZNAB
def __init__(self, name, url, key='0', catIDs='5030,5040', search_mode='eponly', search_fallback=False,
def __init__(self, name, url, api_key='0', catIDs='5030,5040', search_mode='eponly', search_fallback=False,
enable_daily=False, enable_backlog=False, default=False):
super(NewznabProvider, self).__init__(name, clean_url(url), bool(key != '0'))
super(NewznabProvider, self).__init__(name, clean_url(url), bool(api_key != '0'))
self.key = key
self.api_key = api_key
self.search_mode = search_mode
self.search_fallback = search_fallback
......@@ -1171,8 +1171,8 @@ class NewznabProvider(NZBProvider):
return False, return_categories, 'Provider requires auth and your key is not set'
url_params = {'t': 'caps'}
if self.private and self.key:
url_params['apikey'] = self.key
if self.private and self.api_key:
url_params['apikey'] = self.api_key
try:
response = self.session.get(urljoin(self.urls['base_url'], 'api'), params=url_params).text
......@@ -1204,8 +1204,8 @@ class NewznabProvider(NZBProvider):
return self.search({'q': search_string})
def _check_auth(self):
if self.private and not self.key:
sickrage.app.log.warning('Invalid api key for {}. Check your settings'.format(self.name))
if self.private and not self.api_key:
sickrage.app.log.warning('Missing API key for {}. Check your settings'.format(self.name))
return False
return True
......@@ -1261,8 +1261,8 @@ class NewznabProvider(NZBProvider):
'maxage': sickrage.app.config.general.usenet_retention
}
if self.private and self.key:
search_params['apikey'] = self.key
if self.private and self.api_key: