From a5f06fc08c0eb63fe9b27a743dfb94c9aa312840 Mon Sep 17 00:00:00 2001
From: wlt233 <1486185683@qq.com>
Date: Fri, 23 May 2025 17:19:00 +0800
Subject: [PATCH] feat: refact framework (v3.0.0)

---
 config/config_template.json        | 21 +++++++
 config_template.json               | 20 -------
 server.py                          | 76 ++++++++++++++++++++++++
 twi_api.py => src/twi_api.py       | 18 +++---
 src/twi_filter.py                  | 11 ++++
 twi_parser.py => src/twi_parser.py |  0
 src/twitter.py                     | 68 ++++++++++++++++++++++
 twitter.py                         | 93 ------------------------------
 8 files changed, 184 insertions(+), 123 deletions(-)
 create mode 100644 config/config_template.json
 delete mode 100644 config_template.json
 create mode 100644 server.py
 rename twi_api.py => src/twi_api.py (91%)
 create mode 100644 src/twi_filter.py
 rename twi_parser.py => src/twi_parser.py (100%)
 create mode 100644 src/twitter.py
 delete mode 100644 twitter.py

diff --git a/config/config_template.json b/config/config_template.json
new file mode 100644
index 0000000..3d89910
--- /dev/null
+++ b/config/config_template.json
@@ -0,0 +1,21 @@
+{
+    "userid": "foobar123",
+    "email": "114514@1919.com",
+    "password": "810810",
+
+    "callback_url": "http://localhost:1145",
+    "proxy": "socks5://localhost:7890",
+
+    "slow_interval": 600,
+    "slow_hours": [1, 2, 3, 4, 5, 6],
+
+    "task_list": [
+        {
+            "name": "lovelive",
+            "qq": "1145141919",
+            "url": "https://x.com/i/lists/1873733459036000393",
+            "interval": 42,
+            "filter": []
+        }
+    ]
+}
\ No newline at end of file
diff --git a/config_template.json b/config_template.json
deleted file mode 100644
index ddfc387..0000000
--- a/config_template.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-    "userid": "foobar123",
-    "email": "114514@1919.com",
-    "password": "810810",
-    "callback_url": "http://localhost:114514/xxx",
-    "proxy": "socks5://localhost:7890",
-    "check_interval": 42,
-    "check_interval_slow": 600,
-    "slow_hours": [0, 1, 2, 3, 4, 5, 6],
-    "check_list": {
-        "1145141919": {
-            "url": "https://x.com/i/lists/1873731145141919810",
-            "interval": 42,
-            "filter": [
-                "only_image",
-                "only_origin"
-            ]
-        }
-    }
-}
\ No newline at end of file
diff --git a/server.py b/server.py
new file mode 100644
index 0000000..023592c
--- /dev/null
+++ b/server.py
@@ -0,0 +1,76 @@
+import asyncio
+import json
+import logging
+import sys
+
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.triggers.interval import IntervalTrigger
+from loguru import logger
+from quart import Quart, jsonify, request
+
+from src.twitter import task_handler
+
+app = Quart(__name__)
+scheduler = AsyncIOScheduler()
+
+logger.remove()
+logger.add(sys.stdout, level="INFO",
+           format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {name}:{function}:{line} - {message}")
+class InterceptHandler(logging.Handler):
+    def emit(self, record):
+        try:
+            level = logger.level(record.levelname).name
+        except ValueError:
+            level = record.levelno
+        logger.opt(depth=6, exception=record.exc_info).log(level, record.getMessage())
+logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
+logging.getLogger("apscheduler").setLevel(logging.WARNING)
+logging.getLogger("httpx").setLevel(logging.WARNING)
+#logger.disable("hypercorn")
+
+
+
+@app.before_serving
+async def startup():
+    with open("./config/config.json", "r", encoding="utf-8") as f: config = json.load(f)
+    for task in config.get("task_list", []):
+        logger.info(f"Added task {task['name']} ({task['interval']}s): {task['url']}")
+        await add_task(task)
+    scheduler.start()
+    logger.info("Scheduler started")
+
+@app.after_serving
+async def shutdown():
+    scheduler.shutdown()
+
+
+
+async def add_task(task_args):
+    if not "url" in task_args: raise ValueError("Task must have a URL")
+    task_name = task_args.get("name", task_args["url"])
+    interval = task_args.get("interval", 42)
+    task_args["filter"] = task_args.get("filter", [])
+
+    if scheduler.get_job(task_name):
+        scheduler.remove_job(task_name)
+    trigger = IntervalTrigger(seconds=interval)
+    scheduler.add_job(task_handler, trigger, args=[task_args], id=task_name, replace_existing=True)
+
+async def list_task():
+    jobs = scheduler.get_jobs()
+    return [{"id": job.id, "next_run": str(job.next_run_time)} for job in jobs]
+
+
+
+if __name__ == "__main__":
+    import asyncio
+
+    from hypercorn.asyncio import serve
+    from hypercorn.config import Config
+
+    config = Config()
+    config.bind = ["127.0.0.1:7217"]
+    config.accesslog = logging.getLogger('hypercorn.access')
+    config.errorlog = logging.getLogger('hypercorn.error')
+    asyncio.run(serve(app, config))
+    #app.run(port=7217)
diff --git a/twi_api.py b/src/twi_api.py
similarity index 91%
rename from twi_api.py
rename to src/twi_api.py
index 4b30326..2712a39 100644
--- a/twi_api.py
+++ b/src/twi_api.py
@@ -12,8 +12,8 @@ from selenium.webdriver.support.wait import WebDriverWait
 
 
 def login():
-    with open("config.json", "r", encoding="utf-8") as f:
-        config = json.load(f)
+    logger.info("Logging in to Twitter")
+    with open("./config/config.json", "r", encoding="utf-8") as f: config = json.load(f)
 
     options = webdriver.ChromeOptions()
     options.set_capability("goog:loggingPrefs", {"performance": "ALL"})
@@ -65,7 +65,7 @@ def login():
                 headers = message["params"]["headers"]
                 headers = {k: v for k, v in headers.items() if k not in [":authority", ":method", ":path", ":scheme"]}
                 logger.success(f"Got request Headers: {headers}")
-                with open("headers.json", "w", encoding="utf-8") as f:
+                with open("./config/headers.json", "w", encoding="utf-8") as f:
                     json.dump(headers, f, ensure_ascii=False, indent=4)
                 return headers
 
@@ -79,11 +79,9 @@ def login():
 
 @retry(tries=3, delay=5)
 def get_list(list_id):
-    logger.info(f"Check list https://x.com/i/lists/{list_id}")
-    with open("config.json", "r", encoding="utf-8") as f:
-        config = json.load(f)
-    with open("headers.json", "r", encoding="utf-8") as f:
-        headers = json.load(f)
+    logger.info(f"Getting list https://x.com/i/lists/{list_id}")
+    with open("./config/config.json", "r", encoding="utf-8") as f: config = json.load(f)
+    with open("./config/headers.json", "r", encoding="utf-8") as f: headers = json.load(f)
 
     headers["referer"] = f"https://x.com/i/lists/{list_id}"
     params = {
@@ -98,7 +96,7 @@ def get_list(list_id):
     )
     if resp.status_code != 200:
         logger.error(f"Error fetching list {list_id}: {resp.status_code} {resp.text}")
-        os.remove("headers.json")
+        os.remove("./config/headers.json")
         return None
-    logger.info(f"Checked {list_id}")
+    logger.info(f"Got {list_id}")
     return resp.json()
diff --git a/src/twi_filter.py b/src/twi_filter.py
new file mode 100644
index 0000000..82fdfd3
--- /dev/null
+++ b/src/twi_filter.py
@@ -0,0 +1,11 @@
+
+
+def filter_tweets(tweets, filter_list):
+
+    if "only_image" in filter_list:
+        tweets = [t for t in tweets if t["media"]]
+
+    if "only_origin" in filter_list:
+        tweets = [t for t in tweets if (not t["quoted"]) and (not t["retweeted"])]
+
+    return tweets
\ No newline at end of file
diff --git a/twi_parser.py b/src/twi_parser.py
similarity index 100%
rename from twi_parser.py
rename to src/twi_parser.py
diff --git a/src/twitter.py b/src/twitter.py
new file mode 100644
index 0000000..fd5fd83
--- /dev/null
+++ b/src/twitter.py
@@ -0,0 +1,68 @@
+from collections import defaultdict
+import json
+import os
+import pprint
+import time
+from datetime import datetime
+
+import httpx
+from loguru import logger
+
+from .twi_api import get_list, login
+from .twi_filter import filter_tweets
+from .twi_parser import parse_timeline
+
+LATEST_TWEET_ID_DICT = {}
+LATEST_TWEET_TS_DICT = {}
+def check_new_tweets(tweets, list_id):
+    if not tweets: return []
+    new_tweets = []
+
+    if list_id in LATEST_TWEET_ID_DICT:
+        for tweet in tweets:
+            if tweet["rest_id"] == LATEST_TWEET_ID_DICT[list_id]: break
+            if tweet["timestamp"] < LATEST_TWEET_TS_DICT[list_id]: break
+            # if time.time() - tweet["timestamp"] > 1200: break
+            new_tweets.append(tweet)
+
+    LATEST_TWEET_ID_DICT[list_id] = tweets[0]["rest_id"]
+    LATEST_TWEET_TS_DICT[list_id] = tweets[0]["timestamp"]
+    return new_tweets
+
+
+LATEST_CHECK_TIME = defaultdict(float)
+async def task_handler(args):
+    with open("./config/config.json", "r", encoding="utf-8") as f: config = json.load(f)
+    slow_interval = config.get("slow_interval", 600)
+    slow_hours = config.get("slow_hours", [0, 1, 2, 3, 4, 5, 6])
+    if datetime.now().hour in slow_hours:
+        if time.time() - LATEST_CHECK_TIME[args["name"]] < slow_interval:
+            logger.info(f"Task {args['name']} skipped")
+            return
+
+    logger.info(f"Task args: {args}")
+    if not os.path.exists("./config/headers.json"):
+        logger.error("Headers not found, logging in to Twitter")
+        login()
+    LATEST_CHECK_TIME[args["name"]] = time.time()
+
+    list_id = int(args["url"].split("/")[-1])
+    data = get_list(list_id)
+    if data:
+        tweets = parse_timeline(data)
+        new_tweets = check_new_tweets(tweets, list_id)
+        new_tweets = filter_tweets(new_tweets, args["filter"])
+        if new_tweets:
+            json_data = { args["qq"]: new_tweets }
+            logger.info("\n" + pprint.pformat(json_data))
+            try:
+                async with httpx.AsyncClient() as client:
+                    resp = await client.post(config["callback_url"],
+                                             json=json_data, timeout=10)
+                logger.info(resp.content)
+            except Exception as e:
+                logger.error(e)
+
+    logger.info(f"Task {args['name']} finished")
+
+
diff --git a/twitter.py b/twitter.py
deleted file mode 100644
index 19a4ac5..0000000
--- a/twitter.py
+++ /dev/null
@@ -1,93 +0,0 @@
-import json
-import os
-import time
-from collections import defaultdict
-from datetime import datetime
-from pprint import pprint
-
-import requests
-from loguru import logger
-
-from twi_api import get_list, login
-from twi_parser import parse_timeline
-
-LATEST_TWEET_ID_DICT = {}
-LATEST_TWEET_TS_DICT = {}
-def check_new_tweets(tweets, url):
-    global LATEST_TWEET_ID_DICT
-
-    new_tweets = []
-    if url in LATEST_TWEET_ID_DICT:
-        for tweet in tweets:
-            if tweet["rest_id"] == LATEST_TWEET_ID_DICT[url]:
-                break
-            if tweet["timestamp"] < LATEST_TWEET_TS_DICT[url]:
-                break
-            if time.time() - tweet["timestamp"] > 1200:
-                break
-            new_tweets.append(tweet)
-
-    LATEST_TWEET_ID_DICT[url] = tweets[0]["rest_id"]
-    LATEST_TWEET_TS_DICT[url] = tweets[0]["timestamp"]
-    return new_tweets
-
-def filter_tweets(tweets, filter_list):
-
-    if "only_image" in filter_list:
-        tweets = [t for t in tweets if t["media"]]
-
-    if "only_origin" in filter_list:
-        tweets = [t for t in tweets if (not t["quoted"]) and (not t["retweeted"])]
-
-    return tweets
-
-def check_timeline(config):
-    list_id = int(config["url"].split("/")[-1])
-    data = get_list(list_id)
-    if data:
-        tweets = parse_timeline(data)
-        new_tweets = check_new_tweets(tweets, config["url"])
-        return filter_tweets(new_tweets, config["filter"])
-    else:
-        return []
-
-
-
-if __name__ == "__main__":
-    with open("config.json", 'r') as f:
-        config = json.load(f)
-    check_list = config.get("check_list", [])
-    check_interval = config.get("check_interval", 42)
-    check_interval_slow = config.get("check_interval_slow", 600)
-    slow_hours = config.get("slow_hours", [0, 1, 2, 3, 4, 5, 6])
-    last_check_time = defaultdict(float)
-
-    while 1:
-        if not os.path.exists("headers.json"):
-            login()
-
-        json_data = {}
-        for group_id, group_config in check_list.items():
-            group_interval = group_config.get("interval", check_interval)
-
-            if time.time() - last_check_time[group_id] > group_interval:
-                new_tweets = check_timeline(group_config)
-                if new_tweets:
-                    json_data[group_id] = new_tweets
-                last_check_time[group_id] = time.time()
-
-        if json_data:
-            pprint(json_data)
-            try:
-                resp = requests.post(config["callback_url"],
-                                     json=json_data, timeout=10)
-                logger.info(resp.content)
-            except Exception as e:
-                logger.error(str(e))
-
-        if datetime.now().hour in slow_hours:
-            time.sleep(check_interval_slow)
-        else:
-            time.sleep(check_interval)
-
-    # with open("lovelive.json", 'r', encoding="utf8") as f: pprint(parse_timeline(json.load(f)))
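
For reference on the callback contract introduced here: the refactored task_handler POSTs new tweets to callback_url as a JSON object keyed by the task's "qq" value, i.e. { "<qq>": [tweet, ...] }. Below is a minimal sketch of a receiver for that POST, assuming Quart on the consumer side and only the payload shape visible in src/twitter.py; the route, port, and what is done with each tweet are illustrative assumptions, not part of this patch.

# callback_receiver_example.py -- hypothetical, not included in this patch
from quart import Quart, request

app = Quart(__name__)

@app.route("/", methods=["POST"])
async def callback():
    # Payload shape assumed from task_handler: { "<qq>": [tweet, ...] }
    data = await request.get_json()
    for qq, tweets in data.items():
        for tweet in tweets:
            # Each tweet dict carries at least rest_id / timestamp / media /
            # quoted / retweeted, as used by check_new_tweets and filter_tweets.
            print(qq, tweet.get("rest_id"), tweet.get("timestamp"))
    return {"status": "ok"}

if __name__ == "__main__":
    # The config template points callback_url at http://localhost:1145,
    # so the receiver listens on that port here.
    app.run(port=1145)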