From 3d0316216948e9f114cbf1942b9ffff9a8b7e142 Mon Sep 17 00:00:00 2001 From: chr Date: Thu, 2 Apr 2020 21:52:36 -0700 Subject: [PATCH] verified working locally --- message_queue.py | 8 +++++--- service.py | 1 + wrapper.py | 34 ++++++++++++++++++++++------------ 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/message_queue.py b/message_queue.py index ae6dd17..acb91c8 100644 --- a/message_queue.py +++ b/message_queue.py @@ -20,7 +20,7 @@ class ProtocolError(Exception): T = TypeVar("T") -def _try_with_backoff(fn: Callable, error_callback: Callable) -> socket.socket: +def _try_with_backoff(fn: Callable[[], T], error_callback: Callable) -> T: backoff = 1 while True: try: @@ -109,7 +109,7 @@ class SocketWrapper: self._write(payload) LOG.debug("sent {} bytes".format(len(payload))) - def _read(self, size) -> Optional[bytes]: + def _read(self, size) -> bytes: data = b"" while len(data) != size: newdata = self.soc.recv(size - len(data)) @@ -176,7 +176,9 @@ class MessageQueue: return self def __next__(self): - return json.loads(self.inbox.get()) + m = self.inbox.get() + LOG.debug(m) + return json.loads(m) def close(self): self.closed = True diff --git a/service.py b/service.py index e08fd2c..6aefbab 100644 --- a/service.py +++ b/service.py @@ -214,6 +214,7 @@ def receive_messages( def main(): + global global_msg_queue logging.basicConfig(level=logging.DEBUG) parser = argparse.ArgumentParser() parser.add_argument("--matrix_server_name", required=True) diff --git a/wrapper.py b/wrapper.py index 4a67585..2e69b47 100644 --- a/wrapper.py +++ b/wrapper.py @@ -33,23 +33,28 @@ class ProcessWrapper: def wait(self): return self.proc.wait() + @property + def closed(self) -> bool: + return self.proc.returncode is not None + def send_process_output( process: ProcessWrapper, msg_queue: message_queue.MessageQueue ): # "[07:36:28] [Server thread/INFO] [minecraft/DedicatedServer]: test" - prog = re.compile(r"\[.*\] \[(.*)\] \[(.*)\]: <(.*)> (.*)") + log = LOG.getChild("process_output") + prog = re.compile(r"(\[.*\] )?\[(.*)\] \[(.*)\]: <(.*)> (.*)") for line in process: - LOG.info(line.rstrip("\n")) + log.info(line.rstrip("\n")) result = prog.search(line) if result: - LOG.info( + log.info( "user: {} msg: {}".format( - result.group(3), result.group(4).rstrip("\n"), + result.group(4), result.group(5).rstrip("\n"), ) ) msg_queue.add( - {"user": result.group(3), "msg": result.group(4).rstrip("\n")}, + {"user": result.group(4), "msg": result.group(5).rstrip("\n")}, ) @@ -57,13 +62,18 @@ def relay_queue_input( process: ProcessWrapper, msg_queue: message_queue.MessageQueue ): log = LOG.getChild("relay_input") - for message in msg_queue: - log.debug(message) - if "command" in message: - log.debug("forwarding to process") - process.send(message["command"] + "\n") - else: - LOG.debug(message) + while not process.closed: + try: + for message in msg_queue: + log.debug(message) + if "command" in message: + command = message["command"] + "\n" + log.debug("forwarding to process: {!r}".format(command)) + process.send(command) + else: + log.debug(message) + except Exception as e: + log.exception(e) def main():