import asyncio
import json
import logging
import sys
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger
from quart import Quart, jsonify, request
from src.twi_api import get_detail
from src.twitter import task_handler
from src.twi_parser import parse_detail
app = Quart(__name__)
scheduler = AsyncIOScheduler()
logger.remove()
logger.add(sys.stdout, level="INFO",
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {name}:{function}:{line} - {message}")
class InterceptHandler(logging.Handler):
def emit(self, record):
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno
logger.opt(depth=6, exception=record.exc_info).log(level, record.getMessage())
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
logging.getLogger("apscheduler").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
#logger.disable("hypercorn")
@app.before_serving
async def startup():
with open("./config/config.json", "r", encoding="utf-8") as f: config = json.load(f)
for task in config.get("task_list", []):
logger.info(f"Added task {task['name']} ({task['interval']}s): {task['url']}")
await add_task(task)
scheduler.start()
logger.info("Scheduler started")
@app.after_serving
async def shutdown():
scheduler.shutdown()
async def add_task(task_args):
if not "url" in task_args: raise ValueError("Task must have a URL")
task_name = task_args.get("name", task_args["url"])
interval = task_args.get("interval", 42)
task_args["filter"] = task_args.get("filter", [])
if scheduler.get_job(task_name):
scheduler.remove_job(task_name)
trigger = IntervalTrigger(seconds=interval)
scheduler.add_job(task_handler, trigger, args=[task_args],
id=task_name, replace_existing=True,
max_instances=3, misfire_grace_time=10)
async def list_task():
jobs = scheduler.get_jobs()
return [{"id": job.id, "next_run": str(job.next_run_time)} for job in jobs]
@app.route("/tweet/detail", methods=["GET"])
async def tweet_detail():
tweet_id = request.args.get("tweet_id")
if not tweet_id:
return jsonify({"error": "tweet_id is required"}), 400
try:
data = await get_detail(tweet_id)
data = parse_detail(data)
return jsonify(data)
except Exception as e:
logger.error(f"Tweet detail api error: {e}")
return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
import asyncio
from hypercorn.asyncio import serve
from hypercorn.config import Config
with open("./config/config.json", "r", encoding="utf-8") as f: conf = json.load(f)
config = Config()
config.bind = [conf["url"]]
config.accesslog = logging.getLogger('hypercorn.access')
config.errorlog = logging.getLogger('hypercorn.error')
asyncio.run(serve(app, config))
#app.run(port=7217)