diff --git a/main.py b/main.py index 9826598..a713b42 100644 --- a/main.py +++ b/main.py @@ -1,31 +1,28 @@ -from sanic import Sanic, Request -from sanic.response import json +from sanic import Sanic, Request, text from sanic.log import logger -from logger import setup_log, S_LOGGING_CONFIG_DEFAULTS - +from utils.logger import setup_log, S_LOGGING_CONFIG_DEFAULTS from sdo import bp_sdo - +from sif import bp_sif setup_log() app = Sanic("kotori", log_config=S_LOGGING_CONFIG_DEFAULTS) app.blueprint(bp_sdo) +app.blueprint(bp_sif) app.config["PUBKEY_PATH"] = "./publickey.pem" app.config["PRIVKEY_PATH"] = "./privatekey.pem" @app.middleware("request") async def callback_request(request: Request): - logger.info(f"{request.method} - {request.path}") - - + logger.info(f"{request.ip} {request.method} - {request.url}") @app.route('/') async def test(request): - return json({'hello': 'world'}) + return text("(·8·)") if __name__ == '__main__': app.run(port=8080) \ No newline at end of file diff --git a/sdo/v1.py b/sdo/v1.py index 0f0616c..4cbef47 100644 --- a/sdo/v1.py +++ b/sdo/v1.py @@ -2,13 +2,13 @@ from sanic import Blueprint, Request, json, Sanic from sanic.log import logger from sanic.exceptions import SanicException -import urllib.parse -import uuid import json as jsonlib import time +import urllib.parse +import uuid -from utils.crypto import rsa_decrypt -from utils.crypto import des3_encrypt, des3_decrypt +from utils.crypto import rsa_decrypt, get_rsa_privkey +from utils.crypto import des3_decrypt, des3_encrypt from utils.crypto import base64_decode, base64_encode, md5 from .model import SdoUser @@ -18,14 +18,14 @@ bp_sdo_v1 = Blueprint("sdo_v1", url_prefix="v1") -@bp_sdo_v1.route("/account/active", methods=["GET", "POST"]) +@bp_sdo_v1.route("/account/active", methods=["POST"]) async def account_active(request: Request): logger.debug(request.body.decode()) return json({ "code": 0, "msg": "ok", "data": { "message": "ok", "result": 0 } }) -@bp_sdo_v1.route("/basic/publickey", methods=["GET", "POST"]) +@bp_sdo_v1.route("/basic/publickey", methods=["POST"]) async def basic_publickey(request: Request): with open(Sanic.get_app().config["PUBKEY_PATH"], "r") as f: public_key = f.read() @@ -39,11 +39,10 @@ async def basic_publickey(request: Request): def encrypt_resp(resp, key): return base64_encode(des3_encrypt(resp.encode(), key.encode()[:24])) -@bp_sdo_v1.route("/basic/handshake", methods=["GET", "POST"]) +@bp_sdo_v1.route("/basic/handshake", methods=["POST"]) async def basic_handshake(request: Request): - with open(Sanic.get_app().config["PRIVKEY_PATH"], "r") as f: - private_key = f.read() - data = rsa_decrypt(base64_decode(request.body.decode()), private_key) + privkey = get_rsa_privkey() + data = rsa_decrypt(base64_decode(request.body.decode()), privkey).decode() logger.debug(data) param = dict(urllib.parse.parse_qsl(data)) @@ -62,7 +61,7 @@ async def basic_handshake(request: Request): -@bp_sdo_v1.route("/account/initialize", methods=["GET", "POST"]) +@bp_sdo_v1.route("/account/initialize", methods=["POST"]) async def account_initialize(request: Request): device_id = request.headers["X-DEVICEID"] rand_key = Sanic.get_app().ctx.rand_keys[device_id] @@ -123,7 +122,7 @@ async def account_initialize(request: Request): -@bp_sdo_v1.route("/account/login", methods=["GET", "POST"]) +@bp_sdo_v1.route("/account/login", methods=["POST"]) async def account_login(request: Request): device_id = request.headers["X-DEVICEID"] rand_key = Sanic.get_app().ctx.rand_keys[device_id] @@ -200,7 +199,7 @@ async def account_login(request: Request): -@bp_sdo_v1.route("/account/loginauto", methods=["GET", "POST"]) +@bp_sdo_v1.route("/account/loginauto", methods=["POST"]) async def account_loginauto(request: Request): device_id = request.headers["X-DEVICEID"] rand_key = Sanic.get_app().ctx.rand_keys[device_id] @@ -237,7 +236,7 @@ async def account_loginauto(request: Request): -@bp_sdo_v1.route("/basic/loginarea", methods=["GET", "POST"]) +@bp_sdo_v1.route("/basic/loginarea", methods=["POST"]) async def basic_loginarea(request: Request): user_id = "" if request.form: user_id = request.form["userid"][0] diff --git a/sif/__init__.py b/sif/__init__.py new file mode 100644 index 0000000..d09505b --- /dev/null +++ b/sif/__init__.py @@ -0,0 +1,6 @@ +from sanic import Blueprint + +from .login import bp_sif_login + +bp_sif = Blueprint.group(bp_sif_login, url_prefix="main.php") + diff --git a/sif/login.py b/sif/login.py new file mode 100644 index 0000000..e5bfa27 --- /dev/null +++ b/sif/login.py @@ -0,0 +1,100 @@ +from sanic import Sanic, Blueprint, Request, json, SanicException +from sanic.log import logger + +import json as jsonlib +import time +import urllib.parse +import uuid + +from utils.crypto import base64_decode, base64_encode, xor +from utils.crypto import rsa_decrypt, get_rsa_privkey, rsa_sha1_sign +from utils.crypto import aes_cbc_decrypt +from sdo.model import SdoUser + + + +bp_sif_login = Blueprint("sif_login", url_prefix="login") + + + +@bp_sif_login.route("authkey", methods=["POST"]) +async def authkey(request: Request): + request_data = jsonlib.loads(request.form.get("request_data", "")) # type: ignore + dummy_token = base64_decode(request_data["dummy_token"]) + client_token = base64_encode(rsa_decrypt(dummy_token, get_rsa_privkey())) + server_token = base64_encode(uuid.uuid4().bytes) + authorize_token = base64_encode(uuid.uuid4().bytes) + logger.debug(f"{client_token=} {server_token=} {authorize_token=}") + + ctx = Sanic.get_app().ctx + if not hasattr(ctx, "auth_json"): ctx.auth_json = {} + ctx.auth_json[authorize_token] = { + "client_token": client_token, + "server_token": server_token, + } + + # nonce = 0 + # authorize = (f"consumerKey=lovelive_test&" + # f"timeStamp={int(time.time())}&" + # f"version=1.1&nonce={nonce}&" + # f"requestTimeStamp={int(time.time())}") + + auth_resp = { + "response_data": { + "authorize_token": authorize_token, + "dummy_token": server_token + }, + "release_info": {}, + "status_code": 200 + } + x_message_sign = base64_encode( + rsa_sha1_sign(jsonlib.dumps(auth_resp).encode(), get_rsa_privkey())) + logger.debug(f"{auth_resp=} {x_message_sign=}") + return json(auth_resp, headers={ + "X-Message-Sign": x_message_sign + }) + + + +@bp_sif_login.route("login", methods=["POST"]) +async def login(request: Request): + request_data = jsonlib.loads(request.form.get("request_data", "")) # type: ignore + authorize = dict(urllib.parse.parse_qsl(request.headers["authorize"])) + logger.debug(f"{request_data=} {authorize=}") + + authorize_token = authorize["token"] + tokens = Sanic.get_app().ctx.auth_json[authorize_token] + client_token = base64_decode(tokens["client_token"]) + server_token = base64_decode(tokens["server_token"]) + aes_key = xor(client_token, server_token)[:16] + + encrypt_login_key = base64_decode(request_data["login_key"]) + login_key = aes_cbc_decrypt(encrypt_login_key, aes_key)[16:].decode() + encrypt_login_passwd = base64_decode(request_data["login_passwd"]) + login_passwd = aes_cbc_decrypt(encrypt_login_passwd, aes_key)[16:].decode() + logger.debug(f"{login_key=} {login_passwd=}") + + user = SdoUser.get_or_none(user_id=login_key, # why not sifkey??? + ticket=login_passwd) + if not user: + raise SanicException("Auth failed", status_code=403) + + login_resp = { + "response_data": { + "authorize_token": authorize_token, + "user_id": user.user_id, + "review_version": "", + "server_timestamp": int(time.time()), + "idfa_enabled": False, + "skip_login_news": False, + "adult_flag": 2 + }, + "release_info": {}, + "status_code": 200 + } + x_message_sign = base64_encode( + rsa_sha1_sign(jsonlib.dumps(login_resp).encode(), get_rsa_privkey())) + logger.debug(f"{login_resp=} {x_message_sign=}") + return json(login_resp, headers={ + "X-Message-Sign": x_message_sign + }) \ No newline at end of file diff --git a/utils/crypto.py b/utils/crypto.py index 3b7bbce..affc7ee 100644 --- a/utils/crypto.py +++ b/utils/crypto.py @@ -1,26 +1,45 @@ import base64 import hashlib +from sanic import Sanic from Crypto.PublicKey import RSA -from Crypto.Cipher import PKCS1_v1_5 -from Crypto.Cipher import DES3 +from Crypto.Cipher import PKCS1_v1_5, DES3, AES +from Crypto.Signature import pkcs1_15 +from Crypto.Hash import SHA1 from Crypto.Util.Padding import pad, unpad + def md5(orig: bytes) -> bytes: - m = hashlib.md5() - m.update(orig) - return bytes.fromhex(m.hexdigest()) + h = hashlib.md5() + h.update(orig) + return bytes.fromhex(h.hexdigest()) -def base64_encode(orig: bytes) -> str: - return base64.b64encode(orig).decode() def base64_decode(b64: str) -> bytes: return base64.b64decode(b64) -def rsa_decrypt(cipher_text: bytes, private_key: str) -> str: +def base64_encode(orig: bytes) -> str: + return base64.b64encode(orig).decode() + + + +def get_rsa_privkey(): + with open(Sanic.get_app().config["PRIVKEY_PATH"], "r") as f: + return f.read() + +def rsa_decrypt(cipher_text: bytes, private_key: str) -> bytes: + cipher = PKCS1_v1_5.new(RSA.importKey(private_key)) + return cipher.decrypt(cipher_text, b"rsa") + +def rsa_encrypt(origin_text: bytes, private_key: str) -> bytes: cipher = PKCS1_v1_5.new(RSA.importKey(private_key)) - decrypt_text = cipher.decrypt(cipher_text, b"rsa") - return decrypt_text.decode("utf-8") + return cipher.encrypt(origin_text) + +def rsa_sha1_sign(message: bytes, private_key: str) -> bytes: + key = RSA.import_key(private_key) + h = SHA1.new(message) + return pkcs1_15.new(key).sign(h) + def des3_decrypt(cipher_text: bytes, key: bytes) -> bytes: @@ -29,4 +48,19 @@ def des3_decrypt(cipher_text: bytes, key: bytes) -> bytes: def des3_encrypt(origin_text: bytes, key: bytes) -> bytes: cipher = DES3.new(key, DES3.MODE_ECB) - return cipher.encrypt(pad(origin_text, 16)) \ No newline at end of file + return cipher.encrypt(pad(origin_text, 16)) + + + +def aes_cbc_decrypt(cipher_text: bytes, key: bytes) -> bytes: + cipher = AES.new(key, AES.MODE_CBC) + return unpad(cipher.decrypt(cipher_text), 16) + +def aes_cbc_encrypt(origin_text: bytes, key: bytes) -> bytes: + cipher = AES.new(key, AES.MODE_CBC) + return cipher.encrypt(pad(origin_text, 16)) + + + +def xor(a: bytes, b: bytes) -> bytes: + return bytes([ i ^ j for (i, j) in zip(bytearray(a), bytearray(b)) ]) \ No newline at end of file diff --git a/logger.py b/utils/logger.py similarity index 86% rename from logger.py rename to utils/logger.py index ccab26a..cdcf4ae 100644 --- a/logger.py +++ b/utils/logger.py @@ -21,7 +21,11 @@ S_LOGGING_CONFIG_DEFAULTS: Dict[str, Any] = dict( # no cov version=1, disable_existing_loggers=False, loggers={ - "sanic.root": {"level": "INFO", "handlers": ["console"], "propagate": False}, + "sanic.root": { + "level": "INFO", + "handlers": ["console"], + "propagate": False + }, "sanic.error": { "level": "INFO", "handlers": ["error_console"], @@ -43,13 +47,13 @@ S_LOGGING_CONFIG_DEFAULTS: Dict[str, Any] = dict( # no cov }, handlers={ "console": { - "class": "logger.InterceptHandler", + "class": "utils.logger.InterceptHandler", }, "error_console": { - "class": "logger.InterceptHandler", + "class": "utils.logger.InterceptHandler", }, "access_console": { - "class": "logger.InterceptHandler", + "class": "utils.logger.InterceptHandler", }, } ) @@ -76,4 +80,4 @@ def setup_log(): for name in logging.root.manager.loggerDict.keys(): logging.getLogger(name).handlers = [] logging.getLogger(name).propagate = True - logger.configure(handlers=[{"sink": sys.stdout, "serialize": False}]) \ No newline at end of file + # logger.configure(handlers=[{"sink": sys.stdout, "serialize": False}]) \ No newline at end of file