diff --git a/ServerWrapper_v0.1.0.py b/ServerWrapper_v0.1.0.py deleted file mode 100644 index 9547978..0000000 --- a/ServerWrapper_v0.1.0.py +++ /dev/null @@ -1,535 +0,0 @@ -import argparse -import atexit -import base64 -import glob -import io -import json -import logging -import os -import re -import select -import socket -import sys -import subprocess -from subprocess import PIPE -import struct -import threading -import time -import urllib -from urllib.parse import urlparse - -from matrix_client.api import MatrixHttpApi -import PIL -import requests -import flask - -app = flask.Flask(__name__) -minecraft = None -roomsync = set() - -LOG = logging.getLogger(__name__) - - -class socket_util(object): - def __init__(self, host, port): - self.host = host - self.port = port - self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.msglist = [] - self.addr = None - LOG.info("Socket Init Complete") - self.proc = None - self.exit = False - atexit.register(self.close_socket) - LOG.info("Starting Messaging Thread") - msg_process = threading.Thread(target=self.msg_process) - msg_process.daemon = True - msg_process.start() - - def msg_process(self): - raise NotImplementedError("Please Implement this method") - - def socket_reset(self): - self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.msglist = [] - self.addr = None - - def close_socket(self): - self.soc.close() - - def send(self, message): - # select returns (readable, writable, error) status on the objects passed in the corresponding lists - r, w, e = select.select([], [self.soc], [], 1) - # LOG.info("w" + str(w)) - if w == []: - return 1 - string_message = json.dumps(message) - select.select([], [self.soc], []) - self.write_int(len(string_message)) - self.write(string_message.encode("utf-8")) - return 0 - - def write_int(self, integer): - integer_buf = struct.pack(">i", integer) - self.write(integer_buf) - - def write(self, data: bytes): - data_len = len(data) - offset = 0 - while offset != data_len: - offset += self.soc.send(data[offset:]) - - def receive(self): - if self.soc.fileno() < 0: - self.socket_reset() - r, s, e = select.select([self.soc], [], [], 1) - if r == []: - return "" - message_size = self.read_int() - if message_size == None: - self.close_socket() - return None - data = self.read(message_size) - LOG.debug("data: {!r}".format(data)) - if data == None: - LOG.debug("data was none") - return None - message = json.loads(data.decode("utf-8")) - - return message - - def read_int(self): - int_size = struct.calcsize(">i") - intbuf = self.read(int_size) - if intbuf == None: - return None - return struct.unpack(">i", intbuf)[0] - - def read(self, size): - data = b"" - while len(data) != size: - newdata = self.soc.recv(size - len(data)) - if len(newdata) == 0: - return None - data = data + newdata - return data - - -class MinecraftWrapper(socket_util): - def __init__(self, command, host, port): - self.logger = LOG.getChild("MinecraftWrapper") - super().__init__(host, port) - self.logger.debug(command) - self.command = command - self.logger.info("Starting Wrapper Polling Thread") - poll_process = threading.Thread(target=self.cli_poll) - poll_process.daemon = True - poll_process.start() - self.socket_reset() - - def socket_reset(self): - super().socket_reset() - self.logger.debug("Connecting to {}:{}".format(self.host, self.port)) - backoff = 1 - while True: - try: - self.soc.connect((self.host, self.port)) - break - except OSError as e: - if e.errno == 111: - LOG.warning( - "Connection refused by {}:{}, trying again in {} seconds".format( - self.host, self.port, backoff - ) - ) - time.sleep(backoff) - backoff *= 2 - else: - raise e - self.logger.info("Socket Connected") - - def exe_mc(self): - self.proc = subprocess.Popen( - " ".join(self.command), - shell=True, - stdout=PIPE, - stdin=PIPE, - universal_newlines=True, - ) - for stdout_line in iter(self.proc.stdout.readline, ""): - yield stdout_line - return_code = self.proc.wait() - if return_code: - raise subprocess.CalledProcessError(return_code, self.command) - - def msg_process(self): - while not self.exit: - try: - self.proc_monitor() - status = 1 - if len(self.msglist) > 0: - status = self.send(self.msglist[-1]) - rcv = self.receive() - if rcv != "" and rcv != None: - self.msg_handle(rcv) - if status == 0: - self.msglist.pop() - except Exception as e: - self.logger.exception(e) - self.socket_reset() - - def msg_handle(self, msg): - if len(msg) > 0: - if msg[0] == "/": - self.proc.stdin.write(msg + "\n") - else: - LOG.info(msg) - - def proc_monitor(self): - try: - if self.proc is not None and self.proc.poll() is not None: - self.exit = True - self.close_socket() - sys.exit(0) - except Exception as e: - self.logger.exception("poll error") - pass - - def cli_poll(self): - prog = re.compile("\[.*\] \[(.*)\] \[(.*)\]: <(.*)> (.*)") - EXAMPLE = "[07:36:28] [Server thread/INFO] [minecraft/DedicatedServer]: test" - for line in self.exe_mc(): - self.logger.info(line.rstrip("\n")) - result = prog.search(line) - if result: - self.logger.info( - "user: " - + result.group(3) - + " msg: " - + result.group(4).rstrip("\n") - ) - self.msglist.insert( - 0, - { - "user": result.group(3), - "msg": result.group(4).rstrip("\n"), - }, - ) - - -class MinecraftServerBridge(socket_util): - def __init__( - self, - minecraft_bind_host: int, - minecraft_bind_port: int, - matrix_bind_port: int, - appservice_token: str, - server_name: str, - ): - # starting threads - LOG.info("Starting Appservice Webserver") - flask_thread = threading.Thread( - target=app.run, kwargs={"port": matrix_bind_port} - ) - flask_thread.daemon = True - flask_thread.start() - - # socket and other init - super().__init__(minecraft_bind_host, minecraft_bind_port) - LOG.info("Calling Matrix Api") - self.api = MatrixHttpApi( - "http://localhost:8008", token=appservice_token - ) - self.avatar_update_log = {} - LOG.info("Finished Init") - - self.server_name = server_name - - def socket_reset(self): - super().socket_reset() - LOG.info("Server Binding to " + self.host + " " + str(self.port)) - backoff = 1 - while True: - try: - self.soc.bind((self.host, self.port)) - break - except OSError as e: - if e.errno == 98: - LOG.warning( - "Unable to bind to port {}: {}, trying again in {} seconds".format( - self.port, e.msg, backoff - ) - ) - time.sleep(backoff) - backoff *= 2 - else: - raise e - LOG.info("Server Bound") - self.soc.listen(1) - LOG.info("Server listen to host") - self.soc, self.addr = self.soc.accept() - self.soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - LOG.info("Server accepted connection: " + str(self.addr)) - - def msg_process(self): - while True: - try: - if len(self.msglist) > 0: - status = self.send(self.msglist[-1]) - if status == 0: - self.msglist.pop() - rcv = self.receive() - if rcv != "" and rcv != None: - LOG.debug(rcv) - self.msg_handle(rcv) - except Exception as e: - LOG.exception(e) - self.socket_reset() - - def msg_handle(self, msg): - # for msg, create user and post as user - # add minecraft user to minecraft channel, if this fails, no big deal - try: - new_user = "mc_" + msg["user"] - user_id = "@{}:{}".format(new_user, self.server_name) - LOG.info("trying to create user {}...".format(new_user)) - self.api.register( - {"type": "m.login.application_service", "username": new_user,} - ) - except Exception as e: - LOG.exception(e) - # for each room we're aware of, post server chat inside. Eventually 1 room should equal 1 server - for room in roomsync: - # generate a unique transaction id based on the current time - txn_id = str(int(time.time() * 1000)) - # attempt to join room - LOG.info("trying to join room as user and as bridge manager") - self.api._send( - "POST", - "/rooms/" + room + "/join", - query_params={"user_id": user_id}, - headers={"Content-Type": "application/json"}, - ) - self.api._send( - "POST", - "/rooms/" + room + "/join", - headers={"Content-Type": "application/json"}, - ) - # set our display name to something nice - LOG.info("trying to set display name...") - self.api._send( - "PUT", - "/profile/" + user_id + "/displayname/", - content={"displayname": msg["user"]}, - query_params={"user_id": user_id}, - headers={"Content-Type": "application/json"}, - ) - - # get our mc skin!! - # backup: #avatar_url = "https://www.minecraftskinstealer.com/face.php?u="+msg['user'] - # only get this if the user hasn't updated in a long time - try: - LOG.info("Checking if we need to update avatar...") - if ( - msg["user"] not in self.avatar_update_log.keys() - or abs(self.avatar_update_log[msg["user"]] - time.time()) - > 180 - ): - self.avatar_update_log[msg["user"]] = time.time() - avatar_url = self.get_mc_skin(msg["user"], user_id) - if avatar_url: - LOG.debug("avatar_url is " + avatar_url) - self.api._send( - "PUT", - "/profile/" + user_id + "/avatar_url/", - content={"avatar_url": avatar_url}, - query_params={"user_id": user_id}, - headers={"Content-Type": "application/json"}, - ) - except Exception as e: - LOG.exception(e) - # Not the end of the world if it fails, send the message now. - - # attempt to post in room - LOG.info("Attempting to post in Room") - self.api._send( - "PUT", - "/rooms/" + room + "/send/m.room.message/" + txn_id, - content={"msgtype": "m.text", "body": msg["msg"]}, - query_params={"user_id": user_id}, - headers={"Content-Type": "application/json"}, - ) - - def get_mc_skin(self, user, user_id): - LOG.info("Getting Minecraft Avatar") - from PIL import Image - - mojang_info = requests.get( - "https://api.mojang.com/users/profiles/minecraft/" + user - ).json() # get uuid - mojang_info = requests.get( - "https://sessionserver.mojang.com/session/minecraft/profile/" - + mojang_info["id"] - ).json() # get more info from uuid - mojang_info = json.loads( - base64.b64decode(mojang_info["properties"][0]["value"]) - ) - mojang_url = mojang_info["textures"]["SKIN"]["url"] - # r = requests.get(mojang_url, stream=True) - # r.raw.decode_content = True # handle spurious Content-Encoding - file = io.BytesIO(urllib.request.urlopen(mojang_url).read()) - im = Image.open(file) - img_head = im.crop((8, 8, 16, 16)) - img_head = img_head.resize( - (im.width * 8, im.height * 8), resample=PIL.Image.NEAREST - ) # Resize with nearest neighbor to get pixels - image_buffer_head = io.BytesIO() - img_head.save(image_buffer_head, "PNG") - - # compare to user's current id so we're not uploading the same pic twice - # GET /_matrix/client/r0/profile/{userId}/avatar_url - LOG.info("Getting Current Avatar URL") - curr_url = self.api._send( - "GET", - "/profile/" + user_id + "/avatar_url/", - query_params={"user_id": user_id}, - headers={"Content-Type": "application/json"}, - ) - upload = True - if "avatar_url" in curr_url.keys(): - LOG.info("Checking Avatar...") - file = io.BytesIO( - urllib.request.urlopen( - self.api.get_download_url(curr_url["avatar_url"]) - ).read() - ) - im = Image.open(file) - image_buffer_curr = io.BytesIO() - im.save(image_buffer_curr, "PNG") - if (image_buffer_head.getvalue()) == (image_buffer_curr.getvalue()): - LOG.debug("Image Same") - upload = False - if upload: - # upload img - # POST /_matrix/media/r0/upload - LOG.debug("Returning updated avatar") - LOG.debug(image_buffer_head) - return self.api.media_upload( - image_buffer_head.getvalue(), "image/png" - )["content_uri"] - else: - return None - - -USER_RE = re.compile("(?<=\@).*(?=\:)") - - -@app.route("/transactions/", methods=["PUT"]) -def on_receive_events(transaction): - LOG.info("got event") - events = flask.request.get_json()["events"] - for event in events: - LOG.info("User: %s Room: %s" % (event["user_id"], event["room_id"])) - LOG.info("Event Type: %s" % event["type"]) - LOG.info("Content: %s" % event["content"]) - roomsync.add(event["room_id"]) - if ( - event["type"] == "m.room.message" - and event["content"]["msgtype"] == "m.text" - and event["user_id"].find("@mc_") == -1 - ): - - m_user = USER_RE.search(event["user_id"]).group(0) - m_cont = event["content"]["body"] - # minecraft.msglist.insert(0, "/tellraw @a {\"text\":\"<" + m_user + "> " + m_cont + "\",\"insertion\":\"/tellraw @p %s\"}") - - return flask.jsonify({}) - - -@app.route("/rooms/", methods=["GET"]) -def on_room(room): - LOG.info("returning: " + str(room)) - return flask.jsonify({}) - - -BRIDGE_CFG_SKELETON = { - "as_token": "", - "server_name": "", - "bridge_mcdata_port": -1, - "bridge_matrixapi_port": -1, -} -WRAPPER_CFG_SKELETON = { - "server_name": "", - "wrapper_mcdata_port": -1, -} - - -def make_config(configfile, server=True): - if not glob.glob(configfile): - with open(configfile, "w") as outfile: - if server: - json.dump(BRIDGE_CFG_SKELETON, outfile) - else: - json.dump(WRAPPER_CFG_SKELETON, outfile) - LOG.error("Please edit {0} and then run again!".format(configfile)) - sys.exit(0) - else: - with open(configfile) as config: - read_config = json.load(config) - return read_config - - -def main(): - logging.basicConfig(level=logging.DEBUG) - - parser = argparse.ArgumentParser() - mode_group = parser.add_mutually_exclusive_group(required=True) - mode_group.add_argument( - "--minecraft_wrapper", - dest="mode", - action="store_const", - const="wrapper", - help="Run in Minecraft server wrapper mode", - ) - mode_group.add_argument( - "--matrix_bridge", - dest="mode", - action="store_const", - const="bridge", - help="Run in Matrix Appservice mode", - ) - parser.add_argument("command", nargs=argparse.REMAINDER) - args = parser.parse_args() - - if args.mode == "wrapper": - LOG.info("Running Minecraft Server Wrapper Mode") - config = make_config("wrapper.json", server=False) - ip_addr_info = socket.gethostbyname_ex(config["server_name"]) - minecraft = MinecraftWrapper( - args.command, - host=ip_addr_info[2][0], - port=config["wrapper_mcdata_port"], - ) - else: - LOG.info("Running Minecraft Matrix Bridge Mode") - config = make_config("server.json", server=True) - minecraft = MinecraftServerBridge( - minecraft_bind_host="0.0.0.0", - minecraft_bind_port=config["bridge_mcdata_port"], - matrix_bind_port=config["bridge_matrixapi_port"], - appservice_token=config["as_token"], - server_name=config["server_name"], - ) - LOG.info("All Threads Running") - while not minecraft.exit: - time.sleep(1) - LOG.info("Calling exit() in main thread...") - sys.exit() - - -if __name__ == "__main__": - main() - -# vim: set expandtab sw=4 ts=4 softtabstop=4: diff --git a/service.py b/service.py index 47a7348..e08fd2c 100644 --- a/service.py +++ b/service.py @@ -11,6 +11,7 @@ import urllib from matrix_client.api import MatrixHttpApi import PIL + import requests import flask @@ -27,6 +28,7 @@ roomsync = set() @app.route("/transactions/", methods=["PUT"]) def on_receive_events(transaction): + global global_msg_queue LOG.info("got event") events = flask.request.get_json()["events"] for event in events: @@ -50,6 +52,8 @@ def on_receive_events(transaction): m_user, m_cont ), }) + else: + LOG.warning("No message queue available") return flask.jsonify({})