1
0
Fork 0
matrix-appservice-minecraft/ServerWrapper_v0.1.0.py

454 lines
17 KiB
Python

import argparse
import atexit
import base64
import glob
import io
import json
import logging
import os
import re
import select
import socket
import sys
import subprocess
from subprocess import PIPE
import struct
import threading
import time
import urllib
from urllib.parse import urlparse
from matrix_client.api import MatrixHttpApi
import PIL
import requests
from flask import Flask, jsonify, request
#constants
global_config = {}
#
app = Flask(__name__)
minecraft = None
roomsync = set()
LOG = logging.getLogger(__name__)
class socket_util(object):
def __init__(self, host, port):
self.host = host
self.port = port
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.msglist = []
self.addr = None
LOG.info ("Socket Init Complete")
self.proc = None
self.exit = False
atexit.register(self.close_socket)
LOG.info("Starting Messaging Thread")
msg_process = threading.Thread(target=self.msg_process)
msg_process.daemon = True
msg_process.start()
def msg_process(self):
raise NotImplementedError("Please Implement this method")
def socket_reset(self):
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.msglist = []
self.addr = None
def close_socket(self):
self.soc.close()
def send(self, message):
#select returns (readable, writable, error) status on the objects passed in the corresponding lists
r, w, e = select.select([], [self.soc], [], 1)
#LOG.info("w" + str(w))
if w == []:
return 1
string_message = json.dumps(message)
select.select([], [self.soc], [])
self.write_int(len(string_message))
self.write(string_message.encode('utf-8'))
return 0
def write_int(self, integer):
integer_buf = struct.pack('>i', integer)
self.write(integer_buf)
def write(self, data: bytes):
data_len = len(data)
offset = 0
while offset != data_len:
offset += self.soc.send(data[offset:])
def receive(self):
if self.soc.fileno() < 0:
self.socket_reset()
r,s,e = select.select([self.soc], [], [], 1)
if r == []:
return ""
message_size = self.read_int()
if message_size == None:
self.close_socket()
return None
data = self.read(message_size)
LOG.debug("data: {!r}".format(data))
if data == None:
LOG.debug("data was none")
return None
message = json.loads(data.decode("utf-8"))
return message
def read_int(self):
int_size = struct.calcsize('>i')
intbuf = self.read(int_size)
if intbuf == None:
return None
return struct.unpack('>i', intbuf)[0]
def read(self, size):
data = b""
while len(data) != size:
newdata = self.soc.recv(size - len(data))
if len(newdata) == 0:
return None
data = data + newdata
return data
class MinecraftWrapper(socket_util):
def __init__(self, command, host, port):
self.logger = LOG.getChild("MinecraftWrapper")
super().__init__(host, port)
self.logger.debug(command)
self.command = command
self.logger.info("Starting Wrapper Polling Thread")
poll_process = threading.Thread(target=self.cli_poll)
poll_process.daemon = True
poll_process.start()
self.socket_reset()
def socket_reset(self):
super().socket_reset()
self.logger.debug("Connecting to {}:{}".format(self.host, self.port))
backoff = 1
while True:
try:
self.soc.connect((self.host, self.port))
break
except OSError as e:
if e.errno == 111:
LOG.warning(
"Connection refused by {}:{}, trying again in {} seconds".format(
self.host, self.port, backoff
)
)
time.sleep(backoff)
backoff *= 2
else:
raise e
self.logger.info("Socket Connected")
def exe_mc(self):
self.proc = subprocess.Popen(
" ".join(self.command), shell=True, stdout=PIPE, stdin=PIPE, universal_newlines=True
)
for stdout_line in iter(self.proc.stdout.readline, ""):
yield stdout_line
return_code = self.proc.wait()
if return_code:
raise subprocess.CalledProcessError(return_code, self.command)
def msg_process(self):
while not self.exit:
try:
self.proc_monitor()
status = 1
if len(self.msglist) > 0:
status = self.send(self.msglist[-1])
rcv = self.receive()
if rcv != "" and rcv != None:
self.msg_handle(rcv)
if status == 0: self.msglist.pop()
except Exception as e:
self.logger.exception(e)
self.socket_reset()
def msg_handle(self, msg):
if len(msg) > 0:
if msg[0] == '/':
self.proc.stdin.write(msg + '\n')
else:
LOG.info(msg)
def proc_monitor(self):
try:
if self.proc is not None and self.proc.poll() is not None:
self.exit = True
self.close_socket()
sys.exit(0)
except Exception as e:
self.logger.exception("poll error")
pass
def cli_poll(self):
prog = re.compile("\[.*\] \[(.*)\] \[(.*)\]: <(.*)> (.*)")
EXAMPLE = "[07:36:28] [Server thread/INFO] [minecraft/DedicatedServer]: <khr_> test"
for line in self.exe_mc():
self.logger.info(line.rstrip('\n'))
result = prog.search(line)
if result:
self.logger.info("user: " + result.group(3) + " msg: " +result.group(4).rstrip('\n'))
self.msglist.insert(0, {"user":result.group(3),"msg":result.group(4).rstrip('\n')})
class MinecraftServerBridge(socket_util):
def __init__(
self,
minecraft_bind_host: int,
minecraft_bind_port: int,
matrix_bind_port: int,
appservice_token: str,
server_name: str
):
#starting threads
LOG.info ("Starting Appservice Webserver")
flask_thread = threading.Thread(target=app.run,kwargs={ "port": matrix_bind_port })
flask_thread.daemon = True
flask_thread.start()
#socket and other init
super().__init__(minecraft_bind_host, minecraft_bind_port)
LOG.info ("Calling Matrix Api")
self.api = MatrixHttpApi("http://localhost:8008", token=appservice_token)
self.avatar_update_log = {}
LOG.info ("Finished Init")
self.server_name = server_name
def socket_reset(self):
super().socket_reset()
LOG.info("Server Binding to " + self.host + " " + str(self.port))
backoff = 1
while True:
try:
self.soc.bind((self.host, self.port))
break
except OSError as e:
if e.errno == 98:
LOG.warning(
"Unable to bind to port {}: {}, trying again in {} seconds".format(
self.port, e.msg, backoff
)
)
time.sleep(backoff)
backoff *= 2
else:
raise e
LOG.info("Server Bound")
self.soc.listen(1)
LOG.info("Server listen to host")
self.soc, self.addr = self.soc.accept()
self.soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
LOG.info("Server accepted connection: " + str(self.addr))
def msg_process(self):
while(True):
try:
if len(self.msglist) > 0:
status = self.send(self.msglist[-1])
if status == 0: self.msglist.pop()
rcv = self.receive()
if rcv != "" and rcv != None:
LOG.debug(rcv)
self.msg_handle(rcv)
except Exception as e:
LOG.exception(e)
self.socket_reset()
def msg_handle(self, msg):
#for msg, create user and post as user
#add minecraft user to minecraft channel, if this fails, no big deal
try:
new_user = "mc_" + msg['user']
user_id = "@{}:{}".format(new_user, self.server_name)
LOG.info("trying to create user {}...".format(new_user))
self.api.register({
"type": "m.login.application_service",
"username": new_user,
})
except Exception as e:
LOG.exception(e)
#for each room we're aware of, post server chat inside. Eventually 1 room should equal 1 server
for room in roomsync:
#generate a unique transaction id based on the current time
txn_id = str(int(time.time() * 1000))
#attempt to join room
LOG.info("trying to join room as user and as bridge manager")
self.api._send("POST", '/rooms/'+room+'/join', query_params={"user_id": user_id}, headers={"Content-Type":"application/json"})
self.api._send("POST", '/rooms/'+room+'/join', headers={"Content-Type":"application/json"})
#set our display name to something nice
LOG.info("trying to set display name...")
self.api._send("PUT", '/profile/'+user_id+'/displayname/', content={"displayname":msg["user"]}, query_params={"user_id": user_id}, headers={"Content-Type":"application/json"})
#get our mc skin!!
#backup: #avatar_url = "https://www.minecraftskinstealer.com/face.php?u="+msg['user']
#only get this if the user hasn't updated in a long time
try:
LOG.info("Checking if we need to update avatar...")
if msg['user'] not in self.avatar_update_log.keys() or abs(self.avatar_update_log[msg['user']] - time.time()) > 180:
self.avatar_update_log[msg['user']] = time.time()
avatar_url = self.get_mc_skin(msg['user'], user_id)
if avatar_url:
LOG.debug("avatar_url is " + avatar_url)
self.api._send("PUT", '/profile/'+user_id+'/avatar_url/', content={"avatar_url":avatar_url}, query_params={"user_id": user_id}, headers={"Content-Type":"application/json"})
except Exception as e:
LOG.exception(e)
# Not the end of the world if it fails, send the message now.
#attempt to post in room
LOG.info("Attempting to post in Room")
self.api._send("PUT", '/rooms/'+room+'/send/m.room.message/' + txn_id, content={"msgtype":"m.text","body":msg["msg"]}, query_params={"user_id": user_id}, headers={"Content-Type":"application/json"})
def get_mc_skin(self, user, user_id):
LOG.info("Getting Minecraft Avatar")
from PIL import Image
mojang_info = requests.get('https://api.mojang.com/users/profiles/minecraft/'+user).json() #get uuid
mojang_info = requests.get('https://sessionserver.mojang.com/session/minecraft/profile/'+mojang_info['id']).json() #get more info from uuid
mojang_info = json.loads(base64.b64decode(mojang_info['properties'][0]['value']))
mojang_url = mojang_info['textures']['SKIN']['url']
#r = requests.get(mojang_url, stream=True)
#r.raw.decode_content = True # handle spurious Content-Encoding
file = io.BytesIO(urllib.request.urlopen(mojang_url).read())
im = Image.open(file)
img_head = im.crop((8,8,16,16))
img_head = img_head.resize((im.width * 8, im.height * 8), resample=PIL.Image.NEAREST) # Resize with nearest neighbor to get pixels
image_buffer_head = io.BytesIO()
img_head.save(image_buffer_head, "PNG")
#compare to user's current id so we're not uploading the same pic twice
#GET /_matrix/client/r0/profile/{userId}/avatar_url
LOG.info("Getting Current Avatar URL")
curr_url = self.api._send("GET", '/profile/'+user_id+'/avatar_url/', query_params={"user_id": user_id}, headers={"Content-Type":"application/json"})
upload = True
if 'avatar_url' in curr_url.keys():
LOG.info("Checking Avatar...")
file = io.BytesIO(urllib.request.urlopen(self.api.get_download_url(curr_url['avatar_url'])).read())
im = Image.open(file)
image_buffer_curr = io.BytesIO()
im.save(image_buffer_curr, "PNG")
if (image_buffer_head.getvalue()) == (image_buffer_curr.getvalue()):
LOG.debug("Image Same")
upload = False
if upload:
#upload img
#POST /_matrix/media/r0/upload
LOG.debug("Returning updated avatar")
LOG.debug(image_buffer_head)
return self.api.media_upload(image_buffer_head.getvalue(), "image/png")["content_uri"]
else:
return None
USER_RE = re.compile("(?<=\@).*(?=\:)")
@app.route("/transactions/<transaction>", methods=["PUT"])
def on_receive_events(transaction):
LOG.info("got event")
events = request.get_json()["events"]
for event in events:
LOG.info("User: %s Room: %s" % (event["user_id"], event["room_id"]))
LOG.info("Event Type: %s" % event["type"])
LOG.info("Content: %s" % event["content"])
roomsync.add(event["room_id"])
if event['type'] == 'm.room.message' and \
event['content']['msgtype'] == 'm.text' and \
event["user_id"].find("@mc_") == -1:
m_user = USER_RE.search(event["user_id"]).group(0)
m_cont = event['content']['body']
# minecraft.msglist.insert(0, "/tellraw @a {\"text\":\"<" + m_user + "> " + m_cont + "\",\"insertion\":\"/tellraw @p %s\"}")
return jsonify({})
@app.route("/rooms/<room>", methods=["GET"])
def on_room(room):
LOG.info("returning: " + str(room))
return jsonify({})
bridge_cfg_skeleton = {"as_token":"", "server_name":"","bridge_mcdata_port":-1, "bridge_matrixapi_port":-1}
wrapper_cfg_skeleton = {"server_name":"","wrapper_mcdata_port":-1,}
def make_config(configfile, server=True):
if not glob.glob(configfile):
with open(configfile, 'w') as outfile:
if server:
json.dump(bridge_cfg_skeleton, outfile)
else:
json.dump(wrapper_cfg_skeleton, outfile)
LOG.error("Please edit {0} and then run again!".format(configfile))
sys.exit(0)
elif glob.glob(configfile):
with open(configfile) as config:
read_config = json.load(config)
return read_config
def main():
logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser()
mode_group = parser.add_mutually_exclusive_group(required=True)
mode_group.add_argument(
"--minecraft_wrapper",
dest="mode",
action="store_const",
const="wrapper",
help="Run in Minecraft server wrapper mode",
)
mode_group.add_argument(
"--matrix_bridge",
dest="mode",
action="store_const",
const="bridge",
help="Run in Matrix Appservice mode",
)
parser.add_argument("command", nargs=argparse.REMAINDER)
args = parser.parse_args()
if args.mode == "wrapper":
LOG.info("Running Minecraft Server Wrapper Mode")
global_config = make_config("wrapper.json", server=False)
ip_addr_info = socket.gethostbyname_ex(global_config['server_name'])
minecraft = MinecraftWrapper(
args.command,
host=ip_addr_info[2][0],
port=global_config['wrapper_mcdata_port'],
)
else:
LOG.info("Running Minecraft Matrix Bridge Mode")
global_config = make_config("server.json", server=True)
minecraft = MinecraftServerBridge(
minecraft_bind_host="0.0.0.0",
minecraft_bind_port=global_config['bridge_mcdata_port'],
matrix_bind_port=global_config["bridge_matrixapi_port"],
appservice_token=global_config["as_token"],
server_name=global_config["server_name"],
)
LOG.info("All Threads Running")
while (not minecraft.exit):
time.sleep(1)
LOG.info("Calling exit() in main thread...")
sys.exit()
if __name__ == "__main__":
main()
# vim: set expandtab sw=4 ts=4 softtabstop=4: