"""Bridge chat between a Minecraft server process and a Matrix homeserver.

The module runs in one of two modes, selected on the command line:

* ``--minecraft_wrapper``: launch the Minecraft server as a child process,
  scrape chat lines from its stdout and forward them over a TCP socket.
* ``--matrix_bridge``: accept that TCP connection, run a small Flask
  webserver for the Matrix application-service callbacks, and post the
  forwarded chat into every Matrix room seen so far.

Messages on the TCP link are JSON documents, each preceded by a 4-byte
big-endian signed length.
"""

import argparse
import atexit
import base64
import errno
import glob
import io
import json
import logging
import os
import re
import select
import socket
import struct
import subprocess
import sys
import threading
import time
import urllib
import urllib.request
from subprocess import PIPE
from urllib.parse import urlparse

import flask
import PIL
import requests
from matrix_client.api import MatrixHttpApi

app = flask.Flask(__name__)
# Set by main() to the running wrapper/bridge instance.
minecraft = None
# Matrix room ids seen in incoming appservice transactions.
roomsync = set()
LOG = logging.getLogger(__name__)

# Timeout (seconds) applied to outgoing HTTP requests made for avatars.
HTTP_TIMEOUT = 10
JSON_HEADERS = {"Content-Type": "application/json"}


class socket_util(object):
    """Length-prefixed JSON messaging over a TCP socket.

    Constructing an instance creates the socket and starts a daemon thread
    running ``self.msg_process``; subclasses must implement ``msg_process``
    and normally override ``socket_reset`` to connect or bind.
    """

    # Wire format of the length prefix: 4-byte big-endian signed int.
    _INT_FORMAT = ">i"

    def __init__(self, host, port):
        self.host = host
        self.port = port
        # socket_reset() can be entered from several threads (the messaging
        # thread after an error, and a subclass constructor); serialize it.
        self._reset_lock = threading.RLock()
        self.soc = self._new_socket()
        self.msglist = []
        self.addr = None
        LOG.info("Socket Init Complete")
        self.proc = None
        self.exit = False
        atexit.register(self.close_socket)
        LOG.info("Starting Messaging Thread")
        msg_process = threading.Thread(target=self.msg_process)
        msg_process.daemon = True
        msg_process.start()

    @staticmethod
    def _new_socket():
        """Return a fresh TCP socket with SO_REUSEADDR set."""
        soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return soc

    def msg_process(self):
        """Run the send/receive loop; must be provided by subclasses."""
        raise NotImplementedError("Please Implement this method")

    def socket_reset(self):
        """Replace the socket with a fresh one and drop queued messages."""
        with self._reset_lock:
            try:
                # Release the previous descriptor instead of leaving it to
                # the garbage collector.
                self.soc.close()
            except OSError:
                LOG.debug("error closing old socket", exc_info=True)
            self.soc = self._new_socket()
            self.msglist = []
            self.addr = None

    def close_socket(self):
        """Close the current socket."""
        self.soc.close()

    def send(self, message):
        """Send ``message`` as one length-prefixed JSON frame.

        Returns 0 when the frame was written, or 1 when the socket did not
        become writable within one second (nothing is sent in that case).
        """
        _, writable, _ = select.select([], [self.soc], [], 1)
        if not writable:
            return 1
        # The prefix must be the length of the encoded bytes, not of the
        # str: they differ as soon as the text contains non-ASCII characters.
        payload = json.dumps(message).encode("utf-8")
        self.write_int(len(payload))
        self.write(payload)
        return 0

    def write_int(self, integer):
        """Write ``integer`` as a 4-byte big-endian signed int."""
        self.write(struct.pack(self._INT_FORMAT, integer))

    def write(self, data: bytes):
        """Write all of ``data``, looping over partial sends."""
        data_len = len(data)
        offset = 0
        while offset != data_len:
            offset += self.soc.send(data[offset:])

    def receive(self):
        """Read one frame if any data arrives within one second.

        Returns the decoded JSON value, ``""`` when nothing was readable, or
        ``None`` when the peer closed the connection mid-read (the socket is
        closed when the length prefix could not be read).
        """
        if self.soc.fileno() < 0:
            # The socket was closed earlier; rebuild it before selecting.
            self.socket_reset()
        readable, _, _ = select.select([self.soc], [], [], 1)
        if not readable:
            return ""
        message_size = self.read_int()
        if message_size is None:
            self.close_socket()
            return None
        data = self.read(message_size)
        LOG.debug("data: {!r}".format(data))
        if data is None:
            LOG.debug("data was none")
            return None
        return json.loads(data.decode("utf-8"))

    def read_int(self):
        """Read a 4-byte big-endian signed int, or ``None`` on EOF."""
        intbuf = self.read(struct.calcsize(self._INT_FORMAT))
        if intbuf is None:
            return None
        return struct.unpack(self._INT_FORMAT, intbuf)[0]

    def read(self, size):
        """Read exactly ``size`` bytes, or return ``None`` on EOF."""
        chunks = []
        remaining = size
        while remaining != 0:
            newdata = self.soc.recv(remaining)
            if not newdata:
                return None
            chunks.append(newdata)
            remaining -= len(newdata)
        return b"".join(chunks)


class MinecraftWrapper(socket_util):
    """Run the Minecraft server as a child process and relay its chat.

    Chat lines scraped from the child's stdout are queued in ``msglist`` and
    sent to the bridge; strings received from the bridge that start with
    ``/`` are written to the child's stdin.
    """

    def __init__(self, command, host, port):
        # The logger must exist before the base constructor starts the
        # messaging thread, which logs through it.
        self.logger = LOG.getChild("MinecraftWrapper")
        super().__init__(host, port)
        self.logger.debug(command)
        self.command = command
        self.logger.info("Starting Wrapper Polling Thread")
        poll_process = threading.Thread(target=self.cli_poll)
        poll_process.daemon = True
        poll_process.start()
        self.socket_reset()

    def socket_reset(self):
        """Create a fresh socket and connect, retrying while refused.

        The delay between attempts doubles each time, starting at 1 second.
        Any OSError other than "connection refused" is re-raised.
        """
        with self._reset_lock:
            super().socket_reset()
            self.logger.debug(
                "Connecting to {}:{}".format(self.host, self.port)
            )
            backoff = 1
            while True:
                try:
                    self.soc.connect((self.host, self.port))
                    break
                except OSError as e:
                    if e.errno != errno.ECONNREFUSED:
                        raise
                    LOG.warning(
                        "Connection refused by {}:{}, trying again in {} seconds".format(
                            self.host, self.port, backoff
                        )
                    )
                    time.sleep(backoff)
                    backoff *= 2
            self.logger.info("Socket Connected")

    def exe_mc(self):
        """Start the server process and yield its stdout line by line.

        Raises subprocess.CalledProcessError once the output is exhausted if
        the process exited with a non-zero status.
        """
        # NOTE(review): the argv list is joined with spaces and run through
        # the shell, so arguments containing spaces or shell metacharacters
        # are re-interpreted. Kept as-is because existing invocations may
        # depend on shell syntax.
        self.proc = subprocess.Popen(
            " ".join(self.command),
            shell=True,
            stdout=PIPE,
            stdin=PIPE,
            universal_newlines=True,
        )
        for stdout_line in iter(self.proc.stdout.readline, ""):
            yield stdout_line
        return_code = self.proc.wait()
        if return_code:
            raise subprocess.CalledProcessError(return_code, self.command)

    def msg_process(self):
        """Messaging loop: send the oldest queued chat line, then receive.

        Any exception is logged and answered with a socket reset.
        """
        while not self.exit:
            try:
                self.proc_monitor()
                status = 1
                if self.msglist:
                    # cli_poll inserts at index 0, so the oldest entry is
                    # at the end of the list.
                    status = self.send(self.msglist[-1])
                rcv = self.receive()
                if rcv != "" and rcv is not None:
                    self.msg_handle(rcv)
                if status == 0:
                    self.msglist.pop()
            except Exception as e:
                self.logger.exception(e)
                self.socket_reset()

    def msg_handle(self, msg):
        """Forward a ``/command`` string to the server's stdin, else log it."""
        if not msg:
            return
        if isinstance(msg, str) and msg.startswith("/"):
            if self.proc is None:
                # exe_mc() runs on the polling thread and may not have
                # started the process yet.
                self.logger.warning(
                    "Server process not started; dropping command %r", msg
                )
                return
            self.proc.stdin.write(msg + "\n")
            # The pipe is buffered; without a flush the command stays in the
            # buffer instead of reaching the server.
            self.proc.stdin.flush()
        else:
            LOG.info(msg)

    def proc_monitor(self):
        """Flag exit and stop this thread once the server process has ended."""
        try:
            if self.proc is not None and self.proc.poll() is not None:
                self.exit = True
                self.close_socket()
                # SystemExit is not an Exception subclass, so it passes
                # through the handler below and ends the calling thread.
                sys.exit(0)
        except Exception:
            self.logger.exception("poll error")

    def cli_poll(self):
        """Log every server output line and queue the chat lines.

        A chat line looks like
        ``[07:36:28] [Server thread/INFO] [minecraft/DedicatedServer]: <player> test``.
        """
        prog = re.compile(r"\[.*\] \[(.*)\] \[(.*)\]: <(.*)> (.*)")
        for line in self.exe_mc():
            self.logger.info(line.rstrip("\n"))
            result = prog.search(line)
            if not result:
                continue
            user = result.group(3)
            text = result.group(4).rstrip("\n")
            self.logger.info("user: " + user + " msg: " + text)
            self.msglist.insert(0, {"user": user, "msg": text})


class MinecraftServerBridge(socket_util):
    """Accept the wrapper's connection and post its chat into Matrix rooms."""

    # Minimum number of seconds between avatar refresh attempts per player.
    AVATAR_REFRESH_SECONDS = 180

    def __init__(
        self,
        minecraft_bind_host: str,
        minecraft_bind_port: int,
        matrix_bind_port: int,
        appservice_token: str,
        server_name: str,
        homeserver_url: str = "http://localhost:8008",
    ):
        """Start the Flask thread, the Matrix API client and the socket.

        Args:
            minecraft_bind_host: address to listen on for the wrapper.
            minecraft_bind_port: TCP port to listen on for the wrapper.
            matrix_bind_port: port for the appservice Flask webserver.
            appservice_token: token passed to the Matrix HTTP API client.
            server_name: domain part used when building Matrix user ids.
            homeserver_url: base URL of the Matrix homeserver.
        """
        # starting threads
        LOG.info("Starting Appservice Webserver")
        flask_thread = threading.Thread(
            target=app.run, kwargs={"port": matrix_bind_port}
        )
        flask_thread.daemon = True
        flask_thread.start()

        # Everything msg_handle() reads is set up before the base
        # constructor starts the messaging thread.
        LOG.info("Calling Matrix Api")
        self.api = MatrixHttpApi(homeserver_url, token=appservice_token)
        self.avatar_update_log = {}
        self.server_name = server_name

        # socket and other init
        super().__init__(minecraft_bind_host, minecraft_bind_port)
        LOG.info("Finished Init")

    def socket_reset(self):
        """Bind, listen and block until one wrapper connection is accepted.

        Binding is retried while the address is in use, doubling the delay
        each time; any other OSError is re-raised. After accept, ``self.soc``
        is the connection socket and ``self.addr`` the peer address.
        """
        with self._reset_lock:
            super().socket_reset()
            LOG.info("Server Binding to " + self.host + " " + str(self.port))
            backoff = 1
            while True:
                try:
                    self.soc.bind((self.host, self.port))
                    break
                except OSError as e:
                    if e.errno != errno.EADDRINUSE:
                        raise
                    # OSError carries its text in ``strerror``; it has no
                    # ``msg`` attribute.
                    LOG.warning(
                        "Unable to bind to port {}: {}, trying again in {} seconds".format(
                            self.port, e.strerror, backoff
                        )
                    )
                    time.sleep(backoff)
                    backoff *= 2
            LOG.info("Server Bound")
            listener = self.soc
            listener.listen(1)
            LOG.info("Server listen to host")
            conn, addr = listener.accept()
            # Only one peer is served at a time; close the listener so the
            # port can be bound again on the next reset.
            listener.close()
            self.soc, self.addr = conn, addr
            LOG.info("Server accepted connection: " + str(self.addr))

    def msg_process(self):
        """Messaging loop: send one queued message, then handle a received one.

        Any exception is logged and answered with a socket reset.
        """
        while True:
            try:
                if self.msglist:
                    status = self.send(self.msglist[-1])
                    if status == 0:
                        self.msglist.pop()
                rcv = self.receive()
                if rcv != "" and rcv is not None:
                    LOG.debug(rcv)
                    self.msg_handle(rcv)
            except Exception as e:
                LOG.exception(e)
                self.socket_reset()

    def msg_handle(self, msg):
        """Post one Minecraft chat message into every known Matrix room.

        ``msg`` is expected to be a dict with ``"user"`` and ``"msg"`` keys;
        anything else is logged and ignored. The message is posted as the
        Matrix user ``@mc_<user>:<server_name>``.
        """
        if not isinstance(msg, dict) or "user" not in msg or "msg" not in msg:
            LOG.warning("Ignoring malformed message: %r", msg)
            return

        player = msg["user"]
        new_user = "mc_" + player
        user_id = "@{}:{}".format(new_user, self.server_name)
        as_user = {"user_id": user_id}

        # Create the Matrix user for this player; if this fails, no big deal
        # (the failure is logged and posting is still attempted).
        try:
            LOG.info("trying to create user {}...".format(new_user))
            self.api.register(
                {"type": "m.login.application_service", "username": new_user}
            )
        except Exception as e:
            LOG.exception(e)

        # For each room we're aware of, post server chat inside. Eventually
        # 1 room should equal 1 server. Iterate over a snapshot: the Flask
        # thread adds to the set concurrently.
        for room in list(roomsync):
            # generate a unique transaction id based on the current time
            txn_id = str(int(time.time() * 1000))

            # attempt to join room
            LOG.info("trying to join room as user and as bridge manager")
            self.api._send(
                "POST",
                "/rooms/" + room + "/join",
                query_params=dict(as_user),
                headers=dict(JSON_HEADERS),
            )
            self.api._send(
                "POST",
                "/rooms/" + room + "/join",
                headers=dict(JSON_HEADERS),
            )

            # set our display name to something nice
            LOG.info("trying to set display name...")
            self.api._send(
                "PUT",
                "/profile/" + user_id + "/displayname/",
                content={"displayname": player},
                query_params=dict(as_user),
                headers=dict(JSON_HEADERS),
            )

            # Refresh the avatar from the player's skin, but only if it has
            # not been attempted recently.
            try:
                LOG.info("Checking if we need to update avatar...")
                last_update = self.avatar_update_log.get(player)
                if (
                    last_update is None
                    or abs(last_update - time.time())
                    > self.AVATAR_REFRESH_SECONDS
                ):
                    self.avatar_update_log[player] = time.time()
                    avatar_url = self.get_mc_skin(player, user_id)
                    if avatar_url:
                        LOG.debug("avatar_url is " + avatar_url)
                        self.api._send(
                            "PUT",
                            "/profile/" + user_id + "/avatar_url/",
                            content={"avatar_url": avatar_url},
                            query_params=dict(as_user),
                            headers=dict(JSON_HEADERS),
                        )
            except Exception as e:
                # Not the end of the world if it fails, send the message now.
                LOG.exception(e)

            # attempt to post in room
            LOG.info("Attempting to post in Room")
            self.api._send(
                "PUT",
                "/rooms/" + room + "/send/m.room.message/" + txn_id,
                content={"msgtype": "m.text", "body": msg["msg"]},
                query_params=dict(as_user),
                headers=dict(JSON_HEADERS),
            )

    def get_mc_skin(self, user, user_id):
        """Build a head image from the player's skin and upload it if new.

        Returns the uploaded media's content URI, or ``None`` when the
        generated image is byte-identical to the user's current avatar.
        Network and parsing errors propagate to the caller.
        """
        LOG.info("Getting Minecraft Avatar")
        from PIL import Image

        # Player name -> uuid.
        profile = requests.get(
            "https://api.mojang.com/users/profiles/minecraft/" + user,
            timeout=HTTP_TIMEOUT,
        ).json()
        # uuid -> session profile, whose first property is base64 JSON.
        session = requests.get(
            "https://sessionserver.mojang.com/session/minecraft/profile/"
            + profile["id"],
            timeout=HTTP_TIMEOUT,
        ).json()
        textures = json.loads(
            base64.b64decode(session["properties"][0]["value"])
        )
        mojang_url = textures["textures"]["SKIN"]["url"]

        skin_file = io.BytesIO(
            urllib.request.urlopen(mojang_url, timeout=HTTP_TIMEOUT).read()
        )
        im = Image.open(skin_file)
        img_head = im.crop((8, 8, 16, 16))
        # Resize with nearest neighbor to keep hard pixel edges.
        # NOTE(review): the target size is derived from the full skin image,
        # not from the 8x8 crop - confirm this is intended.
        img_head = img_head.resize(
            (im.width * 8, im.height * 8), resample=Image.NEAREST
        )
        image_buffer_head = io.BytesIO()
        img_head.save(image_buffer_head, "PNG")

        # Compare to the user's current avatar so we're not uploading the
        # same pic twice.
        LOG.info("Getting Current Avatar URL")
        curr_url = self.api._send(
            "GET",
            "/profile/" + user_id + "/avatar_url/",
            query_params={"user_id": user_id},
            headers=dict(JSON_HEADERS),
        )
        if "avatar_url" in curr_url:
            LOG.info("Checking Avatar...")
            current_file = io.BytesIO(
                urllib.request.urlopen(
                    self.api.get_download_url(curr_url["avatar_url"]),
                    timeout=HTTP_TIMEOUT,
                ).read()
            )
            image_buffer_curr = io.BytesIO()
            Image.open(current_file).save(image_buffer_curr, "PNG")
            if image_buffer_head.getvalue() == image_buffer_curr.getvalue():
                LOG.debug("Image Same")
                return None

        LOG.debug("Returning updated avatar")
        return self.api.media_upload(
            image_buffer_head.getvalue(), "image/png"
        )["content_uri"]


# Matches the text between the first "@" and the last ":" of a user id.
USER_RE = re.compile(r"(?<=@).*(?=:)")


@app.route("/transactions/<transaction>", methods=["PUT"])
def on_receive_events(transaction):
    """Handle an appservice transaction: remember every room id seen."""
    LOG.info("got event")
    events = flask.request.get_json()["events"]
    for event in events:
        LOG.info("User: %s Room: %s", event["user_id"], event["room_id"])
        LOG.info("Event Type: %s", event["type"])
        LOG.info("Content: %s", event["content"])
        roomsync.add(event["room_id"])
        # .get() because a message event may carry no "msgtype".
        if (
            event["type"] == "m.room.message"
            and event["content"].get("msgtype") == "m.text"
            and event["user_id"].find("@mc_") == -1
        ):
            # TODO: relaying Matrix messages into Minecraft is disabled.
            # The intended form was to queue, on minecraft.msglist, a
            # '/tellraw @a {"text": "<user> body", ...}' command built from
            # USER_RE.search(event["user_id"]) and event["content"]["body"].
            pass
    return flask.jsonify({})


@app.route("/rooms/<room>", methods=["GET"])
def on_room(room):
    """Handle an appservice room query; always answers with an empty object."""
    LOG.info("returning: " + str(room))
    return flask.jsonify({})


BRIDGE_CFG_SKELETON = {
    "as_token": "",
    "server_name": "",
    "bridge_mcdata_port": -1,
    "bridge_matrixapi_port": -1,
}

WRAPPER_CFG_SKELETON = {
    "server_name": "",
    "wrapper_mcdata_port": -1,
}


def make_config(configfile, server=True):
    """Load ``configfile`` as JSON, or create a skeleton and exit.

    When the file does not exist, the bridge skeleton (``server=True``) or
    the wrapper skeleton is written to it, an error is logged and the
    process exits with status 0. Otherwise the parsed JSON is returned.
    """
    if not os.path.exists(configfile):
        skeleton = BRIDGE_CFG_SKELETON if server else WRAPPER_CFG_SKELETON
        with open(configfile, "w") as outfile:
            json.dump(skeleton, outfile)
        LOG.error("Please edit {0} and then run again!".format(configfile))
        sys.exit(0)

    with open(configfile) as config:
        return json.load(config)


def main():
    """Parse the command line and run the selected mode until it exits."""
    # Publish the instance on the module so the Flask handlers can reach it.
    global minecraft

    logging.basicConfig(level=logging.DEBUG)
    parser = argparse.ArgumentParser()
    mode_group = parser.add_mutually_exclusive_group(required=True)
    mode_group.add_argument(
        "--minecraft_wrapper",
        dest="mode",
        action="store_const",
        const="wrapper",
        help="Run in Minecraft server wrapper mode",
    )
    mode_group.add_argument(
        "--matrix_bridge",
        dest="mode",
        action="store_const",
        const="bridge",
        help="Run in Matrix Appservice mode",
    )
    parser.add_argument("command", nargs=argparse.REMAINDER)

    args = parser.parse_args()

    if args.mode == "wrapper":
        if not args.command:
            parser.error("wrapper mode requires the server command to run")
        LOG.info("Running Minecraft Server Wrapper Mode")
        config = make_config("wrapper.json", server=False)
        ip_addr_info = socket.gethostbyname_ex(config["server_name"])
        minecraft = MinecraftWrapper(
            args.command,
            host=ip_addr_info[2][0],
            port=config["wrapper_mcdata_port"],
        )
    else:
        LOG.info("Running Minecraft Matrix Bridge Mode")
        config = make_config("server.json", server=True)
        minecraft = MinecraftServerBridge(
            minecraft_bind_host="0.0.0.0",
            minecraft_bind_port=config["bridge_mcdata_port"],
            matrix_bind_port=config["bridge_matrixapi_port"],
            appservice_token=config["as_token"],
            server_name=config["server_name"],
        )

    LOG.info("All Threads Running")
    while not minecraft.exit:
        time.sleep(1)
    LOG.info("Calling exit() in main thread...")
    sys.exit()


if __name__ == "__main__":
    main()

# vim: set expandtab sw=4 ts=4 softtabstop=4: