Commit bbaf09a5 authored by echel0n's avatar echel0n

Created a scheduler just for queues.

parent cd847bdd
......@@ -516,38 +516,10 @@ class Core(object):
id=self.announcements.name
)
# search queue
self.scheduler.add_job(
self.search_queue.run,
IntervalTrigger(
seconds=1,
timezone='utc'
),
name=self.search_queue.name,
id=self.search_queue.name
)
# show queue
self.scheduler.add_job(
self.show_queue.run,
IntervalTrigger(
seconds=1,
timezone='utc'
),
name=self.show_queue.name,
id=self.show_queue.name
)
# post-processor queue
self.scheduler.add_job(
self.postprocessor_queue.run,
IntervalTrigger(
seconds=1,
timezone='utc'
),
name=self.postprocessor_queue.name,
id=self.postprocessor_queue.name
)
# start queues
self.search_queue.start()
self.show_queue.start()
self.postprocessor_queue.start()
# start scheduler service
self.scheduler.start()
......
......@@ -25,6 +25,8 @@ import time
import traceback
import queue
from apscheduler.schedulers.tornado import TornadoScheduler
import sickrage
......@@ -44,6 +46,7 @@ class SRQueue(object):
def __init__(self, name="QUEUE"):
super(SRQueue, self).__init__()
self.name = name
self.scheduler = TornadoScheduler({'apscheduler.timezone': 'UTC'})
self.queue = queue.PriorityQueue()
self._result_queue = queue.Queue()
self._queue_items = []
......@@ -52,6 +55,9 @@ class SRQueue(object):
self.amActive = False
self.stop = False
def start(self):
self.scheduler.start()
def run(self):
"""
Process items in this queue
......@@ -61,7 +67,7 @@ class SRQueue(object):
if not (self.stop and self.queue.empty()):
if not self.is_paused and not len(self.processing) >= int(sickrage.app.config.max_queue_workers):
sickrage.app.scheduler.add_job(self.worker, args=(self.queue.get(),))
self.scheduler.add_job(self.worker, args=(self.queue.get(),))
# threading.Thread(target=self.worker, args=(self.queue.get(),)).start()
# sickrage.app.io_loop.run_in_executor(None, self.worker, self.get())
......
......@@ -25,6 +25,8 @@ import threading
import traceback
from time import sleep
from apscheduler.triggers.interval import IntervalTrigger
import sickrage
from sickrage.core.common import cpu_presets
from sickrage.core.process_tv import ProcessResult
......@@ -49,6 +51,16 @@ class PostProcessorQueue(SRQueue):
SRQueue.__init__(self, "POSTPROCESSORQUEUE")
self._output = []
self.scheduler.add_job(
self.run,
IntervalTrigger(
seconds=1,
timezone='utc'
),
name=self.name,
id=self.name
)
@property
def output(self):
return '\n'.join(self._output)
......@@ -97,7 +109,7 @@ class PostProcessorQueue(SRQueue):
return length
async def put(self, dirName, nzbName=None, process_method=None, force=False, is_priority=None, delete_on=False,
def put(self, dirName, nzbName=None, process_method=None, force=False, is_priority=None, delete_on=False,
failed=False, proc_type="auto", force_next=False, **kwargs):
"""
Adds an item to post-processing queue
......@@ -127,8 +139,7 @@ class PostProcessorQueue(SRQueue):
return self.output
if not delete_on:
delete_on = (False, (not sickrage.app.config.no_delete, True)[process_method == "move"])[
proc_type == "auto"]
delete_on = (False, (not sickrage.app.config.no_delete, True)[process_method == "move"])[proc_type == "auto"]
if self.find_in_queue(dirName, proc_type):
self.log("An item with directory {} is already being processed in the queue".format(dirName))
......@@ -138,7 +149,7 @@ class PostProcessorQueue(SRQueue):
PostProcessorItem(dirName, nzbName, process_method, force, is_priority, delete_on, failed, proc_type))
if force_next:
result = await self._result_queue.get()
result = self._result_queue.get()
return result
self.log("{} post-processing job for {} has been added to the queue".format(proc_type.title(), dirName))
......
......@@ -22,6 +22,8 @@
import traceback
from apscheduler.triggers.interval import IntervalTrigger
import sickrage
from sickrage.core.databases.main import MainDB
from sickrage.core.queues import SRQueue, SRQueueItem, SRQueuePriorities
......@@ -43,6 +45,16 @@ class SearchQueue(SRQueue):
self.MANUAL_SEARCH_HISTORY = []
self.MANUAL_SEARCH_HISTORY_SIZE = 100
self.scheduler.add_job(
self.run,
IntervalTrigger(
seconds=1,
timezone='utc'
),
name=self.name,
id=self.name
)
def fifo(self, my_list, item, max_size=100):
if len(my_list) >= max_size:
my_list.pop(0)
......
......@@ -25,6 +25,8 @@ import os
import time
import traceback
from apscheduler.triggers.interval import IntervalTrigger
import sickrage
from sickrage.core.common import WANTED
from sickrage.core.exceptions import CantRefreshShowException, CantRemoveShowException, CantUpdateShowException, EpisodeDeletedException, \
......@@ -42,6 +44,16 @@ class ShowQueue(SRQueue):
def __init__(self):
SRQueue.__init__(self, "SHOWQUEUE")
self.scheduler.add_job(
self.run,
IntervalTrigger(
seconds=1,
timezone='utc'
),
name=self.name,
id=self.name
)
@property
def loading_show_list(self):
return [x.indexer_id for x in self.queue_items if x.is_loading]
......
......@@ -46,7 +46,7 @@ class HomeProcessEpisodeHandler(BaseHandler, ABC):
return self.write("Please use our API instead for post-processing")
@authenticated
async def post(self, *args, **kwargs):
def post(self, *args, **kwargs):
pp_options = {
'proc_dir': self.get_argument('proc_dir'),
'nzbname': self.get_argument('nzbname', ''),
......@@ -66,7 +66,7 @@ class HomeProcessEpisodeHandler(BaseHandler, ABC):
if not proc_dir:
return self.redirect("/home/postprocess/")
result = await sickrage.app.postprocessor_queue.put(proc_dir, **pp_options)
result = sickrage.app.postprocessor_queue.put(proc_dir, **pp_options)
if quiet:
return self.write(result)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment