__init__.py 4.71 KB
Newer Older
echel0n's avatar
echel0n committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# ##############################################################################
#  Author: echel0n <[email protected]>
#  URL: https://sickrage.ca/
#  Git: https://git.sickrage.ca/SiCKRAGE/sickrage.git
#  -
#  This file is part of SiCKRAGE.
#  -
#  SiCKRAGE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#  -
#  SiCKRAGE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#  -
#  You should have received a copy of the GNU General Public License
#  along with SiCKRAGE.  If not, see <http://www.gnu.org/licenses/>.
# ##############################################################################
echel0n's avatar
echel0n committed
21
import ssl
22
from ssl import SSLCertVerificationError
echel0n's avatar
echel0n committed
23
24
25

import pika
from pika.adapters.tornado_connection import TornadoConnection
26
27
from pika.adapters.utils.connection_workflow import AMQPConnectorException
from pika.exceptions import StreamLostError, AMQPConnectionError
echel0n's avatar
echel0n committed
28
29
30
31
32
from tornado.ioloop import IOLoop

import sickrage


33
class AMQPBase(object):
    """Base class that keeps an AMQP connection alive on the Tornado IOLoop.

    Creating an instance schedules :meth:`connect` on the current IOLoop.
    Whenever the connection cannot be established or is lost, a reconnect is
    scheduled after ``_reconnect_delay`` seconds, until :meth:`stop` is called.
    """

    # Seconds to wait before retrying after a failed or lost connection.
    _reconnect_delay = 5

    # Connection that was closed on purpose by disconnect(); its close
    # callback must not trigger yet another reconnect.
    _discarded_connection = None

    def __init__(self):
        self._name = 'AMQP'
        self._amqp_host = 'rmq.sickrage.ca'
        self._amqp_port = 5671
        self._amqp_vhost = 'sickrage-app'
        self._connection = None
        self._channel = None
        self._closing = False
        self._consumer_tag = None
        self._prefetch_count = 100

        IOLoop.current().add_callback(self.connect)

    def _schedule_reconnect(self):
        """Schedule :meth:`reconnect` on the IOLoop after the retry delay."""
        IOLoop.current().call_later(self._reconnect_delay, self.reconnect)

    def connect(self):
        """Open a new AMQP connection, or schedule a retry if not possible yet.

        A retry is scheduled when the API token or server id is missing, when
        the token refresh fails, when the server queue cannot be declared, or
        when creating the connection raises a connection error.
        """
        # check for api token
        if not sickrage.app.api.token or not sickrage.app.config.general.server_id:
            self._schedule_reconnect()
            return

        # refresh api token once less than half of its lifetime remains
        if sickrage.app.api.token_time_remaining < (int(sickrage.app.api.token['expires_in']) / 2):
            if not sickrage.app.api.refresh_token():
                self._schedule_reconnect()
                return

        # declare server amqp queue
        if not sickrage.app.api.server.declare_amqp_queue(sickrage.app.config.general.server_id):
            self._schedule_reconnect()
            return

        # connect to amqp server
        try:
            credentials = pika.credentials.PlainCredentials(
                username='sickrage',
                password=sickrage.app.api.token["access_token"]
            )

            # NOTE(review): hostname and certificate verification are turned
            # off, so the access token above is sent to a peer whose identity
            # is not checked. Left unchanged because the broker's certificate
            # setup is not visible from here -- confirm and enable verification
            # if the broker presents a valid certificate.
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE

            parameters = pika.ConnectionParameters(
                host=self._amqp_host,
                port=self._amqp_port,
                virtual_host=self._amqp_vhost,
                credentials=credentials,
                socket_timeout=300,
                ssl_options=pika.SSLOptions(context)
            )

            # the connection object is handed to on_connection_open once ready
            TornadoConnection(
                parameters,
                on_open_callback=self.on_connection_open,
                on_close_callback=self.on_connection_close,
                on_open_error_callback=self.on_connection_open_error
            )
        except (AMQPConnectorException, AMQPConnectionError, SSLCertVerificationError):
            sickrage.app.log.debug("AMQP connection error, attempting to reconnect")
            self._schedule_reconnect()

    def disconnect(self):
        """Close the channel and connection (best effort) and forget both."""
        channel = self._channel
        if channel and not channel.is_closed:
            try:
                channel.close()
            except (StreamLostError, pika.exceptions.ChannelWrongStateError):
                # transport already gone, or the channel is already closing
                pass

        connection = self._connection
        if connection and not connection.is_closed:
            # remember it so on_connection_close can tell this deliberate
            # close apart from an unexpected connection loss
            self._discarded_connection = connection
            try:
                connection.close()
            except (StreamLostError, pika.exceptions.ConnectionWrongStateError):
                # transport already gone, or the connection is already closing
                pass

        self._channel = None
        self._connection = None

    def on_connection_close(self, connection, reason):
        """Connection close callback: reconnect unless the close was intended.

        :param connection: the connection that was closed
        :param reason: close reason supplied by pika (unused)
        """
        if self._closing:
            return

        if connection is self._discarded_connection:
            # closed by disconnect(); whoever called it (e.g. reconnect) is
            # already taking care of what happens next
            self._discarded_connection = None
            return

        sickrage.app.log.debug("AMQP connection closed, attempting to reconnect")
        self._schedule_reconnect()

    def on_connection_open(self, connection):
        """Connection open callback: adopt the connection and open a channel.

        :param connection: the newly opened connection
        """
        if self._closing:
            # stop() was called while this connection was still opening;
            # close it instead of adopting it
            try:
                connection.close()
            except (StreamLostError, pika.exceptions.ConnectionWrongStateError):
                pass
            return

        self._connection = connection
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_open_error(self, connection, reason):
        """Connection open-error callback: log and schedule a reconnect.

        :param connection: the connection that failed to open (unused)
        :param reason: failure reason supplied by pika (unused)
        """
        sickrage.app.log.debug("AMQP connection open failed, attempting to reconnect")
        self._schedule_reconnect()

    def reconnect(self):
        """Drop the current connection and connect again, unless stopping."""
        if not self._closing:
            self.disconnect()
            self.connect()

    def on_channel_open(self, channel):
        """Channel open callback: remember the opened channel.

        :param channel: the newly opened channel
        """
        self._channel = channel

    def stop(self):
        """Mark the instance as closing and disconnect; disables reconnects."""
        self._closing = True
        self.disconnect()