You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

245 lines
7.9 KiB

from sanic import Blueprint, Request, json, Sanic
from sanic.log import logger
from sanic.exceptions import SanicException
import urllib.parse
import uuid
import json as jsonlib
import time
from utils.crypto import rsa_decrypt
from utils.crypto import des3_encrypt, des3_decrypt
from utils.crypto import base64_decode, base64_encode, md5
from .model import SdoUser
# Blueprint that the route decorators in this module attach to; it is named
# "sdo_v1" and created with the URL prefix "v1".
bp_sdo_v1 = Blueprint(name="sdo_v1", url_prefix="v1")
@bp_sdo_v1.route("/account/active", methods=["GET", "POST"])
async def account_active(request: Request):
    """Acknowledge an activation request with a fixed success payload.

    The decoded request body is written to the debug log; it is not
    inspected in any other way.
    """
    raw_body = request.body.decode()
    logger.debug(raw_body)

    payload = {"message": "ok", "result": 0}
    return json({"code": 0, "msg": "ok", "data": payload})
@bp_sdo_v1.route("/basic/publickey", methods=["GET", "POST"])
async def basic_publickey(request: Request):
    """Return the configured public key as a single-line string.

    The file named by the app config entry ``PUBKEY_PATH`` is read on every
    call. Newlines are removed, every forward slash is prefixed with a
    backslash, and the PEM ``BEGIN``/``END PUBLIC KEY`` markers are dropped
    before the text is placed in the ``key`` field of the response.
    """
    key_path = Sanic.get_app().config["PUBKEY_PATH"]
    with open(key_path, "r") as key_file:
        key_text = key_file.read()

    # Applied in order; each pair is (text to find, replacement).
    substitutions = (
        ("\n", ""),
        ("/", "\\/"),
        ("-----BEGIN PUBLIC KEY-----", ""),
        ("-----END PUBLIC KEY-----", ""),
    )
    for needle, replacement in substitutions:
        key_text = key_text.replace(needle, replacement)

    payload = {"result": 0, "message": "ok", "key": key_text, "method": "rsa"}
    return json({"code": 0, "msg": "", "data": payload})
def encrypt_resp(resp, key):
    """Encrypt a response string and base64-encode the ciphertext.

    Args:
        resp: Response text; it is encoded to bytes before encryption.
        key: Key text; it is encoded to bytes and only the first 24 bytes
            are handed to ``des3_encrypt``.

    Returns:
        Whatever ``base64_encode`` yields for the ciphertext (presumably a
        ``str``, since callers embed it in a JSON body -- confirm against
        ``utils.crypto``).
    """
    plaintext = resp.encode()
    cipher_key = key.encode()[:24]
    ciphertext = des3_encrypt(plaintext, cipher_key)
    return base64_encode(ciphertext)
@bp_sdo_v1.route("/basic/handshake", methods=["GET", "POST"])
async def basic_handshake(request: Request):
    """Register the client's session key and answer with an encrypted token.

    The request body is base64 text that is RSA-decrypted with the private
    key read from the app config entry ``PRIVKEY_PATH``. The decrypted text
    is parsed as a URL query string and its ``randkey`` value is stored in
    ``app.ctx.rand_keys`` under the ``X-DEVICEID`` request header, where the
    other handlers of this module look it up.

    Returns:
        A JSON response whose ``data`` field is the encrypted handshake
        payload (``message``, ``result``, ``token``).

    Raises:
        SanicException: with status 403 when the ``X-DEVICEID`` header or
            the ``randkey`` parameter is missing (previously a bare
            ``KeyError`` surfaced as an internal server error).
    """
    device_id = request.headers.get("X-DEVICEID")
    if device_id is None:
        raise SanicException("Missing X-DEVICEID header", status_code=403)

    with open(Sanic.get_app().config["PRIVKEY_PATH"], "r") as f:
        private_key = f.read()
    data = rsa_decrypt(base64_decode(request.body.decode()), private_key)
    logger.debug(data)

    param = dict(urllib.parse.parse_qsl(data))
    rand_key = param.get("randkey")
    if rand_key is None:
        raise SanicException("Missing randkey", status_code=403)

    # The key table is created lazily on the first handshake.
    ctx = Sanic.get_app().ctx
    if not hasattr(ctx, "rand_keys"):
        ctx.rand_keys = {}
    ctx.rand_keys[device_id] = rand_key
    logger.debug(f"{rand_key=} {device_id=}")

    # The token is only returned to the client; this handler does not store it.
    token = uuid.uuid4().hex
    # Compact separators keep the serialized text free of extra whitespace,
    # identical to the previously hand-concatenated string.
    resp = jsonlib.dumps(
        {"message": "ok", "result": 0, "token": token},
        separators=(",", ":"),
    )
    encrypted_resp = encrypt_resp(resp, rand_key)
    logger.debug(encrypted_resp)
    return json({"code": 0, "msg": "ok", "data": encrypted_resp})
@bp_sdo_v1.route("/account/initialize", methods=["GET", "POST"])
async def account_initialize(request: Request):
    """Return the encrypted client-initialization settings.

    The session key stored for the ``X-DEVICEID`` header in
    ``app.ctx.rand_keys`` is used to decrypt the request body (which is only
    logged) and to encrypt the response.

    Returns:
        A JSON response whose ``data`` field is the encrypted settings
        document.

    Raises:
        SanicException: with status 403 when no session key is known for
            the device (missing header, or no key stored yet). Previously
            this surfaced as an ``AttributeError``/``KeyError`` and an
            internal server error.
    """
    device_id = request.headers.get("X-DEVICEID")
    # ``rand_keys`` does not exist on ctx until something has stored a key.
    known_keys = getattr(Sanic.get_app().ctx, "rand_keys", {})
    rand_key = known_keys.get(device_id)
    if rand_key is None:
        raise SanicException("Unknown device, handshake required", status_code=403)
    logger.debug(f"{rand_key=} {device_id=}")

    # The decrypted request is logged but none of its fields are read.
    data = des3_decrypt(base64_decode(request.body.decode()), rand_key.encode()[:24]).decode()
    logger.debug(data)

    # Full set of response fields with empty/zero placeholder values.
    initialize_resp = {
        "brand_logo": "",
        "brand_name": "",
        "daoyu_clientid": "",
        "daoyu_download_url": "",
        "device_feature": "",
        "display_thirdaccout": 0,
        "force_show_agreement": 0,
        "greport_log_level": "",
        "guest_enable": 0,
        "is_match": 0,
        "log_level": "",
        "login_button": [],
        "login_icon": [],
        "login_limit_enable": 0,
        "need_float_window_permission": 0,
        "new_device_id_server": "",
        "qq_appId": "",
        "qq_key": "",
        "show_guest_confirm": 0,
        "voicetip_button": 0,
        "voicetip_one": "",
        "voicetip_two": "",
        "wegame_appid": "",
        "wegame_appkey": "",
        "wegame_clientid": "",
        "wegame_companyId": "",
        "wegame_loginUrl": "",
        "weibo_appKey": "",
        "weibo_redirectUrl": "",
        "weixin_appId": "",
        "weixin_key": "",
    }
    # Values this server actually sends; update() keeps the key order above.
    initialize_resp.update({
        "brand_logo": "http://gskd.sdo.com/ghome/ztc/logo/og/logo_xhdpi.png",
        "brand_name": "盛趣游戏",
        "force_show_agreement": 1,
        "greport_log_level": "off",
        "log_level": "off",
        "login_button": ["official"],
        "login_icon": [],
        "need_float_window_permission": 0,  # the original note lists 1 as the alternative value
        "new_device_id_server": md5(device_id.encode()).hex(),
        "show_guest_confirm": 1,
        "voicetip_button": 1,
    })
    logger.debug(initialize_resp)

    resp = jsonlib.dumps(initialize_resp)
    encrypted_resp = encrypt_resp(resp, rand_key)
    logger.debug(encrypted_resp)
    return json({"code": 0, "msg": "ok", "data": encrypted_resp})
@bp_sdo_v1.route("/account/login", methods=["GET", "POST"])
async def account_login(request: Request):
    """Log a user in by phone and password, creating the account if needed.

    The request body is decrypted with the session key stored for the
    ``X-DEVICEID`` header and parsed as a URL query string carrying
    ``phone`` and ``password``. An unknown phone number creates a new
    ``SdoUser`` row with the supplied password; a known one must match the
    stored password hash. On success fresh ``sifkey``/``autokey``/``ticket``
    values and the login time are saved on the row.

    Returns:
        On success, a JSON response whose ``data`` field is the encrypted
        login document. On a password mismatch, an unencrypted JSON
        response with ``code`` 31.

    Raises:
        SanicException: with status 403 when no session key is known for
            the device (previously an ``AttributeError``/``KeyError`` and an
            internal server error), or when phone or password is empty.
    """
    device_id = request.headers.get("X-DEVICEID")
    # ``rand_keys`` does not exist on ctx until something has stored a key.
    known_keys = getattr(Sanic.get_app().ctx, "rand_keys", {})
    rand_key = known_keys.get(device_id)
    if rand_key is None:
        raise SanicException("Unknown device, handshake required", status_code=403)
    logger.debug(f"{rand_key=} {device_id=}")

    data = des3_decrypt(base64_decode(request.body.decode()), rand_key.encode()[:24]).decode()
    logger.debug(data)
    param = dict(urllib.parse.parse_qsl(data))
    phone, password = param.get("phone"), param.get("password")
    if not phone or not password:
        raise SanicException("Empty phone or password", status_code=403)

    # NOTE(review): MD5 with a fixed salt is weak for password storage;
    # changing it would invalidate every stored hash, so it is kept as is.
    hashed_password = md5((password + "shiosalt").encode()).hex()
    logger.debug(f"{phone=} {hashed_password=}")

    user, created = SdoUser.get_or_create(phone=phone)
    if created:
        # New accounts get the current Unix time (in seconds) as their id.
        user_id = str(int(time.time()))
    elif user.password != hashed_password:
        return json({ "code": 31, "msg": "密码有误,请联系维护者!", "data": {} })
    else:
        user_id = user.user_id

    sifkey = "SIF_" + uuid.uuid4().hex
    autokey = "AUTO_" + uuid.uuid4().hex
    ticket = "TICKET_" + uuid.uuid4().hex
    login_time = int(time.time())

    # One UPDATE covers both the session fields and, for a freshly created
    # row, its id and password, so they are written together.
    changed_fields = {
        "sifkey": sifkey,
        "autokey": autokey,
        "ticket": ticket,
        "last_login_time": login_time,
    }
    if created:
        changed_fields["user_id"] = user_id
        changed_fields["password"] = hashed_password
    SdoUser.update(**changed_fields).where(SdoUser.phone == phone).execute()

    login_resp = {
        "activation": 0,
        "autokey": autokey,
        "captchaParams": "",
        "checkCodeGuid": "",
        "checkCodeUrl": "",
        "hasExtendAccs": 0,
        "has_realInfo": 1,
        "imagecodeType": 0,
        "isNewUser": 0,
        "message": "ok",
        "nextAction": 0,
        "prompt_msg": "",
        "realInfoNotification": "",
        "realInfo_force": 1,
        "realInfo_force_pay": 0,
        "realInfo_status": 0,
        "realInfo_status_pay": 0,
        "result": 0,
        "sdg_height": 0,
        "sdg_width": 0,
        "ticket": ticket,
        "userAttribute": "0",
        "userid": user_id,
    }
    logger.debug(login_resp)

    resp = jsonlib.dumps(login_resp)
    encrypted_resp = encrypt_resp(resp, rand_key)
    logger.debug(encrypted_resp)
    return json({"code": 0, "msg": "ok", "data": encrypted_resp})
@bp_sdo_v1.route("/account/loginauto", methods=["GET", "POST"])
async def account_loginauto(request: Request):
    """Log a user in with a previously issued ``autokey``.

    The request body is decrypted with the session key stored for the
    ``X-DEVICEID`` header and parsed as a URL query string carrying
    ``autokey``. The matching ``SdoUser`` row supplies the ``userid`` and
    ``ticket`` that are returned; nothing is written to the row.

    Returns:
        On success, a JSON response whose ``data`` field is the encrypted
        login document. When no user has that ``autokey``, an unencrypted
        JSON response with ``code`` 31.

    Raises:
        SanicException: with status 403 when no session key is known for
            the device (previously an ``AttributeError``/``KeyError`` and an
            internal server error), or when ``autokey`` is empty.
    """
    device_id = request.headers.get("X-DEVICEID")
    # ``rand_keys`` does not exist on ctx until something has stored a key.
    known_keys = getattr(Sanic.get_app().ctx, "rand_keys", {})
    rand_key = known_keys.get(device_id)
    if rand_key is None:
        raise SanicException("Unknown device, handshake required", status_code=403)
    logger.debug(f"{rand_key=} {device_id=}")

    data = des3_decrypt(base64_decode(request.body.decode()), rand_key.encode()[:24]).decode()
    logger.debug(data)
    param = dict(urllib.parse.parse_qsl(data))
    autokey = param.get("autokey")
    if not autokey:
        raise SanicException("Empty autokey", status_code=403)

    user = SdoUser.get_or_none(autokey=autokey)
    if not user:
        return json({ "code": 31, "msg": "账号不存在或者登陆状态已过期!", "data": {} })

    login_auto_resp = {
        "result": 0,
        "message": "ok",
        "autokey": autokey,
        "userid": user.user_id,
        "ticket": user.ticket,
    }
    logger.debug(login_auto_resp)

    resp = jsonlib.dumps(login_auto_resp)
    encrypted_resp = encrypt_resp(resp, rand_key)
    logger.debug(encrypted_resp)
    return json({"code": 0, "msg": "ok", "data": encrypted_resp})
@bp_sdo_v1.route("/basic/loginarea", methods=["GET", "POST"])
async def basic_loginarea(request: Request):
    """Echo the ``userid`` form field back to the caller.

    Returns:
        A JSON response whose ``data.userid`` is the first ``userid`` value
        of the submitted form, or an empty string when the request has no
        form data or the form has no ``userid`` field (the latter used to
        raise ``KeyError`` and produce an internal server error).
    """
    form = request.form
    # Sanic's form mapping returns the first value of a field from get().
    user_id = form.get("userid", "") if form else ""
    return json({"code": 0, "msg": "ok", "data": {"userid": user_id}})