You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

79 lines
2.5 KiB

import asyncio
import inspect
import json
import logging
import sys

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger
from quart import Quart, jsonify, request

from src.twitter import task_handler

# Quart application object and the asyncio scheduler that runs the configured tasks.
app = Quart(__name__)
scheduler = AsyncIOScheduler()

# Remove every loguru sink registered so far, then install a single stdout
# sink that emits INFO and above with a colourised, timestamped layout.
logger.remove()
logger.add(
    sys.stdout,
    level="INFO",
    format=(
        "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
        "<level>{level}</level> | "
        "<cyan>{name}:{function}:{line}</cyan> - "
        "<level>{message}</level>"
    ),
)

class InterceptHandler(logging.Handler):
    """Forward standard-library ``logging`` records to loguru.

    Installing this handler on the root logger makes output from libraries
    that use ``logging`` go through the loguru sinks.
    """

    def emit(self, record: logging.LogRecord) -> None:
        """Re-emit *record* through loguru, attributed to the original caller.

        Args:
            record: the record produced by the standard ``logging`` machinery.
        """
        # Prefer the loguru level carrying the same name; custom stdlib
        # levels unknown to loguru fall back to their numeric value.
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Walk up the stack past this method and every frame that belongs to
        # the logging module (or to frozen importlib bootstrap code) so that
        # loguru reports the frame that issued the logging call. A fixed
        # depth is only right for one particular call path.
        frame, depth = inspect.currentframe(), 0
        while frame:
            filename = frame.f_code.co_filename
            is_logging = filename == logging.__file__
            is_frozen = "importlib" in filename and "_bootstrap" in filename
            if depth > 0 and not (is_logging or is_frozen):
                break
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())


# Replace whatever handlers the root logger already has with the loguru
# bridge; NOTSET (0) lets every record reach the handler.
logging.basicConfig(
    handlers=[InterceptHandler()],
    level=logging.NOTSET,
    force=True,
)

# Only WARNING and above are let through for these third-party loggers.
logging.getLogger("apscheduler").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
# logger.disable("hypercorn")


@app.before_serving
async def startup():
    """Load the task list from the config file, schedule it, and start the scheduler.

    Reads ``./config/config.json`` and passes every entry found under its
    ``task_list`` key (none if the key is absent) to ``add_task``.

    Raises:
        OSError: if the config file cannot be opened.
        json.JSONDecodeError: if the config file is not valid JSON.
        ValueError: propagated from ``add_task`` when an entry has no ``"url"``.
    """
    with open("./config/config.json", "r", encoding="utf-8") as f:
        config = json.load(f)

    for task in config.get("task_list", []):
        await add_task(task)
        # Log only once the job is really registered, and fall back to the
        # same defaults add_task applies so that entries without an explicit
        # "name" or "interval" do not raise KeyError here.
        name = task.get("name", task["url"])
        interval = task.get("interval", 42)
        logger.info(f"Added task {name} ({interval}s): {task['url']}")

    scheduler.start()
    logger.info("Scheduler started")


@app.after_serving
async def shutdown():
    """Stop the scheduler once the application has finished serving."""
    # wait=True is the scheduler's default; it is spelled out for readability.
    scheduler.shutdown(wait=True)


async def add_task(task_args):
    """Register (or re-register) an interval job running ``task_handler``.

    Args:
        task_args: mapping describing the task. ``"url"`` is required.
            ``"name"`` is used as the job id and defaults to the URL,
            ``"interval"`` is the period in seconds and defaults to 42, and
            ``"filter"`` defaults to an empty list. The mapping is updated
            in place with the default ``"filter"`` when it is missing, and
            is passed to ``task_handler`` as its single argument.

    Raises:
        ValueError: if *task_args* has no ``"url"`` key.
    """
    if "url" not in task_args:
        raise ValueError("Task must have a URL")

    job_id = task_args.get("name", task_args["url"])
    period_seconds = task_args.get("interval", 42)
    task_args.setdefault("filter", [])

    # Drop a job already registered under this id before adding the new one.
    existing_job = scheduler.get_job(job_id)
    if existing_job:
        scheduler.remove_job(job_id)

    scheduler.add_job(
        task_handler,
        IntervalTrigger(seconds=period_seconds),
        args=[task_args],
        id=job_id,
        replace_existing=True,
        max_instances=3,
        misfire_grace_time=10,
    )


async def list_task():
    """Return one ``{"id": ..., "next_run": ...}`` dict per job known to the scheduler.

    ``next_run`` is the string form of the job's ``next_run_time``.
    """
    summaries = []
    for job in scheduler.get_jobs():
        summaries.append({"id": job.id, "next_run": str(job.next_run_time)})
    return summaries


if __name__ == "__main__":
    # Hypercorn is only needed when the module is run as a script; asyncio
    # is already imported at module level.
    from hypercorn.asyncio import serve
    from hypercorn.config import Config

    config = Config()
    # Listen on the loopback interface only, port 7217.
    config.bind = ["127.0.0.1:7217"]
    # Hand Hypercorn stdlib loggers for its access and error output.
    config.accesslog = logging.getLogger("hypercorn.access")
    config.errorlog = logging.getLogger("hypercorn.error")

    asyncio.run(serve(app, config))
    # app.run(port=7217)