You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
88 lines
2.9 KiB
88 lines
2.9 KiB
import time
|
|
import urllib.parse
|
|
import uuid
|
|
|
|
from sanic import Blueprint, Request, Sanic, SanicException
|
|
from sanic.log import logger
|
|
|
|
from sdo.model import SdoUser
|
|
from utils.crypto import (aes_cbc_decrypt, base64_decode, base64_encode,
|
|
get_rsa_privkey, rsa_decrypt, xor)
|
|
|
|
from ..common import pack_resp, parse_data
|
|
|
|
# Blueprint collecting the login handshake routes defined in this module;
# it is created with the URL prefix "login".
bp_sif_login = Blueprint("sif_login", url_prefix="login")
|
|
|
|
|
|
|
|
@bp_sif_login.route("authkey", methods=["POST"])
@parse_data
@pack_resp
async def authkey(request: Request):
    """Start the login handshake by exchanging tokens with the client.

    The base64 ``dummy_token`` from the parsed request data is RSA-decrypted
    with the key returned by ``get_rsa_privkey()`` to obtain the client
    token. A fresh server token and a fresh authorize token are generated
    from random UUIDs. The client/server token pair is stored in the
    application context's ``auth_json`` mapping under the authorize token.

    Returns:
        A dict with ``authorize_token`` and ``dummy_token`` (the server
        token), both base64-encoded.
    """
    encrypted_client_token = base64_decode(
        request.ctx.request_data["dummy_token"]
    )
    client_token = base64_encode(
        rsa_decrypt(encrypted_client_token, get_rsa_privkey())
    )
    server_token = base64_encode(uuid.uuid4().bytes)
    authorize_token = base64_encode(uuid.uuid4().bytes)
    logger.debug(f"{client_token=} {server_token=} {authorize_token=}")

    # The pending-handshake table lives on the app context and is created
    # the first time a handshake is started.
    app_ctx = Sanic.get_app().ctx
    try:
        pending = app_ctx.auth_json
    except AttributeError:
        pending = app_ctx.auth_json = {}
    pending[authorize_token] = dict(
        client_token=client_token,
        server_token=server_token,
    )

    # Disabled draft of an ``authorize`` string, kept for reference:
    # nonce = 0
    # authorize = (f"consumerKey=lovelive_test&"
    #              f"timeStamp={int(time.time())}&"
    #              f"version=1.1&nonce={nonce}&"
    #              f"requestTimeStamp={int(time.time())}")

    return {
        "authorize_token": authorize_token,
        "dummy_token": server_token,
    }
|
|
|
|
|
|
|
|
|
|
@bp_sif_login.route("login", methods=["POST"])
@parse_data
@pack_resp
async def login(request: Request):
    """Finish the login handshake and issue a session authorize token.

    The ``authorize`` request header is parsed as a query string and its
    ``token`` field is used to look up the client/server token pair stored
    in the application context's ``auth_json`` mapping. The first 16 bytes
    of ``xor(client_token, server_token)`` are used as the AES key to
    decrypt ``login_key`` and ``login_passwd`` from the parsed request data.
    The decrypted values are matched against ``SdoUser`` (``user_id`` and
    ``ticket``). On success a new authorize token is generated and stored
    in ``ctx.auth_token`` under the user's id.

    Returns:
        A dict with the new ``authorize_token``, the ``user_id``, the
        current server timestamp and several fixed flags.

    Raises:
        SanicException: with status 403 when the ``authorize`` header or
            its ``token`` field is missing, when the token does not match a
            stored handshake, or when no matching user is found.
    """
    # A missing header is treated as an empty query string so that it ends
    # in a 403 below instead of an unhandled KeyError (HTTP 500).
    authorize = dict(
        urllib.parse.parse_qsl(request.headers.get("authorize", ""))
    )
    logger.debug(f"{authorize=}")
    authorize_token = authorize.get("token")

    ctx = Sanic.get_app().ctx
    # ``auth_json`` only exists after at least one handshake has been
    # started, so fall back to an empty table rather than raising
    # AttributeError.
    tokens = getattr(ctx, "auth_json", {}).get(authorize_token)
    if tokens is None:
        raise SanicException("Auth failed", status_code=403)

    client_token = base64_decode(tokens["client_token"])
    server_token = base64_decode(tokens["server_token"])
    aes_key = xor(client_token, server_token)[:16]

    # The first 16 bytes of each decrypted payload are discarded --
    # presumably the IV block; TODO confirm against aes_cbc_decrypt.
    encrypt_login_key = base64_decode(request.ctx.request_data["login_key"])
    login_key = aes_cbc_decrypt(encrypt_login_key, aes_key)[16:].decode()
    encrypt_login_passwd = base64_decode(
        request.ctx.request_data["login_passwd"]
    )
    login_passwd = aes_cbc_decrypt(encrypt_login_passwd, aes_key)[16:].decode()
    # NOTE(review): this writes the decrypted credentials to the debug log;
    # consider redacting before enabling debug logging in production.
    logger.debug(f"{login_key=} {login_passwd=}")

    user = SdoUser.get_or_none(user_id=login_key,  # why not sifkey???
                               ticket=login_passwd)
    if not user:
        raise SanicException("Auth failed", status_code=403)

    new_authorize_token = base64_encode(uuid.uuid4().bytes)
    if not hasattr(ctx, "auth_token"):
        ctx.auth_token = {}
    ctx.auth_token[user.user_id] = new_authorize_token
    resp = {
        "authorize_token": new_authorize_token,
        "user_id": user.user_id,
        "review_version": "",
        "server_timestamp": int(time.time()),
        "idfa_enabled": False,
        "skip_login_news": False,
        "adult_flag": 2,
    }
    return resp
|
|
|