"""Version 1 account and handshake endpoints, exposed as the ``sdo_v1`` blueprint.

Request flow visible in this module:

1. ``/basic/publickey`` hands out the RSA public key.
2. ``/basic/handshake`` receives an RSA-encrypted ``randkey`` and stores it
   per device id (``X-DEVICEID`` header) on the application context.
3. The ``/account/*`` endpoints that carry sensitive data exchange
   base64-encoded 3DES payloads keyed with that ``randkey``.
"""
import json as jsonlib
import time
import urllib.parse
import uuid

from sanic import Blueprint, Request, Sanic, json
from sanic.exceptions import SanicException
from sanic.log import logger

from utils.crypto import (base64_decode, base64_encode, des3_decrypt,
                          des3_encrypt, get_rsa_privkey, md5, rsa_decrypt)

from .model import SdoUser

bp_sdo_v1 = Blueprint("sdo_v1", url_prefix="v1")

# Header carrying the client's device identifier; it keys the handshake state.
_DEVICE_ID_HEADER = "X-DEVICEID"
# Static suffix appended to the password before hashing.
# NOTE(review): MD5 with a fixed salt is weak; changing it would invalidate
# every stored hash, so it is deliberately left as-is here.
_PASSWORD_SALT = "shiosalt"


def _plain_ok():
    """Build the unencrypted ``ok`` acknowledgement shared by simple endpoints."""
    return json({
        "code": 0,
        "msg": "ok",
        "data": {
            "message": "ok",
            "result": 0
        }
    })


def _require_device_id(request: Request) -> str:
    """Return the ``X-DEVICEID`` header value.

    Raises:
        SanicException: (403) when the header is absent, instead of letting a
            bare ``KeyError`` surface as an internal server error.
    """
    device_id = request.headers.get(_DEVICE_ID_HEADER)
    if device_id is None:
        raise SanicException("Missing X-DEVICEID header", status_code=403)
    return device_id


def _open_session(request: Request):
    """Return ``(device_id, rand_key)`` for a device that completed the handshake.

    Raises:
        SanicException: (403) when the header is missing or when no key has
            been stored for this device. ``ctx.rand_keys`` only exists after
            the first handshake, so it is read defensively with ``getattr``.
    """
    device_id = _require_device_id(request)
    rand_keys = getattr(Sanic.get_app().ctx, "rand_keys", {})
    rand_key = rand_keys.get(device_id)
    if rand_key is None:
        raise SanicException("Handshake required", status_code=403)
    return device_id, rand_key


def _decrypt_body(request: Request, rand_key: str) -> str:
    """Base64-decode and 3DES-decrypt the request body into text."""
    cipher_text = base64_decode(request.body.decode())
    # Only the first 24 bytes of the key are used, mirroring encrypt_resp().
    return des3_decrypt(cipher_text, rand_key.encode()[:24]).decode()


def _encrypted_ok(payload: str, rand_key: str):
    """Encrypt *payload* with *rand_key* and wrap it in the ``ok`` envelope."""
    encrypted_resp = encrypt_resp(payload, rand_key)
    logger.debug(encrypted_resp)
    return json({
        "code": 0,
        "msg": "ok",
        "data": encrypted_resp
    })


@bp_sdo_v1.route("/account/active", methods=["POST"])
async def account_active(request: Request):
    """Log the raw request body and acknowledge with a fixed ``ok`` response."""
    logger.debug(request.body.decode())
    return _plain_ok()


@bp_sdo_v1.route("/basic/publickey", methods=["POST"])
async def basic_publickey(request: Request):
    """Return the RSA public key read from the ``PUBKEY_PATH`` config entry.

    The PEM header/footer and newlines are stripped so only the key material
    is sent.
    """
    with open(Sanic.get_app().config["PUBKEY_PATH"], "r", encoding="utf-8") as f:
        public_key = f.read()
    # NOTE(review): "/" is rewritten to a literal backslash + "/"; the JSON
    # encoder will escape that backslash again. Presumably the client expects
    # this form -- confirm before changing.
    public_key = public_key.replace("\n", "").replace("/", "\\/")
    public_key = public_key.replace("-----BEGIN PUBLIC KEY-----", "")
    public_key = public_key.replace("-----END PUBLIC KEY-----", "")
    return json({
        "code": 0,
        "msg": "",
        "data": {
            "result": 0,
            "message": "ok",
            "key": public_key,
            "method": "rsa"
        }
    })


def encrypt_resp(resp, key):
    """3DES-encrypt the text *resp* and return the result base64-encoded.

    Only the first 24 bytes of the encoded *key* are used as the cipher key.
    """
    return base64_encode(des3_encrypt(resp.encode(), key.encode()[:24]))


@bp_sdo_v1.route("/basic/handshake", methods=["POST"])
async def basic_handshake(request: Request):
    """Store the client's ``randkey`` for its device and return a fresh token.

    The body is base64 text holding an RSA-encrypted query string. The
    ``randkey`` it contains is remembered per device id on the app context
    and is used to encrypt this and subsequent responses.

    Raises:
        SanicException: (403) when ``randkey`` or the device header is missing.
    """
    privkey = get_rsa_privkey()
    data = rsa_decrypt(base64_decode(request.body.decode()), privkey).decode()
    logger.debug(data)
    param = dict(urllib.parse.parse_qsl(data))

    rand_key = param.get("randkey")
    if not rand_key:
        raise SanicException("Empty randkey", status_code=403)
    device_id = _require_device_id(request)

    ctx = Sanic.get_app().ctx
    if not hasattr(ctx, "rand_keys"):
        ctx.rand_keys = {}
    ctx.rand_keys[device_id] = rand_key
    logger.debug(f"{rand_key=} {device_id=}")

    # Compact separators keep the payload free of whitespace.
    resp = jsonlib.dumps(
        {"message": "ok", "result": 0, "token": uuid.uuid4().hex},
        separators=(",", ":"),
    )
    return _encrypted_ok(resp, rand_key)


@bp_sdo_v1.route("/account/initialize", methods=["POST"])
async def account_initialize(request: Request):
    """Return the encrypted client initialisation settings.

    The decrypted request body is only logged; the response does not depend
    on it beyond requiring that decryption succeeds.

    Raises:
        SanicException: (403) when the device has not completed the handshake.
    """
    device_id, rand_key = _open_session(request)
    data = _decrypt_body(request, rand_key)
    logger.debug(f"{rand_key=} {device_id=} {data=}")

    # Full set of keys with neutral defaults ...
    initialize_resp = {
        "brand_logo": "",
        "brand_name": "",
        "daoyu_clientid": "",
        "daoyu_download_url": "",
        "device_feature": "",
        "display_thirdaccout": 0,
        "force_show_agreement": 0,
        "greport_log_level": "",
        "guest_enable": 0,
        "is_match": 0,
        "log_level": "",
        "login_button": [],
        "login_icon": [],
        "login_limit_enable": 0,
        "need_float_window_permission": 0,
        "new_device_id_server": "",
        "qq_appId": "",
        "qq_key": "",
        "show_guest_confirm": 0,
        "voicetip_button": 0,
        "voicetip_one": "",
        "voicetip_two": "",
        "wegame_appid": "",
        "wegame_appkey": "",
        "wegame_clientid": "",
        "wegame_companyId": "",
        "wegame_loginUrl": "",
        "weibo_appKey": "",
        "weibo_redirectUrl": "",
        "weixin_appId": "",
        "weixin_key": "",
    }
    # ... overridden with the values this server actually serves.
    initialize_resp.update({
        "brand_logo": "http://gskd.sdo.com/ghome/ztc/logo/og/logo_xhdpi.png",
        "brand_name": "盛趣游戏",
        "force_show_agreement": 1,
        "greport_log_level": "off",
        "log_level": "off",
        "login_button": ["official"],
        "login_icon": [],
        "need_float_window_permission": 0,  # alternative value: 1
        "new_device_id_server": md5(device_id.encode()).hex(),
        "show_guest_confirm": 1,
        "voicetip_button": 1,
    })
    logger.debug(initialize_resp)
    return _encrypted_ok(jsonlib.dumps(initialize_resp), rand_key)


@bp_sdo_v1.route("/account/login", methods=["POST"])
async def account_login(request: Request):
    """Log a user in by phone and password, registering unknown phones.

    A phone number seen for the first time is registered with the supplied
    password. For an existing user a wrong password yields a ``code: 31``
    response. On success new ``sifkey``/``autokey``/``ticket`` values are
    stored and the encrypted login payload is returned.

    Raises:
        SanicException: (403) when the handshake is missing or when ``phone``
            or ``password`` is empty.
    """
    device_id, rand_key = _open_session(request)
    logger.debug(f"{rand_key=} {device_id=}")
    data = _decrypt_body(request, rand_key)
    logger.debug(data)
    param = dict(urllib.parse.parse_qsl(data))

    phone, password = param.get("phone"), param.get("password")
    if not phone or not password:
        raise SanicException("Empty phone or password", status_code=403)
    hashed_password = md5((password + _PASSWORD_SALT).encode()).hex()
    logger.debug(f"{phone=} {hashed_password=}")

    user, created = SdoUser.get_or_create(phone=phone)
    if created:
        # NOTE(review): second-resolution timestamps can collide when two
        # users register within the same second.
        user_id = str(int(time.time()))
        SdoUser.update(user_id=user_id, password=hashed_password) \
            .where(SdoUser.phone == phone) \
            .execute()
    elif user.password != hashed_password:
        return json({
            "code": 31,
            "msg": "密码有误,请联系维护者!",
            "data": {}
        })
    else:
        user_id = user.user_id

    # Rotate the per-login credentials on every successful login.
    sifkey = "SIF_" + uuid.uuid4().hex
    autokey = "AUTO_" + uuid.uuid4().hex
    ticket = "TICKET_" + uuid.uuid4().hex
    login_time = int(time.time())
    SdoUser.update(sifkey=sifkey, autokey=autokey, ticket=ticket,
                   last_login_time=login_time) \
        .where(SdoUser.phone == phone) \
        .execute()

    login_resp = {
        "activation": 0,
        "autokey": autokey,
        "captchaParams": "",
        "checkCodeGuid": "",
        "checkCodeUrl": "",
        "hasExtendAccs": 0,
        "has_realInfo": 1,
        "imagecodeType": 0,
        "isNewUser": 0,
        "message": "ok",
        "nextAction": 0,
        "prompt_msg": "",
        "realInfoNotification": "",
        "realInfo_force": 1,
        "realInfo_force_pay": 0,
        "realInfo_status": 0,
        "realInfo_status_pay": 0,
        "result": 0,
        "sdg_height": 0,
        "sdg_width": 0,
        "ticket": ticket,
        "userAttribute": "0",
        "userid": user_id,
    }
    logger.debug(login_resp)
    return _encrypted_ok(jsonlib.dumps(login_resp), rand_key)


@bp_sdo_v1.route("/account/loginauto", methods=["POST"])
async def account_loginauto(request: Request):
    """Log a user in with a previously issued ``autokey``.

    Returns the stored ``userid`` and ``ticket`` for the matching user, or a
    ``code: 31`` response when no user holds that autokey.

    Raises:
        SanicException: (403) when the handshake is missing or ``autokey`` is
            empty.
    """
    device_id, rand_key = _open_session(request)
    logger.debug(f"{rand_key=} {device_id=}")
    data = _decrypt_body(request, rand_key)
    logger.debug(data)
    param = dict(urllib.parse.parse_qsl(data))

    autokey = param.get("autokey")
    if not autokey:
        raise SanicException("Empty autokey", status_code=403)

    user = SdoUser.get_or_none(autokey=autokey)
    if not user:
        return json({
            "code": 31,
            "msg": "账号不存在或者登陆状态已过期!",
            "data": {}
        })

    login_auto_resp = {
        "result": 0,
        "message": "ok",
        "autokey": autokey,
        "userid": user.user_id,
        "ticket": user.ticket
    }
    logger.debug(login_auto_resp)
    return _encrypted_ok(jsonlib.dumps(login_auto_resp), rand_key)


@bp_sdo_v1.route("/basic/loginarea", methods=["POST"])
async def basic_loginarea(request: Request):
    """Echo back the ``userid`` form field, or an empty string when absent."""
    # .get() returns the first value for the field and tolerates a form that
    # was posted without "userid" (plain indexing raised KeyError).
    user_id = request.form.get("userid", "")
    return json({
        "code": 0,
        "msg": "ok",
        "data": {
            "userid": user_id
        }
    })


@bp_sdo_v1.route("/account/reportRole", methods=["POST"])
async def account_reportrole(request: Request):
    """Log the raw request body and acknowledge with a fixed ``ok`` response."""
    logger.debug(request.body.decode())
    return _plain_ok()