import json
import os
import pprint
import time
from collections import defaultdict
from datetime import datetime

import httpx
from loguru import logger

from .twi_api import get_list, login
from .twi_filter import filter_tweets
from .twi_parser import parse_timeline

# Per-list polling state, keyed by list id and kept for the life of the process.
# rest_id of the first tweet returned by the most recent poll.
LATEST_TWEET_ID_DICT = {}
# Largest "timestamp" seen so far on the first tweet of a poll.
LATEST_TWEET_TS_DICT = {}


def check_new_tweets(tweets, list_id, check_interval=600):
    """Return the tweets that were not seen on the previous poll of a list.

    The first poll of a list only records a baseline and returns ``[]``.
    On later polls the tweets are scanned in order and collected until one
    of these stops the scan:

    * its ``rest_id`` equals the id recorded on the previous poll,
    * its ``timestamp`` is not newer than the recorded timestamp,
    * it is older than ``check_interval * 3`` seconds.

    Args:
        tweets: Sequence of dicts with ``"rest_id"`` and ``"timestamp"``
            keys. Timestamps are compared against ``time.time()``, i.e.
            seconds since the epoch. Presumably ordered newest first,
            since ``tweets[0]`` is recorded as the latest tweet; confirm
            against ``parse_timeline``.
        list_id: Key under which the polling state is stored.
        check_interval: Polling period in seconds, used for the age limit.

    Returns:
        A list of the new tweets, in the order they appear in ``tweets``.
    """
    if not tweets:
        return []

    new_tweets = []
    if list_id in LATEST_TWEET_ID_DICT:
        last_id = LATEST_TWEET_ID_DICT[list_id]
        # The timestamp entry is only written when the head tweet's timestamp
        # is positive, so it can be missing even though the id entry exists.
        last_ts = LATEST_TWEET_TS_DICT.get(list_id, 0)
        now = time.time()
        max_age = check_interval * 3
        for tweet in tweets:
            if tweet["rest_id"] == last_id:
                break
            if tweet["timestamp"] <= last_ts:
                break
            if now - tweet["timestamp"] > max_age:
                break
            new_tweets.append(tweet)

    head = tweets[0]
    LATEST_TWEET_ID_DICT[list_id] = head["rest_id"]
    # Only ever move the recorded timestamp forward.
    if head["timestamp"] > LATEST_TWEET_TS_DICT.get(list_id, 0):
        LATEST_TWEET_TS_DICT[list_id] = head["timestamp"]
    return new_tweets


# time.time() of the last non-skipped run of each task, keyed by task name.
# Unknown tasks default to 0.0, so their first run is never throttled.
LATEST_CHECK_TIME = defaultdict(float)


def _extract_list_id(url):
    """Return the integer list id found in the last path segment of *url*.

    A query string, a fragment and trailing slashes are ignored. Raises
    ``ValueError`` when the last path segment is not an integer.
    """
    path = url.split("?", 1)[0].split("#", 1)[0]
    return int(path.rstrip("/").rsplit("/", 1)[-1])


async def task_handler(args):
    """Poll one Twitter list and post its new tweets to the callback URL.

    Configuration is re-read from ``./config/config.json`` on every call.
    Keys used: ``slow_interval``, ``slow_hours``, ``check_interval`` and
    ``callback_url``. While the current local hour is in ``slow_hours`` the
    task is skipped unless at least ``slow_interval`` seconds have passed
    since its last run.

    Args:
        args: Task description with the keys ``"name"`` (throttling key and
            log label), ``"url"`` (list URL ending in the numeric list id),
            ``"filter"`` (passed to ``filter_tweets``) and ``"qq"`` (key
            under which the tweets are sent in the callback payload).

    Raises:
        ValueError: If no integer list id can be read from ``args["url"]``.
    """
    with open("./config/config.json", "r", encoding="utf-8") as f:
        config = json.load(f)

    name = args["name"]
    slow_interval = config.get("slow_interval", 600)
    slow_hours = config.get("slow_hours", [0, 1, 2, 3, 4, 5, 6])
    if datetime.now().hour in slow_hours:
        if time.time() - LATEST_CHECK_TIME[name] < slow_interval:
            logger.info(f"Task {args['name']} skipped")
            return

    logger.info(f"Task args: {args}")
    if not os.path.exists("./config/headers.json"):
        logger.error("Headers not found, logging in to Twitter")
        login()
    LATEST_CHECK_TIME[name] = time.time()

    list_id = _extract_list_id(args["url"])
    data = await get_list(list_id)
    if data:
        tweets = parse_timeline(data)
        new_tweets = check_new_tweets(
            tweets, list_id, config.get("check_interval", 600)
        )
        new_tweets = filter_tweets(new_tweets, args["filter"])
        if new_tweets:
            json_data = {args["qq"]: new_tweets}
            logger.info("\n" + pprint.pformat(json_data))
            try:
                async with httpx.AsyncClient() as client:
                    resp = await client.post(
                        config["callback_url"], json=json_data, timeout=10
                    )
                logger.info(resp.content)
            except Exception as e:
                # Best-effort delivery: a failed callback must not abort the
                # task. Log with the traceback, because str(e) alone can be
                # empty for exceptions raised without a message.
                logger.exception(e)

    logger.info(f"Task {args['name']} finished")