1
0
Fork 0
matrix-appservice-minecraft/message_queue_test.py

48 lines
1.3 KiB
Python

import logging
import random
import threading
import unittest
import message_queue
class MessageQueueTest(unittest.TestCase):
    """Round-trip test for a server/client pair of ``MessageQueue`` objects."""

    def setUp(self):
        """Enable DEBUG-level logging for the duration of the tests."""
        logging.basicConfig(level=logging.DEBUG)

    def test_message_queue(self):
        """Exchange one message in each direction and verify thread cleanup.

        A SERVER-side and a CLIENT-side queue are created on the same
        randomly chosen localhost port. The test checks that a dict added
        on one side is yielded unchanged by ``next()`` on the other side,
        in both directions, and that closing both queues brings the number
        of live threads back to what it was before they were created.
        """
        # Compare thread counts against the threads that already exist
        # instead of assuming only the main thread is alive: test runners
        # and earlier tests may have started threads of their own.
        baseline_threads = threading.active_count()

        port = random.randint(10000, 65535)
        server_queue = message_queue.MessageQueue(
            host="localhost",
            port=port,
            side=message_queue.Side.SERVER,
        )
        client_queue = None
        try:
            client_queue = message_queue.MessageQueue(
                host="localhost",
                port=port,
                side=message_queue.Side.CLIENT,
            )

            # The pair is expected to add two worker threads in total.
            self.assertEqual(threading.active_count(), baseline_threads + 2)

            # Test sending server -> client
            expected_message = {"hello": "client", "from": "server"}
            server_queue.add(expected_message)
            received_message = next(client_queue)
            self.assertEqual(expected_message, received_message)

            # Test sending client -> server
            expected_message = {"hello": "server", "from": "client"}
            client_queue.add(expected_message)
            received_message = next(server_queue)
            self.assertEqual(expected_message, received_message)
        finally:
            # Always close both queues, even when an assertion above failed
            # or the client could not be constructed; otherwise their worker
            # threads would outlive the test. The server is closed first and
            # the client second.
            try:
                server_queue.close()
            finally:
                if client_queue is not None:
                    client_queue.close()

        # After closing, every worker thread must have terminated.
        self.assertEqual(threading.active_count(), baseline_threads)
# Run the test suite when this file is executed directly as a script;
# importing the module (e.g. through a test runner) does not trigger it.
if __name__ == "__main__":
    unittest.main()