You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

98 lines
3.1 KiB

import asyncio
import inspect
import json
import logging
import sys

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger
from quart import Quart, jsonify, request

from src.twi_api import get_detail
from src.twitter import task_handler
from src.twi_parser import parse_detail
# Quart application object; the route and lifecycle decorators in this module
# attach to it.
app = Quart(__name__)

# Scheduler holding the periodic jobs registered by this module.
scheduler = AsyncIOScheduler()

# Remove the sinks loguru already has so the stdout sink added here is the
# only one, then log INFO and above with a colourised single-line layout.
logger.remove()
logger.add(
    sys.stdout,
    level="INFO",
    format=(
        "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
        "<level>{level}</level> | "
        "<cyan>{name}:{function}:{line}</cyan> - "
        "<level>{message}</level>"
    ),
)
class InterceptHandler(logging.Handler):
    """Forward standard-library ``logging`` records to the loguru ``logger``."""

    def emit(self, record):
        """Re-emit *record* through loguru, attributed to the original caller.

        Args:
            record: the ``logging.LogRecord`` handed over by the logging
                machinery.
        """
        # Prefer the loguru level with the same name; loguru raises
        # ValueError for unknown names, in which case the numeric level is
        # passed through instead.
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Count how many frames separate this method from the code that
        # issued the logging call. The number of intermediate frames inside
        # the logging module depends on the entry point used (Logger.info,
        # Logger.exception, module-level logging.info, ...), so it is
        # measured here instead of being hard-coded.
        frame, depth = inspect.currentframe(), 0
        while frame:
            filename = frame.f_code.co_filename
            is_logging = filename == logging.__file__
            is_frozen = "importlib" in filename and "_bootstrap" in filename
            # depth 0 is this method itself; stop at the first frame after it
            # that belongs neither to logging nor to the import machinery.
            if depth > 0 and not (is_logging or is_frozen):
                break
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )
# Send every standard-library logging record to InterceptHandler. force=True
# makes basicConfig replace handlers already attached to the root logger, and
# NOTSET (0) sets the lowest possible threshold on the root logger.
logging.basicConfig(
    level=logging.NOTSET,
    handlers=[InterceptHandler()],
    force=True,
)

# Only warnings and above are accepted from these libraries' loggers.
for _library_name in ("apscheduler", "httpx"):
    logging.getLogger(_library_name).setLevel(logging.WARNING)
# logger.disable("hypercorn")
@app.before_serving
async def startup():
    """Register the configured tasks and start the scheduler.

    Reads ``./config/config.json`` and passes every entry of its
    ``task_list`` (no tasks when the key is absent) to ``add_task``, then
    starts the scheduler.

    Raises:
        OSError: if the configuration file cannot be opened.
        json.JSONDecodeError: if the configuration file is not valid JSON.
        ValueError: propagated from ``add_task`` for a task without "url".
    """
    with open("./config/config.json", "r", encoding="utf-8") as f:
        config = json.load(f)

    for task in config.get("task_list", []):
        await add_task(task)
        # Report the task only once it has really been registered, and use
        # the same fallbacks as add_task ("name" -> url, "interval" -> 42) so
        # a task that omits these optional keys does not raise KeyError here.
        name = task.get("name", task["url"])
        interval = task.get("interval", 42)
        logger.info(f"Added task {name} ({interval}s): {task['url']}")

    scheduler.start()
    logger.info("Scheduler started")
@app.after_serving
async def shutdown():
    """Stop the scheduler once the application has finished serving."""
    # shutdown() raises SchedulerNotRunningError on a scheduler that was never
    # started, which is its state if startup() failed before reaching
    # scheduler.start(); in that case there is nothing to stop.
    if scheduler.running:
        scheduler.shutdown()
async def add_task(task_args):
    """Schedule ``task_handler`` to run periodically for one task description.

    Args:
        task_args: dict describing the task. "url" is required. "name"
            (default: the URL) is used as the job id, "interval" (default:
            42) is the period in seconds, and "filter" is written back into
            the dict as an empty list when missing.

    Raises:
        ValueError: if ``task_args`` has no "url" key.
    """
    if "url" not in task_args:
        raise ValueError("Task must have a URL")

    job_id = task_args.get("name", task_args["url"])
    period_seconds = task_args.get("interval", 42)
    # The handler is given this same dict, so "filter" is always present in it.
    task_args.setdefault("filter", [])

    # Drop a job already registered under this id before adding the new one.
    if scheduler.get_job(job_id):
        scheduler.remove_job(job_id)

    scheduler.add_job(
        task_handler,
        IntervalTrigger(seconds=period_seconds),
        args=[task_args],
        id=job_id,
        replace_existing=True,
        max_instances=3,
        misfire_grace_time=10,
    )
async def list_task():
    """Summarise the jobs currently known to the scheduler.

    Returns:
        A list with one dict per job: ``"id"`` is the job id and
        ``"next_run"`` is ``str()`` of the job's ``next_run_time``.
    """
    summary = []
    for job in scheduler.get_jobs():
        summary.append({"id": job.id, "next_run": str(job.next_run_time)})
    return summary
@app.route("/tweet/detail", methods=["GET"])
async def tweet_detail():
    """Return the parsed detail of the tweet named by ``tweet_id``.

    Query parameters:
        tweet_id: required; passed to ``get_detail``.

    Returns:
        The JSON-encoded result of ``parse_detail`` on success,
        ``{"error": ...}`` with status 400 when ``tweet_id`` is missing or
        empty, or ``{"error": ...}`` with status 500 when fetching, parsing
        or encoding raises.
    """
    tweet_id = request.args.get("tweet_id")
    if not tweet_id:
        return jsonify({"error": "tweet_id is required"}), 400

    try:
        data = await get_detail(tweet_id)
        data = parse_detail(data)
        return jsonify(data)
    except Exception as e:
        # This is the request boundary, so any failure becomes a 500
        # response. logger.exception logs at ERROR level like logger.error
        # but also records the traceback, which the message alone loses.
        logger.exception(f"Tweet detail api error: {e}")
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    # Script entry point: serve the app with hypercorn, bound to the address
    # stored under "url" in the configuration file.
    from hypercorn.asyncio import serve
    from hypercorn.config import Config

    with open("./config/config.json", "r", encoding="utf-8") as config_file:
        settings = json.load(config_file)

    server_config = Config()
    server_config.bind = [settings["url"]]
    # Hand hypercorn standard-library loggers for its access and error logs.
    server_config.accesslog = logging.getLogger("hypercorn.access")
    server_config.errorlog = logging.getLogger("hypercorn.error")

    asyncio.run(serve(app, server_config))
    # app.run(port=7217)