implement sif login

master
wlt233 7 months ago
parent fc9de9ea37
commit b38d94cf55

@ -1,31 +1,28 @@
from sanic import Sanic, Request
from sanic.response import json
from sanic import Sanic, Request, text
from sanic.log import logger
from logger import setup_log, S_LOGGING_CONFIG_DEFAULTS
from utils.logger import setup_log, S_LOGGING_CONFIG_DEFAULTS
from sdo import bp_sdo
from sif import bp_sif
setup_log()
app = Sanic("kotori", log_config=S_LOGGING_CONFIG_DEFAULTS)
app.blueprint(bp_sdo)
app.blueprint(bp_sif)
app.config["PUBKEY_PATH"] = "./publickey.pem"
app.config["PRIVKEY_PATH"] = "./privatekey.pem"
@app.middleware("request")
async def callback_request(request: Request):
logger.info(f"{request.method} - {request.path}")
logger.info(f"{request.ip} {request.method} - {request.url}")
@app.route('/')
async def test(request):
return json({'hello': 'world'})
return text("(·8·)")
if __name__ == '__main__':
app.run(port=8080)

@ -2,13 +2,13 @@ from sanic import Blueprint, Request, json, Sanic
from sanic.log import logger
from sanic.exceptions import SanicException
import urllib.parse
import uuid
import json as jsonlib
import time
import urllib.parse
import uuid
from utils.crypto import rsa_decrypt
from utils.crypto import des3_encrypt, des3_decrypt
from utils.crypto import rsa_decrypt, get_rsa_privkey
from utils.crypto import des3_decrypt, des3_encrypt
from utils.crypto import base64_decode, base64_encode, md5
from .model import SdoUser
@ -18,14 +18,14 @@ bp_sdo_v1 = Blueprint("sdo_v1", url_prefix="v1")
@bp_sdo_v1.route("/account/active", methods=["GET", "POST"])
@bp_sdo_v1.route("/account/active", methods=["POST"])
async def account_active(request: Request):
logger.debug(request.body.decode())
return json({ "code": 0, "msg": "ok", "data": { "message": "ok", "result": 0 } })
@bp_sdo_v1.route("/basic/publickey", methods=["GET", "POST"])
@bp_sdo_v1.route("/basic/publickey", methods=["POST"])
async def basic_publickey(request: Request):
with open(Sanic.get_app().config["PUBKEY_PATH"], "r") as f:
public_key = f.read()
@ -39,11 +39,10 @@ async def basic_publickey(request: Request):
def encrypt_resp(resp, key):
return base64_encode(des3_encrypt(resp.encode(), key.encode()[:24]))
@bp_sdo_v1.route("/basic/handshake", methods=["GET", "POST"])
@bp_sdo_v1.route("/basic/handshake", methods=["POST"])
async def basic_handshake(request: Request):
with open(Sanic.get_app().config["PRIVKEY_PATH"], "r") as f:
private_key = f.read()
data = rsa_decrypt(base64_decode(request.body.decode()), private_key)
privkey = get_rsa_privkey()
data = rsa_decrypt(base64_decode(request.body.decode()), privkey).decode()
logger.debug(data)
param = dict(urllib.parse.parse_qsl(data))
@ -62,7 +61,7 @@ async def basic_handshake(request: Request):
@bp_sdo_v1.route("/account/initialize", methods=["GET", "POST"])
@bp_sdo_v1.route("/account/initialize", methods=["POST"])
async def account_initialize(request: Request):
device_id = request.headers["X-DEVICEID"]
rand_key = Sanic.get_app().ctx.rand_keys[device_id]
@ -123,7 +122,7 @@ async def account_initialize(request: Request):
@bp_sdo_v1.route("/account/login", methods=["GET", "POST"])
@bp_sdo_v1.route("/account/login", methods=["POST"])
async def account_login(request: Request):
device_id = request.headers["X-DEVICEID"]
rand_key = Sanic.get_app().ctx.rand_keys[device_id]
@ -200,7 +199,7 @@ async def account_login(request: Request):
@bp_sdo_v1.route("/account/loginauto", methods=["GET", "POST"])
@bp_sdo_v1.route("/account/loginauto", methods=["POST"])
async def account_loginauto(request: Request):
device_id = request.headers["X-DEVICEID"]
rand_key = Sanic.get_app().ctx.rand_keys[device_id]
@ -237,7 +236,7 @@ async def account_loginauto(request: Request):
@bp_sdo_v1.route("/basic/loginarea", methods=["GET", "POST"])
@bp_sdo_v1.route("/basic/loginarea", methods=["POST"])
async def basic_loginarea(request: Request):
user_id = ""
if request.form: user_id = request.form["userid"][0]

@ -0,0 +1,6 @@
from sanic import Blueprint
from .login import bp_sif_login
bp_sif = Blueprint.group(bp_sif_login, url_prefix="main.php")

@ -0,0 +1,100 @@
from sanic import Sanic, Blueprint, Request, json, SanicException
from sanic.log import logger
import json as jsonlib
import time
import urllib.parse
import uuid
from utils.crypto import base64_decode, base64_encode, xor
from utils.crypto import rsa_decrypt, get_rsa_privkey, rsa_sha1_sign
from utils.crypto import aes_cbc_decrypt
from sdo.model import SdoUser
bp_sif_login = Blueprint("sif_login", url_prefix="login")
@bp_sif_login.route("authkey", methods=["POST"])
async def authkey(request: Request):
request_data = jsonlib.loads(request.form.get("request_data", "")) # type: ignore
dummy_token = base64_decode(request_data["dummy_token"])
client_token = base64_encode(rsa_decrypt(dummy_token, get_rsa_privkey()))
server_token = base64_encode(uuid.uuid4().bytes)
authorize_token = base64_encode(uuid.uuid4().bytes)
logger.debug(f"{client_token=} {server_token=} {authorize_token=}")
ctx = Sanic.get_app().ctx
if not hasattr(ctx, "auth_json"): ctx.auth_json = {}
ctx.auth_json[authorize_token] = {
"client_token": client_token,
"server_token": server_token,
}
# nonce = 0
# authorize = (f"consumerKey=lovelive_test&"
# f"timeStamp={int(time.time())}&"
# f"version=1.1&nonce={nonce}&"
# f"requestTimeStamp={int(time.time())}")
auth_resp = {
"response_data": {
"authorize_token": authorize_token,
"dummy_token": server_token
},
"release_info": {},
"status_code": 200
}
x_message_sign = base64_encode(
rsa_sha1_sign(jsonlib.dumps(auth_resp).encode(), get_rsa_privkey()))
logger.debug(f"{auth_resp=} {x_message_sign=}")
return json(auth_resp, headers={
"X-Message-Sign": x_message_sign
})
@bp_sif_login.route("login", methods=["POST"])
async def login(request: Request):
request_data = jsonlib.loads(request.form.get("request_data", "")) # type: ignore
authorize = dict(urllib.parse.parse_qsl(request.headers["authorize"]))
logger.debug(f"{request_data=} {authorize=}")
authorize_token = authorize["token"]
tokens = Sanic.get_app().ctx.auth_json[authorize_token]
client_token = base64_decode(tokens["client_token"])
server_token = base64_decode(tokens["server_token"])
aes_key = xor(client_token, server_token)[:16]
encrypt_login_key = base64_decode(request_data["login_key"])
login_key = aes_cbc_decrypt(encrypt_login_key, aes_key)[16:].decode()
encrypt_login_passwd = base64_decode(request_data["login_passwd"])
login_passwd = aes_cbc_decrypt(encrypt_login_passwd, aes_key)[16:].decode()
logger.debug(f"{login_key=} {login_passwd=}")
user = SdoUser.get_or_none(user_id=login_key, # why not sifkey???
ticket=login_passwd)
if not user:
raise SanicException("Auth failed", status_code=403)
login_resp = {
"response_data": {
"authorize_token": authorize_token,
"user_id": user.user_id,
"review_version": "",
"server_timestamp": int(time.time()),
"idfa_enabled": False,
"skip_login_news": False,
"adult_flag": 2
},
"release_info": {},
"status_code": 200
}
x_message_sign = base64_encode(
rsa_sha1_sign(jsonlib.dumps(login_resp).encode(), get_rsa_privkey()))
logger.debug(f"{login_resp=} {x_message_sign=}")
return json(login_resp, headers={
"X-Message-Sign": x_message_sign
})

@ -1,26 +1,45 @@
import base64
import hashlib
from sanic import Sanic
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
from Crypto.Cipher import DES3
from Crypto.Cipher import PKCS1_v1_5, DES3, AES
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA1
from Crypto.Util.Padding import pad, unpad
def md5(orig: bytes) -> bytes:
m = hashlib.md5()
m.update(orig)
return bytes.fromhex(m.hexdigest())
h = hashlib.md5()
h.update(orig)
return bytes.fromhex(h.hexdigest())
def base64_encode(orig: bytes) -> str:
return base64.b64encode(orig).decode()
def base64_decode(b64: str) -> bytes:
return base64.b64decode(b64)
def rsa_decrypt(cipher_text: bytes, private_key: str) -> str:
def base64_encode(orig: bytes) -> str:
return base64.b64encode(orig).decode()
def get_rsa_privkey():
with open(Sanic.get_app().config["PRIVKEY_PATH"], "r") as f:
return f.read()
def rsa_decrypt(cipher_text: bytes, private_key: str) -> bytes:
cipher = PKCS1_v1_5.new(RSA.importKey(private_key))
return cipher.decrypt(cipher_text, b"rsa")
def rsa_encrypt(origin_text: bytes, private_key: str) -> bytes:
cipher = PKCS1_v1_5.new(RSA.importKey(private_key))
decrypt_text = cipher.decrypt(cipher_text, b"rsa")
return decrypt_text.decode("utf-8")
return cipher.encrypt(origin_text)
def rsa_sha1_sign(message: bytes, private_key: str) -> bytes:
key = RSA.import_key(private_key)
h = SHA1.new(message)
return pkcs1_15.new(key).sign(h)
def des3_decrypt(cipher_text: bytes, key: bytes) -> bytes:
@ -30,3 +49,18 @@ def des3_decrypt(cipher_text: bytes, key: bytes) -> bytes:
def des3_encrypt(origin_text: bytes, key: bytes) -> bytes:
cipher = DES3.new(key, DES3.MODE_ECB)
return cipher.encrypt(pad(origin_text, 16))
def aes_cbc_decrypt(cipher_text: bytes, key: bytes) -> bytes:
cipher = AES.new(key, AES.MODE_CBC)
return unpad(cipher.decrypt(cipher_text), 16)
def aes_cbc_encrypt(origin_text: bytes, key: bytes) -> bytes:
cipher = AES.new(key, AES.MODE_CBC)
return cipher.encrypt(pad(origin_text, 16))
def xor(a: bytes, b: bytes) -> bytes:
return bytes([ i ^ j for (i, j) in zip(bytearray(a), bytearray(b)) ])

@ -21,7 +21,11 @@ S_LOGGING_CONFIG_DEFAULTS: Dict[str, Any] = dict( # no cov
version=1,
disable_existing_loggers=False,
loggers={
"sanic.root": {"level": "INFO", "handlers": ["console"], "propagate": False},
"sanic.root": {
"level": "INFO",
"handlers": ["console"],
"propagate": False
},
"sanic.error": {
"level": "INFO",
"handlers": ["error_console"],
@ -43,13 +47,13 @@ S_LOGGING_CONFIG_DEFAULTS: Dict[str, Any] = dict( # no cov
},
handlers={
"console": {
"class": "logger.InterceptHandler",
"class": "utils.logger.InterceptHandler",
},
"error_console": {
"class": "logger.InterceptHandler",
"class": "utils.logger.InterceptHandler",
},
"access_console": {
"class": "logger.InterceptHandler",
"class": "utils.logger.InterceptHandler",
},
}
)
@ -76,4 +80,4 @@ def setup_log():
for name in logging.root.manager.loggerDict.keys():
logging.getLogger(name).handlers = []
logging.getLogger(name).propagate = True
logger.configure(handlers=[{"sink": sys.stdout, "serialize": False}])
# logger.configure(handlers=[{"sink": sys.stdout, "serialize": False}])
Loading…
Cancel
Save