import random import threading import unittest import message_queue class MessageQueueTest(unittest.TestCase): def test_message_queue(self): """Test basic functionality""" port = random.randint(10000, 65535) server_queue = message_queue.MessageQueue( host="localhost", port=port, side=message_queue.Side.SERVER ) client_queue = message_queue.MessageQueue( host="localhost", port=port, side=message_queue.Side.CLIENT ) # Worker threads + main thread self.assertEqual(threading.active_count(), 3) # Test sending server -> client expected_message = {"hello": "client", "from": "server"} server_queue.add(expected_message) received_message = next(client_queue) self.assertEqual(expected_message, received_message) # Test sending client-> server expected_message = {"hello": "server", "from": "client"} client_queue.add(expected_message) received_message = next(server_queue) self.assertEqual(expected_message, received_message) server_queue.close() client_queue.close() self.assertEqual(threading.active_count(), 1) def test_disconnect_reconnect_client(self): """Test that the server can send messages while the client is offline""" port = random.randint(10000, 65535) server_queue = message_queue.MessageQueue( host="localhost", port=port, side=message_queue.Side.SERVER ) client_queue = message_queue.MessageQueue( host="localhost", port=port, side=message_queue.Side.CLIENT ) # Worker threads + main thread self.assertEqual(threading.active_count(), 3) expected_message = {"index": "0"} server_queue.add(expected_message) received_message = next(client_queue) self.assertEqual(expected_message, received_message) expected_message = {"index": "1"} client_queue.close() server_queue.add(expected_message) self.assertEqual(threading.active_count(), 2) client_queue = message_queue.MessageQueue( host="localhost", port=port, side=message_queue.Side.CLIENT ) received_message = next(client_queue) self.assertEqual(expected_message, received_message) server_queue.close() client_queue.close() self.assertEqual(threading.active_count(), 1) def test_disconnect_reconnect_server(self): """Test that the client can send messages while the server is offline""" port = random.randint(10000, 65535) server_queue = message_queue.MessageQueue( host="localhost", port=port, side=message_queue.Side.SERVER ) client_queue = message_queue.MessageQueue( host="localhost", port=port, side=message_queue.Side.CLIENT ) # Worker threads + main thread self.assertEqual(threading.active_count(), 3) expected_message = {"index": "0"} client_queue.add(expected_message) received_message = next(server_queue) self.assertEqual(expected_message, received_message) expected_message = {"index": "1"} server_queue.close() client_queue.add(expected_message) self.assertEqual(threading.active_count(), 2) server_queue = message_queue.MessageQueue( host="localhost", port=port, side=message_queue.Side.SERVER ) received_message = next(server_queue) self.assertEqual(expected_message, received_message) server_queue.close() client_queue.close() self.assertEqual(threading.active_count(), 1) if __name__ == "__main__": unittest.main(verbosity=2)