feat: refact framework (v3.0.0)

master
wlt233 2 weeks ago
parent 15f5f1f6fc
commit a5f06fc08c

@@ -0,0 +1,21 @@
{
    "userid": "foobar123",
    "email": "114514@1919.com",
    "password": "810810",
    "callback_url": "http://localhost:1145",
    "proxy": "socks5://localhost:7890",
    "slow_interval": 600,
    "slow_hours": [1, 2, 3, 4, 5, 6],
    "task_list": [
        {
            "name": "lovelive",
            "qq": "1145141919",
            "url": "https://x.com/i/lists/1873733459036000393",
            "interval": 42,
            "filter": []
        }
    ]
}

@@ -1,20 +0,0 @@
{
    "userid": "foobar123",
    "email": "114514@1919.com",
    "password": "810810",
    "callback_url": "http://localhost:114514/xxx",
    "proxy": "socks5://localhost:7890",
    "check_interval": 42,
    "check_interval_slow": 600,
    "slow_hours": [0, 1, 2, 3, 4, 5, 6],
    "check_list": {
        "1145141919": {
            "url": "https://x.com/i/lists/1873731145141919810",
            "interval": 42,
            "filter": [
                "only_image",
                "only_origin"
            ]
        }
    }
}
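
Schema migration note: the old check_list object, keyed by QQ group id, becomes a flat task_list array in which each entry carries its own name and qq fields alongside the per-task url, interval, and filter it already had; check_interval is replaced by the per-task interval, and check_interval_slow is renamed slow_interval. A minimal before/after sketch for one entry, reusing values from the two configs above (the list URL is elided here):

    old:  "check_list": { "1145141919": { "url": "https://x.com/i/lists/...", "interval": 42, "filter": ["only_image", "only_origin"] } }
    new:  "task_list": [ { "name": "lovelive", "qq": "1145141919", "url": "https://x.com/i/lists/...", "interval": 42, "filter": ["only_image", "only_origin"] } ]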

@@ -0,0 +1,76 @@
import asyncio
import json
import logging
import sys

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger
from quart import Quart, jsonify, request

from src.twitter import task_handler

app = Quart(__name__)
scheduler = AsyncIOScheduler()

logger.remove()
logger.add(sys.stdout, level="INFO",
           format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level}</level> | <cyan>{name}:{function}:{line}</cyan> - <level>{message}</level>")


class InterceptHandler(logging.Handler):
    def emit(self, record):
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno
        logger.opt(depth=6, exception=record.exc_info).log(level, record.getMessage())


logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
logging.getLogger("apscheduler").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
#logger.disable("hypercorn")


@app.before_serving
async def startup():
    with open("./config/config.json", "r", encoding="utf-8") as f: config = json.load(f)
    for task in config.get("task_list", []):
        logger.info(f"Added task {task['name']} ({task['interval']}s): {task['url']}")
        await add_task(task)
    scheduler.start()
    logger.info("Scheduler started")


@app.after_serving
async def shutdown():
    scheduler.shutdown()


async def add_task(task_args):
    if "url" not in task_args: raise ValueError("Task must have a URL")
    task_name = task_args.get("name", task_args["url"])
    interval = task_args.get("interval", 42)
    task_args["filter"] = task_args.get("filter", [])
    if scheduler.get_job(task_name):
        scheduler.remove_job(task_name)
    trigger = IntervalTrigger(seconds=interval)
    scheduler.add_job(task_handler, trigger, args=[task_args], id=task_name, replace_existing=True)


async def list_task():
    jobs = scheduler.get_jobs()
    return [{"id": job.id, "next_run": str(job.next_run_time)} for job in jobs]


if __name__ == "__main__":
    import asyncio
    from hypercorn.asyncio import serve
    from hypercorn.config import Config

    config = Config()
    config.bind = ["127.0.0.1:7217"]
    config.accesslog = logging.getLogger('hypercorn.access')
    config.errorlog = logging.getLogger('hypercorn.error')
    asyncio.run(serve(app, config))
    #app.run(port=7217)
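
Note: jsonify and request are imported but no routes are registered in this commit; add_task and list_task are only called internally. A minimal sketch of how they could be exposed over HTTP, assuming a /task path and payload shape that are not part of this commit:

# Hypothetical routes, not in this commit; reuses app, add_task, list_task from main.py above.
@app.route("/task", methods=["POST"])
async def create_task():
    task_args = await request.get_json()   # e.g. one entry shaped like config["task_list"][i]
    await add_task(task_args)               # raises ValueError if "url" is missing
    return jsonify({"status": "ok", "task": task_args.get("name", task_args["url"])})

@app.route("/task", methods=["GET"])
async def get_tasks():
    return jsonify(await list_task())       # [{"id": ..., "next_run": ...}, ...]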

@@ -12,8 +12,8 @@ from selenium.webdriver.support.wait import WebDriverWait
 def login():
-    with open("config.json", "r", encoding="utf-8") as f:
-        config = json.load(f)
+    logger.info("Logging in to Twitter")
+    with open("./config/config.json", "r", encoding="utf-8") as f: config = json.load(f)
     options = webdriver.ChromeOptions()
     options.set_capability("goog:loggingPrefs", {"performance": "ALL"})
@@ -65,7 +65,7 @@ def login():
             headers = message["params"]["headers"]
             headers = {k: v for k, v in headers.items() if k not in [":authority", ":method", ":path", ":scheme"]}
             logger.success(f"Got request Headers: {headers}")
-            with open("headers.json", "w", encoding="utf-8") as f:
+            with open("./config/headers.json", "w", encoding="utf-8") as f:
                 json.dump(headers, f, ensure_ascii=False, indent=4)
             return headers
@@ -79,11 +79,9 @@ def login():
 @retry(tries=3, delay=5)
 def get_list(list_id):
-    logger.info(f"Check list https://x.com/i/lists/{list_id}")
-    with open("config.json", "r", encoding="utf-8") as f:
-        config = json.load(f)
-    with open("headers.json", "r", encoding="utf-8") as f:
-        headers = json.load(f)
+    logger.info(f"Getting list https://x.com/i/lists/{list_id}")
+    with open("./config/config.json", "r", encoding="utf-8") as f: config = json.load(f)
+    with open("./config/headers.json", "r", encoding="utf-8") as f: headers = json.load(f)
     headers["referer"] = f"https://x.com/i/lists/{list_id}"
     params = {
@@ -98,7 +96,7 @@ def get_list(list_id):
     )
     if resp.status_code != 200:
         logger.error(f"Error fetching list {list_id}: {resp.status_code} {resp.text}")
-        os.remove("headers.json")
+        os.remove("./config/headers.json")
         return None
-    logger.info(f"Checked {list_id}")
+    logger.info(f"Got {list_id}")
     return resp.json()

@@ -0,0 +1,11 @@
def filter_tweets(tweets, filter_list):
    if "only_image" in filter_list:
        tweets = [t for t in tweets if t["media"]]
    if "only_origin" in filter_list:
        tweets = [t for t in tweets if (not t["quoted"]) and (not t["retweeted"])]
    return tweets
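
A quick usage sketch for filter_tweets; the tweet dicts below are hand-made and only carry the keys the filters read (real ones come from parse_timeline and have more fields):

tweets = [
    {"rest_id": "1", "media": ["photo"], "quoted": False, "retweeted": False},
    {"rest_id": "2", "media": [],        "quoted": False, "retweeted": False},
    {"rest_id": "3", "media": ["photo"], "quoted": True,  "retweeted": False},
]
filter_tweets(tweets, ["only_image"])                 # drops tweet 2 (no media)
filter_tweets(tweets, ["only_image", "only_origin"])  # keeps only tweet 1
filter_tweets(tweets, [])                             # empty filter list passes everything through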

@@ -0,0 +1,68 @@
from collections import defaultdict
import json
import os
import pprint
import time
from datetime import datetime

import httpx
from loguru import logger

from .twi_api import get_list, login
from .twi_filter import filter_tweets
from .twi_parser import parse_timeline

LATEST_TWEET_ID_DICT = {}
LATEST_TWEET_TS_DICT = {}


def check_new_tweets(tweets, list_id):
    if not tweets: return []
    new_tweets = []
    if list_id in LATEST_TWEET_ID_DICT:
        for tweet in tweets:
            if tweet["rest_id"] == LATEST_TWEET_ID_DICT[list_id]: break
            if tweet["timestamp"] < LATEST_TWEET_TS_DICT[list_id]: break
            # if time.time() - tweet["timestamp"] > 1200: break
            new_tweets.append(tweet)
    LATEST_TWEET_ID_DICT[list_id] = tweets[0]["rest_id"]
    LATEST_TWEET_TS_DICT[list_id] = tweets[0]["timestamp"]
    return new_tweets


LATEST_CHECK_TIME = defaultdict(float)


async def task_handler(args):
    with open("./config/config.json", "r", encoding="utf-8") as f: config = json.load(f)
    slow_interval = config.get("slow_interval", 600)
    slow_hours = config.get("slow_hours", [0, 1, 2, 3, 4, 5, 6])
    if datetime.now().hour in slow_hours:
        if time.time() - LATEST_CHECK_TIME[args["name"]] < slow_interval:
            logger.info(f"Task {args['name']} skipped")
            return

    logger.info(f"Task args: {args}")
    if not os.path.exists("./config/headers.json"):
        logger.error("Headers not found, logging in to Twitter")
        login()

    LATEST_CHECK_TIME[args["name"]] = time.time()
    list_id = int(args["url"].split("/")[-1])
    data = get_list(list_id)
    if data:
        tweets = parse_timeline(data)
        new_tweets = check_new_tweets(tweets, list_id)
        new_tweets = filter_tweets(new_tweets, args["filter"])
        if new_tweets:
            json_data = { args["qq"]: new_tweets }
            logger.info("\n" + pprint.pformat(json_data))
            try:
                async with httpx.AsyncClient() as client:
                    resp = await client.post(config["callback_url"],
                                             json=json_data, timeout=10)
                    logger.info(resp.content)
            except Exception as e:
                logger.error(e)
    logger.info(f"Task {args['name']} finished")

@@ -1,93 +0,0 @@
import json
import os
import time
from collections import defaultdict
from datetime import datetime
from pprint import pprint

import requests
from loguru import logger

from twi_api import get_list, login
from twi_parser import parse_timeline

LATEST_TWEET_ID_DICT = {}
LATEST_TWEET_TS_DICT = {}


def check_new_tweets(tweets, url):
    global LATEST_TWEET_ID_DICT
    new_tweets = []
    if url in LATEST_TWEET_ID_DICT:
        for tweet in tweets:
            if tweet["rest_id"] == LATEST_TWEET_ID_DICT[url]:
                break
            if tweet["timestamp"] < LATEST_TWEET_TS_DICT[url]:
                break
            if time.time() - tweet["timestamp"] > 1200:
                break
            new_tweets.append(tweet)
    LATEST_TWEET_ID_DICT[url] = tweets[0]["rest_id"]
    LATEST_TWEET_TS_DICT[url] = tweets[0]["timestamp"]
    return new_tweets


def filter_tweets(tweets, filter_list):
    if "only_image" in filter_list:
        tweets = [t for t in tweets if t["media"]]
    if "only_origin" in filter_list:
        tweets = [t for t in tweets if (not t["quoted"]) and (not t["retweeted"])]
    return tweets


def check_timeline(config):
    list_id = int(config["url"].split("/")[-1])
    data = get_list(list_id)
    if data:
        tweets = parse_timeline(data)
        new_tweets = check_new_tweets(tweets, config["url"])
        return filter_tweets(new_tweets, config["filter"])
    else:
        return []


if __name__ == "__main__":
    with open("config.json", 'r') as f:
        config = json.load(f)
    check_list = config.get("check_list", [])
    check_interval = config.get("check_interval", 42)
    check_interval_slow = config.get("check_interval_slow", 600)
    slow_hours = config.get("slow_hours", [0, 1, 2, 3, 4, 5, 6])
    last_check_time = defaultdict(float)

    while 1:
        if not os.path.exists("headers.json"):
            login()

        json_data = {}
        for group_id, group_config in check_list.items():
            group_interval = group_config.get("interval", check_interval)
            if time.time() - last_check_time[group_id] > group_interval:
                new_tweets = check_timeline(group_config)
                if new_tweets:
                    json_data[group_id] = new_tweets
                last_check_time[group_id] = time.time()

        if json_data:
            pprint(json_data)
            try:
                resp = requests.post(config["callback_url"],
                                     json=json_data, timeout=10)
                logger.info(resp.content)
            except Exception as e:
                logger.error(str(e))

        if datetime.now().hour in slow_hours:
            time.sleep(check_interval_slow)
        else:
            time.sleep(check_interval)

# with open("lovelive.json", 'r', encoding="utf8") as f: pprint(parse_timeline(json.load(f)))