Commit dfac02df authored by echel0n's avatar echel0n
Browse files

Fixed issue `There is no current event loop in thread` in websocket server

parent 7b4c3dd0
......@@ -80,6 +80,7 @@ from sickrage.core.updaters.tz_updater import TimeZoneUpdater
from sickrage.core.upnp import UPNPClient
from sickrage.core.version_updater import VersionUpdater, SourceUpdateManager
from sickrage.core.webserver import WebServer
from sickrage.core.websocket import check_web_socket_queue
from sickrage.metadata_providers import MetadataProviders
from sickrage.notification_providers import NotificationProviders
from sickrage.search_providers import SearchProviders
......@@ -566,6 +567,9 @@ class Core(object):
# shutdown trigger
PeriodicCallback(self.shutdown_trigger, 5 * 1000).start()
# watch websocket message queue
PeriodicCallback(check_web_socket_queue, 100).start()
# start ioloop
IOLoop.current().start()
......
......@@ -2,27 +2,32 @@ import json
from queue import Queue
from jose import JWTError, ExpiredSignatureError
from tornado.ioloop import IOLoop
from tornado.websocket import WebSocketHandler
import sickrage
clients = set()
message_queue = Queue()
def check_web_socket_queue():
if not WebSocketUIHandler.message_queue.empty():
message = WebSocketUIHandler.message_queue.get()
WebSocketUIHandler.broadcast(message)
class WebSocketUIHandler(WebSocketHandler):
"""WebSocket handler to send and receive data to and from a web client."""
clients = set()
message_queue = Queue()
def check_origin(self, origin):
"""Allow alternate origins."""
return True
def open(self, *args, **kwargs):
"""Client connected to the WebSocket."""
clients.add(self)
while not message_queue.empty():
self.write_message(message_queue.get())
WebSocketUIHandler.clients.add(self)
# while not WebSocketUIHandler.message_queue.empty():
# self.write_message(WebSocketUIHandler.message_queue.get())
def on_message(self, message):
"""Received a message from the client."""
......@@ -34,13 +39,13 @@ class WebSocketUIHandler(WebSocketHandler):
try:
decoded_token = sickrage.app.auth_server.decode_token(auth_token, certs)
if sickrage.app.config.user.sub_id != decoded_token.get('sub'):
clients.remove(self)
WebSocketUIHandler.clients.remove(self)
self.close(401, 'Not Authorized')
except ExpiredSignatureError:
clients.remove(self)
WebSocketUIHandler.clients.remove(self)
self.close(401, 'Token expired')
except JWTError as e:
clients.remove(self)
WebSocketUIHandler.clients.remove(self)
self.close(401, f'Improper JWT token supplied, {e!r}')
else:
sickrage.app.log.debug('WebSocket received message from {}: {}'.format(self.request.remote_ip, message))
......@@ -51,7 +56,12 @@ class WebSocketUIHandler(WebSocketHandler):
def on_close(self):
"""Client disconnected from the WebSocket."""
clients.remove(self)
WebSocketUIHandler.clients.remove(self)
@classmethod
def broadcast(cls, msg):
for client in cls.clients:
client.write_message(json.dumps(msg))
def __repr__(self):
"""Client representation."""
......@@ -84,11 +94,4 @@ class WebSocketMessage(object):
def push(self):
"""Push the message to all connected WebSocket clients."""
# message_queue.put(self.json())
for client in clients.copy():
try:
message = self.json()
# IOLoop.current().run_in_executor(None, client.write_message, message)
client.write_message(message)
except AssertionError:
continue
WebSocketUIHandler.message_queue.put(self.json())
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment