implement sif download

master
wlt233 7 months ago
parent b38d94cf55
commit 42d1fdffc4

File diff suppressed because one or more lines are too long

@ -2,27 +2,42 @@ from sanic import Sanic, Request, text
from sanic.log import logger
from utils.logger import setup_log, S_LOGGING_CONFIG_DEFAULTS
from utils.assets import load_assets
from sdo import bp_sdo
from sif import bp_sif
setup_log()
app = Sanic("kotori", log_config=S_LOGGING_CONFIG_DEFAULTS)
app.blueprint(bp_sdo)
app.blueprint(bp_sif)
app.config["PUBKEY_PATH"] = "./publickey.pem"
app.config["PRIVKEY_PATH"] = "./privatekey.pem"
app.config["ASSETS_LIST_PATH"] = "./data/assets_list.json"
app.config["ASSETS_PATH_ANDROID"] = "E:/20240802_sif/honoka-chan/static/Android/archives"
app.config["ASSETS_PATH_IOS"] = ""
app.config["STATIC_ROOT"] = "http://localsif:8080"
app.config["PACKAGE_VERSION"] = "97.4.6"
@app.before_server_start
async def attach_db(app, loop):
load_assets()
@app.middleware("request")
async def callback_request(request: Request):
logger.info(f"{request.ip} {request.method} - {request.url}")
@app.route('/')
async def test(request):
return text("(·8·)")
if __name__ == '__main__':
app.run(port=8080)

@ -8,7 +8,7 @@ class BaseModel(Model):
class SdoUser(BaseModel):
id = IntegerField(primary_key=True)
user_id = IntegerField(null=True)
user_id = TextField(null=True)
phone = TextField()
password = TextField(null=True)

@ -65,9 +65,8 @@ async def basic_handshake(request: Request):
async def account_initialize(request: Request):
device_id = request.headers["X-DEVICEID"]
rand_key = Sanic.get_app().ctx.rand_keys[device_id]
logger.debug(f"{rand_key=} {device_id=}")
data = des3_decrypt(base64_decode(request.body.decode()), rand_key.encode()[:24]).decode()
logger.debug(data)
logger.debug(f"{rand_key=} {device_id=} {data=}")
initialize_resp = {
"brand_logo": "",
"brand_name": "",
@ -232,12 +231,16 @@ async def account_loginauto(request: Request):
@bp_sdo_v1.route("/basic/loginarea", methods=["POST"])
async def basic_loginarea(request: Request):
user_id = ""
if request.form: user_id = request.form["userid"][0]
return json({ "code": 0, "msg": "ok", "data": { "userid": user_id } })
@bp_sdo_v1.route("/account/reportRole", methods=["POST"])
async def account_reportrole(request: Request):
logger.debug(request.body.decode())
return json({ "code": 0, "msg": "ok", "data": { "message": "ok", "result": 0 } })

@ -1,6 +1,13 @@
from sanic import Blueprint
from .login import bp_sif_login
from .user import bp_sif_user
from .misc import bp_sif_misc
from .download import bp_sif_download
bp_sif = Blueprint.group(bp_sif_login, url_prefix="main.php")
bp_sif = Blueprint.group(bp_sif_login,
bp_sif_user,
bp_sif_misc,
bp_sif_download,
url_prefix="main.php")

@ -0,0 +1,57 @@
from sanic import Sanic, json, SanicException
from sanic.log import logger
from decorator import decorator
import json as jsonlib
import urllib.parse
from utils.crypto import base64_encode
from utils.crypto import get_rsa_privkey, rsa_sha1_sign
@decorator
def check_token(f, *args, **kw):
request = args[0]
user_id = request.headers["user-id"]
request.ctx.user_id = user_id
authorize = dict(urllib.parse.parse_qsl(request.headers["authorize"]))
token = authorize["token"]
logger.debug(f"{request.path=} {user_id=} {token=}")
return f(*args, **kw)
if token != Sanic.get_app().ctx.auth_token[user_id]:
raise SanicException("Auth failed", status_code=403)
return f(*args, **kw)
@decorator
def parse_data(f, *args, **kw):
request = args[0]
request_data = jsonlib.loads(request.form.get("request_data", "")) # type: ignore
logger.debug(f"{request.path=} {request_data=}")
request.ctx.request_data = request_data
return f(*args, **kw)
@decorator
async def pack_resp(f, *args, **kw):
request = args[0]
ret = await f(*args, **kw)
data = {
"response_data": ret,
"release_info": {},
"status_code": 200
}
x_message_sign = base64_encode(
rsa_sha1_sign(jsonlib.dumps(data).encode(), get_rsa_privkey()))
logger.debug(f"{request.path=} {data=} {x_message_sign=}")
return json(data, headers={
"X-Message-Sign": x_message_sign,
"Server-Version": Sanic.get_app().config["PACKAGE_VERSION"]
})
def common_api(f):
return check_token(parse_data(pack_resp(f)))

@ -0,0 +1,84 @@
from sanic import Sanic, Blueprint, Request
from sanic.log import logger
from .common import common_api
bp_sif_download = Blueprint("sif_download", url_prefix="download")
@bp_sif_download.route("additional", methods=["POST"])
@common_api
async def additional(request: Request):
request_data = request.ctx.request_data
static_root = Sanic.get_app().config["STATIC_ROOT"]
package_type = request_data.get("package_type", 0)
package_id = request_data.get("package_id", 0)
target_os = request_data.get("target_os", "")
download_resp = []
package_list = Sanic.get_app().ctx.assets_list[target_os][package_type][package_id]
static_root = Sanic.get_app().config["STATIC_ROOT"]
for package_order, package_size in package_list:
download_resp.append({
"size": package_size,
"url": f"{static_root}/{target_os}/{package_type}_{package_id}_{package_order}.zip"
})
return download_resp
@bp_sif_download.route("batch", methods=["POST"])
@common_api
async def batch(request: Request):
request_data = request.ctx.request_data
static_root = Sanic.get_app().config["STATIC_ROOT"]
download_resp = []
if request_data.get("client_version", "") == Sanic.get_app().config["PACKAGE_VERSION"]: # 97.4.6
package_type = request_data.get("package_type", 0)
excluded_package_ids = request_data.get("excluded_package_ids", [])
os = request_data.get("os", "")
package_dict = Sanic.get_app().ctx.assets_list[os][package_type]
for package_id, package_order_list in package_dict.items():
if package_id in excluded_package_ids: continue
package_order_list.sort(key=lambda x: x[0])
for package_order, package_size in package_order_list:
download_resp.append({
"size": package_size,
"url": f"{static_root}/{os}/{package_type}_{package_id}_{package_order}.zip"
})
return download_resp
@bp_sif_download.route("update", methods=["POST"])
@common_api
async def update(request: Request):
request_data = request.ctx.request_data
static_root = Sanic.get_app().config["STATIC_ROOT"]
version = Sanic.get_app().config["PACKAGE_VERSION"]
target_os = request_data.get("target_os", "")
download_resp = []
if request_data.get("external_version", "") != version: # 97.4.6
package_type = 99
package_dict = Sanic.get_app().ctx.assets_list[target_os][package_type]
for package_id, package_order_list in package_dict.items():
package_order_list.sort(key=lambda x: x[0])
for package_order, package_size in package_order_list:
download_resp.append({
"size": package_size,
"url": f"{static_root}/{target_os}/{package_type}_{package_id}_{package_order}.zip",
"version": version
})
return download_resp
@bp_sif_download.route("event", methods=["POST"])
@common_api
async def event(request: Request):
return []

@ -1,15 +1,15 @@
from sanic import Sanic, Blueprint, Request, json, SanicException
from sanic import Sanic, Blueprint, Request, SanicException
from sanic.log import logger
import json as jsonlib
import time
import urllib.parse
import uuid
from utils.crypto import base64_decode, base64_encode, xor
from utils.crypto import rsa_decrypt, get_rsa_privkey, rsa_sha1_sign
from utils.crypto import rsa_decrypt, get_rsa_privkey
from utils.crypto import aes_cbc_decrypt
from sdo.model import SdoUser
from .common import parse_data, pack_resp
@ -18,9 +18,10 @@ bp_sif_login = Blueprint("sif_login", url_prefix="login")
@bp_sif_login.route("authkey", methods=["POST"])
@parse_data
@pack_resp
async def authkey(request: Request):
request_data = jsonlib.loads(request.form.get("request_data", "")) # type: ignore
dummy_token = base64_decode(request_data["dummy_token"])
dummy_token = base64_decode(request.ctx.request_data["dummy_token"])
client_token = base64_encode(rsa_decrypt(dummy_token, get_rsa_privkey()))
server_token = base64_encode(uuid.uuid4().bytes)
authorize_token = base64_encode(uuid.uuid4().bytes)
@ -40,37 +41,29 @@ async def authkey(request: Request):
# f"requestTimeStamp={int(time.time())}")
auth_resp = {
"response_data": {
"authorize_token": authorize_token,
"dummy_token": server_token
},
"release_info": {},
"status_code": 200
}
x_message_sign = base64_encode(
rsa_sha1_sign(jsonlib.dumps(auth_resp).encode(), get_rsa_privkey()))
logger.debug(f"{auth_resp=} {x_message_sign=}")
return json(auth_resp, headers={
"X-Message-Sign": x_message_sign
})
return auth_resp
@bp_sif_login.route("login", methods=["POST"])
@parse_data
@pack_resp
async def login(request: Request):
request_data = jsonlib.loads(request.form.get("request_data", "")) # type: ignore
authorize = dict(urllib.parse.parse_qsl(request.headers["authorize"]))
logger.debug(f"{request_data=} {authorize=}")
logger.debug(f"{authorize=}")
authorize_token = authorize["token"]
tokens = Sanic.get_app().ctx.auth_json[authorize_token]
client_token = base64_decode(tokens["client_token"])
server_token = base64_decode(tokens["server_token"])
aes_key = xor(client_token, server_token)[:16]
encrypt_login_key = base64_decode(request_data["login_key"])
encrypt_login_key = base64_decode(request.ctx.request_data["login_key"])
login_key = aes_cbc_decrypt(encrypt_login_key, aes_key)[16:].decode()
encrypt_login_passwd = base64_decode(request_data["login_passwd"])
encrypt_login_passwd = base64_decode(request.ctx.request_data["login_passwd"])
login_passwd = aes_cbc_decrypt(encrypt_login_passwd, aes_key)[16:].decode()
logger.debug(f"{login_key=} {login_passwd=}")
@ -79,22 +72,18 @@ async def login(request: Request):
if not user:
raise SanicException("Auth failed", status_code=403)
new_authorize_token = base64_encode(uuid.uuid4().bytes)
ctx = Sanic.get_app().ctx
if not hasattr(ctx, "auth_token"): ctx.auth_token = {}
ctx.auth_token[user.user_id] = new_authorize_token
login_resp = {
"response_data": {
"authorize_token": authorize_token,
"authorize_token": new_authorize_token,
"user_id": user.user_id,
"review_version": "",
"server_timestamp": int(time.time()),
"idfa_enabled": False,
"skip_login_news": False,
"adult_flag": 2
},
"release_info": {},
"status_code": 200
}
x_message_sign = base64_encode(
rsa_sha1_sign(jsonlib.dumps(login_resp).encode(), get_rsa_privkey()))
logger.debug(f"{login_resp=} {x_message_sign=}")
return json(login_resp, headers={
"X-Message-Sign": x_message_sign
})
return login_resp

@ -0,0 +1,90 @@
from sanic import Blueprint, Request
import time
from .common import common_api
bp_sif_misc = Blueprint("sif_misc")
@bp_sif_misc.route("/gdpr/get", methods=["POST"])
@common_api
async def gdpr_get(request: Request):
gdpr_resp = {
"enable_gdpr": True,
"is_eea": False,
"server_timestamp": int(time.time())
}
return gdpr_resp
@bp_sif_misc.route("/lbonus/execute", methods=["POST"])
@common_api
async def lbonus_execute(request: Request):
lbdays = {
"day": 0,
"day_of_the_week": 1,
"special_day": False,
"special_image_asset": "",
"received": False,
"ad_received": False,
"item": {
"item_id": 0,
"add_type": 0,
"amount": 0
}
}
lbonus_resp = {
"sheets": [],
"calendar_info": {
"current_date": "2006-01-02 03:04:05",
"current_month": {
"year": 0,
"month": 0,
"days": [lbdays]
},
"next_month": {
"year": 0,
"month": 0,
"days": [lbdays]
}
},
"total_login_info": {
"login_count": 100,
"remaining_count": 100,
"reward": {
"item_id": 5,
"add_type": 1000,
"amount": 5,
}
},
"license_lbonus_list": [],
"class_system": {
"rank_info": {
"before_class_rank_id": 10,
"after_class_rank_id": 10,
"rank_up_date": "2020-02-12 11:57:15",
},
"complete_flag": False,
"is_opened": True,
"is_visible": True,
},
"start_dash_sheets": [],
"effort_point": {
"live_effort_point_box_spec_id": 0,
"capacity": 0,
"before": 0,
"after": 0,
"rewards": 0,
},
"limited_effort_box": [],
"museum_info": {},
"server_timestamp": int(time.time()),
"present_cnt": 0,
}
return lbonus_resp

@ -0,0 +1,53 @@
from sanic import Blueprint, Request
import time
from .common import common_api
bp_sif_user = Blueprint("sif_user", url_prefix="user")
@bp_sif_user.route("userInfo", methods=["POST"])
@common_api
async def userinfo(request: Request):
userinfo_resp = {
"user": {
"user_id": request.ctx.user_id,
"name": "wlt233",
"level": 100,
"exp": 100,
"previous_exp": 0,
"next_exp": 100,
"game_coin": 0,
"sns_coin": 0,
"free_sns_coin": 0,
"paid_sns_coin": 0,
"social_point": 0,
"unit_max": 100,
"waiting_unit_max": 100,
"energy_max": 100,
"energy_full_time": "2023-03-20 03:58:55",
"license_live_energy_recoverly_time": 100,
"energy_full_need_time": 0,
"over_max_energy": 0,
"training_energy": 100,
"training_energy_max": 0,
"friend_max": 100,
"invite_code": "",
"insert_date": "2023-03-20 03:58:55",
"update_date": "2023-03-20 03:58:55",
"tutorial_state": -1,
"diamond_coin": 0,
"crystal_coin": 0,
"lp_recovery_item": []
},
"birth": {
"birth_month": 1,
"birth_day": 1
},
"server_timestamp": int(time.time())
}
return userinfo_resp

@ -0,0 +1,59 @@
from sanic import Sanic
from sanic.log import logger
import os
import os.path
from collections import defaultdict
import json
def load_assets():
config = Sanic.get_app().config
if config["ASSETS_LIST_PATH"] and os.path.exists(config["ASSETS_LIST_PATH"]):
with open(config["ASSETS_LIST_PATH"], "r") as f:
assets_list = json.load(f,
object_hook=lambda d: {int(k) if k.lstrip('-').isdigit() else k: v for k, v in d.items()})
logger.info(f"Loaded packages list from {config['ASSETS_LIST_PATH']}")
else:
assets_list = { "Android": {}, "iOS": {} }
for i in [0, 1, 2, 3, 4, 5, 6, 99]:
assets_list["Android"][i] = defaultdict(list)
assets_list["iOS"][i] = defaultdict(list)
android_i = 0
if config["ASSETS_PATH_ANDROID"]:
for r, d, fs in os.walk(config["ASSETS_PATH_ANDROID"]):
for f in fs:
if f.endswith(".zip"):
package_type, package_id, package_order = [int(i) for i in f.replace(".zip", "").split("_")]
package_size = os.stat(os.path.join(r, f)).st_size
assets_list["Android"][package_type][package_id].append((package_order, package_size))
android_i += 1
logger.info(f"Loaded {android_i} Android packages from {config['ASSETS_PATH_ANDROID']}")
ios_i = 0
if config["ASSETS_PATH_IOS"]:
for r, d, fs in os.walk(config["ASSETS_PATH_IOS"]):
for f in fs:
if f.endswith(".zip"):
package_type, package_id, package_order = [int(i) for i in f.replace(".zip", "").split("_")]
package_size = os.stat(os.path.join(r, f)).st_size
assets_list["iOS"][package_type][package_id].append((package_order, package_size))
ios_i += 1
logger.info(f"Loaded {ios_i} iOS packages from {config['ASSETS_PATH_IOS']}")
if not os.path.exists("data"): os.makedirs("data")
with open("data/assets_list.json", "w") as f:
json.dump(assets_list, f)
Sanic.get_app().ctx.assets_list = assets_list
if config["ASSETS_PATH_ANDROID"]:
Sanic.get_app().static("/Android", config["ASSETS_PATH_ANDROID"])
logger.info(f"Hosted {config['ASSETS_PATH_ANDROID']} to {config['STATIC_ROOT']}/Android")
if config["ASSETS_PATH_IOS"]:
Sanic.get_app().static("/iOS", config["ASSETS_PATH_IOS"])
logger.info(f"Hosted {config['ASSETS_PATH_IOS']} to {config['STATIC_ROOT']}/iOS")

@ -44,7 +44,8 @@ def rsa_sha1_sign(message: bytes, private_key: str) -> bytes:
def des3_decrypt(cipher_text: bytes, key: bytes) -> bytes:
cipher = DES3.new(key, DES3.MODE_ECB)
return unpad(cipher.decrypt(cipher_text), 16)
return cipher.decrypt(cipher_text)
# return unpad(cipher.decrypt(cipher_text), 16)
def des3_encrypt(origin_text: bytes, key: bytes) -> bytes:
cipher = DES3.new(key, DES3.MODE_ECB)

Loading…
Cancel
Save