Commit fed64cbb authored by echel0n's avatar echel0n

Added back Tornado queue buckets, resolves application lockups

parent bbaf09a5
......@@ -19,13 +19,11 @@
import ctypes
import datetime
import multiprocessing
import threading
import time
import traceback
import queue
from apscheduler.schedulers.tornado import TornadoScheduler
from tornado.queues import Queue, PriorityQueue
import sickrage
......@@ -47,8 +45,8 @@ class SRQueue(object):
super(SRQueue, self).__init__()
self.name = name
self.scheduler = TornadoScheduler({'apscheduler.timezone': 'UTC'})
self.queue = queue.PriorityQueue()
self._result_queue = queue.Queue()
self.queue = PriorityQueue()
self._result_queue = Queue()
self._queue_items = []
self.processing = []
self.min_priority = SRQueuePriorities.EXTREME
......@@ -58,16 +56,16 @@ class SRQueue(object):
def start(self):
self.scheduler.start()
def run(self):
async def run(self):
"""
Process items in this queue
"""
self.amActive = True
if not (self.stop and self.queue.empty()):
if not self.stop and not self.queue.empty():
if not self.is_paused and not len(self.processing) >= int(sickrage.app.config.max_queue_workers):
self.scheduler.add_job(self.worker, args=(self.queue.get(),))
self.scheduler.add_job(self.worker, args=(await self.queue.get(),))
# threading.Thread(target=self.worker, args=(self.queue.get(),)).start()
# sickrage.app.io_loop.run_in_executor(None, self.worker, self.get())
......@@ -90,10 +88,10 @@ class SRQueue(object):
self.processing.remove(item)
self.queue.task_done()
def get(self):
return self.queue.get()
async def get(self):
return await self.queue.get()
def put(self, item, *args, **kwargs):
async def put(self, item, *args, **kwargs):
"""
Adds an item to this queue
......@@ -107,7 +105,7 @@ class SRQueue(object):
item.name = "{}-{}".format(self.name, item.name)
item.result_queue = self._result_queue
self._queue_items.append(item)
self.queue.put(item)
await self.queue.put(item)
return item
......
......@@ -109,7 +109,7 @@ class PostProcessorQueue(SRQueue):
return length
def put(self, dirName, nzbName=None, process_method=None, force=False, is_priority=None, delete_on=False,
async def put(self, dirName, nzbName=None, process_method=None, force=False, is_priority=None, delete_on=False,
failed=False, proc_type="auto", force_next=False, **kwargs):
"""
Adds an item to post-processing queue
......@@ -149,7 +149,7 @@ class PostProcessorQueue(SRQueue):
PostProcessorItem(dirName, nzbName, process_method, force, is_priority, delete_on, failed, proc_type))
if force_next:
result = self._result_queue.get()
result = await self._result_queue.get()
return result
self.log("{} post-processing job for {} has been added to the queue".format(proc_type.title(), dirName))
......
......@@ -92,9 +92,9 @@ class ShowQueue(SRQueue):
raise CantUpdateShowException("{} is already being updated, can't update again until it's done.".format(show_obj.name))
if force:
self.put(QueueItemForceUpdate(indexer_id, indexer_update_only))
sickrage.app.io_loop.add_callback(self.put, QueueItemForceUpdate(indexer_id, indexer_update_only))
else:
self.put(QueueItemUpdate(indexer_id, indexer_update_only))
sickrage.app.io_loop.add_callback(self.put, QueueItemUpdate(indexer_id, indexer_update_only))
def refresh_show(self, indexer_id, force=False):
show_obj = find_show(indexer_id)
......@@ -108,13 +108,13 @@ class ShowQueue(SRQueue):
sickrage.app.log.debug("Queueing show refresh for {}".format(show_obj.name))
self.put(QueueItemRefresh(indexer_id, force=force))
sickrage.app.io_loop.add_callback(self.put, QueueItemRefresh(indexer_id, force=force))
def rename_show_episodes(self, indexer_id):
self.put(QueueItemRename(indexer_id))
sickrage.app.io_loop.add_callback(self.put, QueueItemRename(indexer_id))
def download_subtitles(self, indexer_id):
self.put(QueueItemSubtitle(indexer_id))
sickrage.app.io_loop.add_callback(self.put, QueueItemSubtitle(indexer_id))
def add_show(self, indexer, indexer_id, showDir, default_status=None, quality=None, flatten_folders=None,
lang=None, subtitles=None, sub_use_sr_metadata=None, anime=None, scene=None, paused=None,
......@@ -123,22 +123,22 @@ class ShowQueue(SRQueue):
if lang is None:
lang = sickrage.app.config.indexer_default_language
self.put(QueueItemAdd(indexer=indexer,
indexer_id=indexer_id,
showDir=showDir,
default_status=default_status,
quality=quality,
flatten_folders=flatten_folders,
lang=lang,
subtitles=subtitles,
sub_use_sr_metadata=sub_use_sr_metadata,
anime=anime,
scene=scene,
paused=paused,
blacklist=blacklist,
whitelist=whitelist,
default_status_after=default_status_after,
skip_downloaded=skip_downloaded))
sickrage.app.io_loop.add_callback(self.put, QueueItemAdd(indexer=indexer,
indexer_id=indexer_id,
showDir=showDir,
default_status=default_status,
quality=quality,
flatten_folders=flatten_folders,
lang=lang,
subtitles=subtitles,
sub_use_sr_metadata=sub_use_sr_metadata,
anime=anime,
scene=scene,
paused=paused,
blacklist=blacklist,
whitelist=whitelist,
default_status_after=default_status_after,
skip_downloaded=skip_downloaded))
def remove_show(self, indexer_id, full=False):
show_obj = find_show(indexer_id)
......@@ -152,7 +152,7 @@ class ShowQueue(SRQueue):
# remove other queued actions for this show.
[self.remove(x) for x in self.queue_items if indexer_id == x.indexer_id]
self.put(QueueItemRemove(indexer_id=indexer_id, full=full))
sickrage.app.io_loop.add_callback(self.put, QueueItemRemove(indexer_id=indexer_id, full=full))
class ShowQueueActions(object):
......
......@@ -46,7 +46,7 @@ class HomeProcessEpisodeHandler(BaseHandler, ABC):
return self.write("Please use our API instead for post-processing")
@authenticated
def post(self, *args, **kwargs):
async def post(self, *args, **kwargs):
pp_options = {
'proc_dir': self.get_argument('proc_dir'),
'nzbname': self.get_argument('nzbname', ''),
......@@ -66,8 +66,7 @@ class HomeProcessEpisodeHandler(BaseHandler, ABC):
if not proc_dir:
return self.redirect("/home/postprocess/")
result = sickrage.app.postprocessor_queue.put(proc_dir, **pp_options)
result = await sickrage.app.postprocessor_queue.put(proc_dir, **pp_options)
if quiet:
return self.write(result)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment