1
0
Fork 0
This commit is contained in:
khr 2020-04-01 23:33:49 -07:00
parent 42ae40c0cd
commit d362a6c922
4 changed files with 17 additions and 31 deletions

View File

@ -64,7 +64,7 @@ class SocketWrapper:
LOG.debug("Connecting to {}:{}".format(self.host, self.port)) LOG.debug("Connecting to {}:{}".format(self.host, self.port))
self.soc = _try_with_backoff( self.soc = _try_with_backoff(
lambda: socket.create_connection((self.host, self.port)), lambda: socket.create_connection((self.host, self.port)),
lambda e: e is OSError and e.errno == 111 lambda e: e is OSError and e.errno == 111,
) )
LOG.info("Socket Connected") LOG.info("Socket Connected")
@ -72,7 +72,7 @@ class SocketWrapper:
LOG.info("Server Binding to {}:{}".format(self.host, self.port)) LOG.info("Server Binding to {}:{}".format(self.host, self.port))
self.soc = _try_with_backoff( self.soc = _try_with_backoff(
lambda: socket_create_server((self.host, self.port)), lambda: socket_create_server((self.host, self.port)),
lambda e: e is OSError and e.errno == 98 lambda e: e is OSError and e.errno == 98,
) )
LOG.info("Server Bound") LOG.info("Server Bound")
@ -161,12 +161,13 @@ class MessageQueue:
self.closed = False self.closed = False
self.process_worker = threading.Thread( self.process_worker = threading.Thread(
target=( target=(
process_messages_client if side == Side.CLIENT process_messages_client
if side == Side.CLIENT
else process_messages_server else process_messages_server
), ),
args=(SocketWrapper(host, port), self), args=(SocketWrapper(host, port), self),
daemon=True, daemon=True,
name="MessageQueue/" + str(side) name="MessageQueue/" + str(side),
) )
self.process_worker.start() self.process_worker.start()

View File

@ -7,21 +7,16 @@ import message_queue
class MessageQueueTest(unittest.TestCase): class MessageQueueTest(unittest.TestCase):
def setUp(self): def setUp(self):
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
def test_message_queue(self): def test_message_queue(self):
port = random.randint(10000, 65535) port = random.randint(10000, 65535)
server_queue = message_queue.MessageQueue( server_queue = message_queue.MessageQueue(
host="localhost", host="localhost", port=port, side=message_queue.Side.SERVER
port=port,
side=message_queue.Side.SERVER
) )
client_queue = message_queue.MessageQueue( client_queue = message_queue.MessageQueue(
host="localhost", host="localhost", port=port, side=message_queue.Side.CLIENT
port=port,
side=message_queue.Side.CLIENT
) )
# Worker threads + main thread # Worker threads + main thread
self.assertEqual(threading.active_count(), 3) self.assertEqual(threading.active_count(), 3)

View File

@ -222,14 +222,10 @@ def main():
side=message_queue.Side.SERVER, side=message_queue.Side.SERVER,
) )
flask_thread = threading.Thread( flask_thread = threading.Thread(
target=app.run, target=app.run, kwargs={"port": args.matrix_api_port}, daemon=True,
kwargs={"port": args.matrix_api_port},
daemon=True,
) )
receive_worker = threading.Thread( receive_worker = threading.Thread(
target=receive_messages, target=receive_messages, args=(appservice, queue), daemon=True,
args=(appservice, queue),
daemon=True,
) )
flask_thread.start() flask_thread.start()
receive_worker.start() receive_worker.start()

View File

@ -42,15 +42,13 @@ def send_process_output(
LOG.info(line.rstrip("\n")) LOG.info(line.rstrip("\n"))
result = prog.search(line) result = prog.search(line)
if result: if result:
LOG.info("user: {} msg: {}".format( LOG.info(
result.group(3), "user: {} msg: {}".format(
result.group(4).rstrip("\n"), result.group(3), result.group(4).rstrip("\n"),
)) )
)
msg_queue.add( msg_queue.add(
{ {"user": result.group(3), "msg": result.group(4).rstrip("\n"),},
"user": result.group(3),
"msg": result.group(4).rstrip("\n"),
},
) )
@ -80,14 +78,10 @@ def main():
side=message_queue.Side.CLIENT, side=message_queue.Side.CLIENT,
) )
send_worker = threading.Thread( send_worker = threading.Thread(
target=send_process_output, target=send_process_output, args=(wrapper, queue), daemon=True,
args=(wrapper, queue),
daemon=True,
) )
receive_worker = threading.Thread( receive_worker = threading.Thread(
target=relay_queue_input, target=relay_queue_input, args=(wrapper, queue), daemon=True,
args=(wrapper, queue),
daemon=True,
) )
send_worker.start() send_worker.start()
receive_worker.start() receive_worker.start()