Commit e8a6b75a authored by echel0n's avatar echel0n

Refactored queue system class as threaded and to use APScheduler to execute jobs from queue.

parent cc4f0bd6
......@@ -36,14 +36,14 @@ twilio == 6.21.0
chardet == 3.0.4
pytz == 2018.7
tzlocal == 2.0.0b1
raven == 6.9.0
raven == 6.10.0
python-keycloak-client == 0.2.2
simplejson == 3.16.0
service_identity == 17.0.0
knowit == 0.2.4
sqlalchemy == 1.3.2
sqlalchemy-migrate == 0.12.0
mutagen == 1.42.0
sqlalchemy == 1.3.13
sqlalchemy-migrate == 0.13.0
mutagen == 1.44.0
deluge-client == 1.7.1
PyMySQL
certifi
......
......@@ -520,9 +520,9 @@ class Core(object):
self.scheduler.start()
# start queue's
self.io_loop.add_callback(self.search_queue.watch)
self.io_loop.add_callback(self.show_queue.watch)
self.io_loop.add_callback(self.postprocessor_queue.watch)
self.io_loop.add_callback(self.search_queue.start)
self.io_loop.add_callback(self.show_queue.start)
self.io_loop.add_callback(self.postprocessor_queue.start)
# fire off startup events
self.io_loop.run_in_executor(None, self.quicksearch_cache.run)
......
......@@ -16,13 +16,14 @@
#
# You should have received a copy of the GNU General Public License
# along with SiCKRAGE. If not, see <http://www.gnu.org/licenses/>.
import ctypes
import datetime
import multiprocessing
import threading
import time
import traceback
from tornado import gen
from tornado.queues import Queue, PriorityQueue
import queue
import sickrage
......@@ -39,18 +40,19 @@ class SRQueuePriorities(object):
PAUSED = 99
class SRQueue(object):
class SRQueue(threading.Thread):
def __init__(self, name="QUEUE"):
super(SRQueue, self).__init__()
self.name = name
self.queue = PriorityQueue()
self._result_queue = Queue()
self.queue = queue.PriorityQueue()
self._result_queue = queue.Queue()
self._queue_items = []
self.processing = []
self.min_priority = SRQueuePriorities.EXTREME
self.amActive = False
self.stop = False
async def watch(self):
def run(self):
"""
Process items in this queue
"""
......@@ -59,9 +61,11 @@ class SRQueue(object):
while not (self.stop and self.queue.empty()):
if not self.is_paused and not len(self.processing) >= int(sickrage.app.config.max_queue_workers):
sickrage.app.io_loop.run_in_executor(None, self.worker, await self.get())
sickrage.app.scheduler.add_job(self.worker, args=(self.queue.get(),))
# threading.Thread(target=self.worker, args=(self.queue.get(),)).start()
# sickrage.app.io_loop.run_in_executor(None, self.worker, self.get())
await gen.sleep(1)
time.sleep(1)
self.amActive = False
......@@ -78,13 +82,14 @@ class SRQueue(object):
except Exception:
sickrage.app.log.debug(traceback.format_exc())
finally:
self._queue_items.remove(item)
self.processing.remove(item)
self.queue.task_done()
async def get(self):
return await self.queue.get()
def get(self):
return self.queue.get()
async def put(self, item, *args, **kwargs):
def put(self, item, *args, **kwargs):
"""
Adds an item to this queue
......@@ -97,13 +102,14 @@ class SRQueue(object):
item.added = datetime.datetime.now()
item.name = "{}-{}".format(self.name, item.name)
item.result_queue = self._result_queue
await self.queue.put(item)
self._queue_items.append(item)
self.queue.put(item)
return item
@property
def queue_items(self):
return self.queue._queue + self.processing
return self._queue_items + self.processing
@property
def is_busy(self):
......@@ -124,9 +130,9 @@ class SRQueue(object):
self.min_priority = SRQueuePriorities.EXTREME
def remove(self, item):
if item in self.queue._queue:
self.queue._queue.remove(item)
elif item in self.processing:
if item in self._queue_items:
self._queue_items.remove(item)
if item in self.processing:
self.processing.remove(item)
def stop_item(self, item):
......
......@@ -27,7 +27,6 @@ import traceback
import sickrage
from sickrage.core.common import WANTED
from sickrage.core.databases.main import MainDB
from sickrage.core.exceptions import CantRefreshShowException, CantRemoveShowException, CantUpdateShowException, EpisodeDeletedException, \
MultipleShowObjectsException
from sickrage.core.queues import SRQueue, SRQueueItem, SRQueuePriorities
......@@ -81,9 +80,9 @@ class ShowQueue(SRQueue):
raise CantUpdateShowException("{} is already being updated, can't update again until it's done.".format(show_obj.name))
if force:
sickrage.app.io_loop.add_callback(self.put, QueueItemForceUpdate(indexer_id, indexer_update_only))
self.put(QueueItemForceUpdate(indexer_id, indexer_update_only))
else:
sickrage.app.io_loop.add_callback(self.put, QueueItemUpdate(indexer_id, indexer_update_only))
self.put(QueueItemUpdate(indexer_id, indexer_update_only))
def refresh_show(self, indexer_id, force=False):
show_obj = find_show(indexer_id)
......@@ -97,13 +96,13 @@ class ShowQueue(SRQueue):
sickrage.app.log.debug("Queueing show refresh for {}".format(show_obj.name))
sickrage.app.io_loop.add_callback(self.put, QueueItemRefresh(indexer_id, force=force))
self.put(QueueItemRefresh(indexer_id, force=force))
def rename_show_episodes(self, indexer_id):
sickrage.app.io_loop.add_callback(self.put, QueueItemRename(indexer_id))
self.put(QueueItemRename(indexer_id))
def download_subtitles(self, indexer_id):
sickrage.app.io_loop.add_callback(self.put, QueueItemSubtitle(indexer_id))
self.put(QueueItemSubtitle(indexer_id))
def add_show(self, indexer, indexer_id, showDir, default_status=None, quality=None, flatten_folders=None,
lang=None, subtitles=None, sub_use_sr_metadata=None, anime=None, scene=None, paused=None,
......@@ -112,22 +111,22 @@ class ShowQueue(SRQueue):
if lang is None:
lang = sickrage.app.config.indexer_default_language
sickrage.app.io_loop.add_callback(self.put, QueueItemAdd(indexer=indexer,
indexer_id=indexer_id,
showDir=showDir,
default_status=default_status,
quality=quality,
flatten_folders=flatten_folders,
lang=lang,
subtitles=subtitles,
sub_use_sr_metadata=sub_use_sr_metadata,
anime=anime,
scene=scene,
paused=paused,
blacklist=blacklist,
whitelist=whitelist,
default_status_after=default_status_after,
skip_downloaded=skip_downloaded))
self.put(QueueItemAdd(indexer=indexer,
indexer_id=indexer_id,
showDir=showDir,
default_status=default_status,
quality=quality,
flatten_folders=flatten_folders,
lang=lang,
subtitles=subtitles,
sub_use_sr_metadata=sub_use_sr_metadata,
anime=anime,
scene=scene,
paused=paused,
blacklist=blacklist,
whitelist=whitelist,
default_status_after=default_status_after,
skip_downloaded=skip_downloaded))
def remove_show(self, indexer_id, full=False):
show_obj = find_show(indexer_id)
......@@ -141,8 +140,7 @@ class ShowQueue(SRQueue):
# remove other queued actions for this show.
[self.remove(x) for x in self.queue_items if indexer_id == x.indexer_id]
sickrage.app.io_loop.add_callback(self.put, QueueItemRemove(indexer_id=indexer_id, full=full))
self.put(QueueItemRemove(indexer_id=indexer_id, full=full))
class ShowQueueActions(object):
......
......@@ -25,7 +25,6 @@ from abc import ABC
from collections import OrderedDict
from urllib.parse import unquote_plus, quote_plus
from sqlalchemy import orm
from tornado import gen
from tornado.escape import json_encode
from tornado.httputil import url_concat
......@@ -35,7 +34,6 @@ import sickrage
from sickrage.clients import get_client_instance
from sickrage.clients.nzb.sabnzbd import SabNZBd
from sickrage.core.common import Overview, Quality, cpu_presets, statusStrings
from sickrage.core.databases.main import MainDB
from sickrage.core.exceptions import (
AnidbAdbaConnectionException,
CantRefreshShowException,
......@@ -773,7 +771,6 @@ class DisplayShowHandler(BaseHandler, ABC):
submenu = []
session = sickrage.app.main_db.session()
show_obj = find_show(int(show))
if not show_obj:
return self._genericMessage(_("Error"), _("Show not in show list"))
......
......@@ -359,56 +359,56 @@
</div>
<div class="container-fluid">
% if current_user:
<footer class="text-center">
<div>
% if overall_stats:
<%
total_size = pretty_file_size(overall_stats['total_size'])
ep_downloaded = overall_stats['episodes']['downloaded']
ep_snatched = overall_stats['episodes']['snatched']
ep_total = overall_stats['episodes']['total']
ep_percentage = '' if ep_total == 0 else '(<span class="text-primary">%s%%</span>)' % re.sub(r'(\d+)(\.\d)\d+', r'\1\2', str((float(ep_downloaded)/float(ep_total))*100))
%>
<span class="text-primary">${overall_stats['shows']['total']}</span> ${_('Shows')}
(<span class="text-primary">${overall_stats['shows']['active']}</span> ${_('Active')})
| <span class="text-primary">${ep_downloaded}</span>
% if ep_snatched:
<span class="text-primary">
<a href="${srWebRoot}/manage/episodeStatuses?whichStatus=2">+${ep_snatched}</a>
</span>
${_('Snatched')}
% endif
/&nbsp;<span class="text-primary">${ep_total}</span> ${_('Episodes Downloaded')} ${ep_percentage}
/&nbsp;<span class="text-primary">${total_size}</span> ${_('Overall Downloaded')}
% endif
</div>
<div>
${_('Daily Search:')} <span
class="text-primary">${str(sickrage.app.scheduler.get_job('DAILYSEARCHER').next_run_time).split('.')[0]}</span>
|
${_('Backlog Search:')} <span
class="text-primary">${str(sickrage.app.scheduler.get_job('BACKLOG').next_run_time).split('.')[0]}</span>
|
${_('Memory used:')}
<span class="text-primary">
${memory_usage()}
</span> |
${_('Load time:')}
<span class="text-primary">
${"{:10.4f}".format(time() - srStartTime)}s
</span> / Mako:
<span class="text-primary">
${"{:10.4f}".format(time() - makoStartTime)}s
</span> |
${_('Now:')}
<span class="text-primary">
${str(datetime.datetime.now(sickrage.app.tz)).split('.')[0]}
</span>
</div>
</footer>
% endif
## % if current_user:
## <footer class="text-center">
## <div>
## % if overall_stats:
## <%
## total_size = pretty_file_size(overall_stats['total_size'])
## ep_downloaded = overall_stats['episodes']['downloaded']
## ep_snatched = overall_stats['episodes']['snatched']
## ep_total = overall_stats['episodes']['total']
## ep_percentage = '' if ep_total == 0 else '(<span class="text-primary">%s%%</span>)' % re.sub(r'(\d+)(\.\d)\d+', r'\1\2', str((float(ep_downloaded)/float(ep_total))*100))
## %>
## <span class="text-primary">${overall_stats['shows']['total']}</span> ${_('Shows')}
## (<span class="text-primary">${overall_stats['shows']['active']}</span> ${_('Active')})
## | <span class="text-primary">${ep_downloaded}</span>
## % if ep_snatched:
## <span class="text-primary">
## <a href="${srWebRoot}/manage/episodeStatuses?whichStatus=2">+${ep_snatched}</a>
## </span>
## ${_('Snatched')}
## % endif
## /&nbsp;<span class="text-primary">${ep_total}</span> ${_('Episodes Downloaded')} ${ep_percentage}
## /&nbsp;<span class="text-primary">${total_size}</span> ${_('Overall Downloaded')}
## % endif
## </div>
##
## <div>
## ${_('Daily Search:')} <span
## class="text-primary">${str(sickrage.app.scheduler.get_job('DAILYSEARCHER').next_run_time).split('.')[0]}</span>
## |
## ${_('Backlog Search:')} <span
## class="text-primary">${str(sickrage.app.scheduler.get_job('BACKLOG').next_run_time).split('.')[0]}</span>
## |
## ${_('Memory used:')}
## <span class="text-primary">
## ${memory_usage()}
## </span> |
## ${_('Load time:')}
## <span class="text-primary">
## ${"{:10.4f}".format(time() - srStartTime)}s
## </span> / Mako:
## <span class="text-primary">
## ${"{:10.4f}".format(time() - makoStartTime)}s
## </span> |
## ${_('Now:')}
## <span class="text-primary">
## ${str(datetime.datetime.now(sickrage.app.tz)).split('.')[0]}
## </span>
## </div>
## </footer>
## % endif
<script src="${srWebRoot}/js/core.min.js"></script>
<%block name="scripts" />
......
......@@ -104,12 +104,12 @@ class GenericProvider(object):
@property
def urls(self):
try:
resp = sickrage.app.api.provider.get_urls(self.id)
if resp and 'data' in resp:
return json.loads(resp['data']['urls'])
except (JSONDecodeError, APIError) as e:
pass
# try:
# resp = sickrage.app.api.provider.get_urls(self.id)
# if resp and 'data' in resp:
# return json.loads(resp['data']['urls'])
# except (JSONDecodeError, APIError) as e:
# pass
return self._urls
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment