From 227bed961de4e55454b24dd3532887dccf67d567 Mon Sep 17 00:00:00 2001
From: wlt233 <1486185683@qq.com>
Date: Fri, 16 May 2025 18:04:08 +0800
Subject: [PATCH] feat: refactor api (v2.0.0)

Split the single-file scraper in two: twi_api.py drives the Selenium
login once to capture the GraphQL request headers into headers.json,
then polls lists over plain httpx; twi_parser.py holds the timeline
parsing helpers. In config.json, "username" is renamed to "email" and
an optional "proxy" entry is supported.

---
 .gitignore           |   5 +-
 config_template.json |   3 +-
 requirements.txt     |   2 +-
 twi_api.py           | 101 ++++++++++++++++++
 twi_parser.py        | 135 ++++++++++++++++++++++++
 twitter.py           | 237 ++-----------------------------------------
 6 files changed, 251 insertions(+), 232 deletions(-)
 create mode 100644 twi_api.py
 create mode 100644 twi_parser.py

diff --git a/.gitignore b/.gitignore
index 350f9ee..b661c87 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,7 +1,6 @@
-config.json
+*.json
+!config_template.json
 chromedriver.exe
-lovelive.json
-
 
 # Byte-compiled / optimized / DLL files
 __pycache__/
diff --git a/config_template.json b/config_template.json
index 0339daa..ddfc387 100644
--- a/config_template.json
+++ b/config_template.json
@@ -1,8 +1,9 @@
 {
     "userid": "foobar123",
-    "username": "114514@1919.com",
+    "email": "114514@1919.com",
     "password": "810810",
     "callback_url": "http://localhost:114514/xxx",
+    "proxy": "socks5://localhost:7890",
     "check_interval": 42,
     "check_interval_slow": 600,
     "slow_hours": [0, 1, 2, 3, 4, 5, 6],
diff --git a/requirements.txt b/requirements.txt
index fa9c7ab..6952277 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-requests
+httpx
 loguru
 retry
 selenium
\ No newline at end of file
diff --git a/twi_api.py b/twi_api.py
new file mode 100644
index 0000000..d76140b
--- /dev/null
+++ b/twi_api.py
@@ -0,0 +1,101 @@
+import json
+import traceback
+
+import httpx
+from loguru import logger
+from selenium import webdriver
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support import expected_conditions as ec
+from selenium.webdriver.support.wait import WebDriverWait
+
+
+def login():
+    with open("config.json", "r", encoding="utf-8") as f:
+        config = json.load(f)
+
+    options = webdriver.ChromeOptions()
+    options.set_capability("goog:loggingPrefs", {"performance": "ALL"})  # record network traffic so the request headers can be captured
+    #options.add_argument("--headless")
+    driver = webdriver.Chrome(options=options)
+
+    try:
+        driver.set_page_load_timeout(30)
+        driver.get("https://x.com/i/flow/login")
+
+        WebDriverWait(driver, 10).until(
+            ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="username"]')))
+        username_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="username"]')
+        username_field.send_keys(config["email"])
+        buttons = driver.find_elements(By.TAG_NAME, 'button')
+        buttons[2].click()  # positional: the "Next" button on the e-mail step
+
+        WebDriverWait(driver, 10).until(
+            ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="on"]')))
+        userid_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="on"]')
+        if not userid_field.get_attribute("value"):
+            userid_field.send_keys(config["userid"])
+        buttons = driver.find_elements(By.TAG_NAME, 'button')
+        buttons[1].click()  # positional: "Next" on the user-id confirmation step
+
+        WebDriverWait(driver, 10).until(
+            ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="current-password"]')))
+        password_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="current-password"]')
+        password_field.send_keys(config["password"])
+        login_button = driver.find_element(By.CSS_SELECTOR, 'button[data-testid="LoginForm_Login_Button"]')
+        login_button.click()
+
+        WebDriverWait(driver, 60).until(ec.url_contains('/home'))
+        cookies = driver.get_cookies()
+        cookie_string = "; ".join([f"{cookie['name']}={cookie['value']}" for cookie in cookies])
+        logger.success(f"Twitter login success for 
{config['email']}\n{cookie_string}")
+
+        driver.get("https://x.com/i/lists/205877981")  # any list page works: it triggers the ListLatestTweetsTimeline request we want to observe
+        WebDriverWait(driver, 30).until(
+            ec.presence_of_element_located((By.CSS_SELECTOR, 'div[aria-label="Timeline: List"]')))
+
+        logs = driver.get_log("performance")
+        #with open("log.json", "w", encoding="utf-8") as f: json.dump(logs, f, ensure_ascii=False, indent=4)
+        for packet in logs:
+            message = json.loads(packet["message"])["message"]
+            if (message["method"] == "Network.requestWillBeSentExtraInfo" and
+                ":path" in message["params"]["headers"] and
+                "ListLatestTweetsTimeline" in message["params"]["headers"][":path"]):
+                headers = message["params"]["headers"]
+                headers = {k: v for k, v in headers.items() if k not in [":authority", ":method", ":path", ":scheme"]}  # drop HTTP/2 pseudo-headers
+                logger.success(f"Got request Headers: {headers}")
+                with open("headers.json", "w", encoding="utf-8") as f:
+                    json.dump(headers, f, ensure_ascii=False, indent=4)
+                return headers
+
+        logger.error(f"Twitter login failed for {config['email']}: no ListLatestTweetsTimeline request found")
+    except Exception as e:
+        logger.error(f"Twitter login failed for {config['email']}: {e}")
+        traceback.print_exc()
+    finally:
+        driver.quit()
+
+
+
+def get_list(list_id):
+    logger.info(f"Check list https://x.com/i/lists/{list_id}")
+    with open("config.json", "r", encoding="utf-8") as f:
+        config = json.load(f)
+    with open("headers.json", "r", encoding="utf-8") as f:
+        headers = json.load(f)
+
+    headers["referer"] = f"https://x.com/i/lists/{list_id}"
+    params = {
+        'variables': '{"listId":"' + str(list_id) + '","count":20}',
+        'features': '{"rweb_video_screen_enabled":false,"profile_label_improvements_pcf_label_in_post_enabled":true,"rweb_tipjar_consumption_enabled":true,"verified_phone_label_enabled":false,"creator_subscriptions_tweet_preview_api_enabled":true,"responsive_web_graphql_timeline_navigation_enabled":true,"responsive_web_graphql_skip_user_profile_image_extensions_enabled":false,"premium_content_api_read_enabled":false,"communities_web_enable_tweet_community_results_fetch":true,"c9s_tweet_anatomy_moderator_badge_enabled":true,"responsive_web_grok_analyze_button_fetch_trends_enabled":false,"responsive_web_grok_analyze_post_followups_enabled":true,"responsive_web_jetfuel_frame":false,"responsive_web_grok_share_attachment_enabled":true,"articles_preview_enabled":true,"responsive_web_edit_tweet_api_enabled":true,"graphql_is_translatable_rweb_tweet_is_translatable_enabled":true,"view_counts_everywhere_api_enabled":true,"longform_notetweets_consumption_enabled":true,"responsive_web_twitter_article_tweet_consumption_enabled":true,"tweet_awards_web_tipping_enabled":false,"responsive_web_grok_show_grok_translated_post":false,"responsive_web_grok_analysis_button_from_backend":true,"creator_subscriptions_quote_tweet_preview_enabled":false,"freedom_of_speech_not_reach_fetch_enabled":true,"standardized_nudges_misinfo":true,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":true,"longform_notetweets_rich_text_read_enabled":true,"longform_notetweets_inline_media_enabled":true,"responsive_web_grok_image_annotation_enabled":true,"responsive_web_enhance_cards_enabled":false}',
+    }
+    resp = httpx.get(
+        'https://x.com/i/api/graphql/XYC5oRL-TmZ4zwomyY6T-g/ListLatestTweetsTimeline',
+        params=params,
+        headers=headers,
+        proxy=config.get("proxy"),
+    )
+    if resp.status_code != 200:
+        logger.error(f"Error fetching list {list_id}: {resp.status_code} {resp.text}")
+        return None
+    logger.info(f"Checked {list_id}")
+    return 
resp.json()
diff --git a/twi_parser.py b/twi_parser.py
new file mode 100644
index 0000000..db41e01
--- /dev/null
+++ b/twi_parser.py
@@ -0,0 +1,135 @@
+import json
+from datetime import datetime
+
+from loguru import logger
+
+
+def parse_timeline(data):
+    entries = data["data"]["list"]["tweets_timeline"]["timeline"]["instructions"][0]["entries"]
+    result = []
+    for entry in entries:
+        try:
+            result += parse_entry(entry)
+        except Exception as e:
+            logger.error(f"error when parsing entry: {e} {e.args}\n{entry}")
+    result.sort(key=lambda x: x["timestamp"], reverse=True)
+    return result
+
+def parse_entry(entry):
+    result = []
+    entry_id = entry["entryId"]
+    if "list-conversation" in entry_id and "tweet" not in entry_id:  # conversation modules bundle several tweets
+        for item in entry["content"]["items"]:
+            data = parse_content(item["item"])
+            if data: result.append(data)
+    elif entry["content"]["__typename"] != 'TimelineTimelineCursor':
+        data = parse_content(entry["content"])
+        if data: result.append(data)
+    return result
+
+def parse_content(content):
+    tweet = content["itemContent"]["tweet_results"]["result"]
+    while "rest_id" not in tweet: tweet = tweet["tweet"]  # unwrap nested wrappers (e.g. visibility results)
+    try:
+        data = parse_tweet(tweet)
+        if "quoted_status_result" in tweet:
+            data["quoted"] = parse_tweet(tweet["quoted_status_result"]["result"])
+        if "retweeted_status_result" in tweet["legacy"]:
+            data["retweeted"] = parse_tweet(tweet["legacy"]["retweeted_status_result"]["result"])
+        return data
+    except Exception as e:
+        logger.error(f"error when parsing tweet: {e} {e.args}\n{tweet}")
+        return {}
+
+def parse_media(media):
+    data = {
+        "url": media["media_url_https"] + "?name=orig",  # ?name=orig requests the original resolution
+        "video": ""
+    }
+    if media["type"] in ["video", "animated_gif"]:
+        variants = [i for i in media["video_info"]["variants"] if "bitrate" in i]
+        variants.sort(key=lambda x: x["bitrate"], reverse=True)
+        if variants: data["video"] = variants[0]["url"]
+    return data

+def parse_entities(entity):
+    data = {
+        "text": "",
+        "indices": entity["indices"]
+    }
+    if "name" in entity: data["text"] = "@" + entity["name"]  # user mention
+    if "text" in entity: data["text"] = "#" + entity["text"]  # hashtag
+    if "display_url" in entity: data["text"] = entity["display_url"]  # t.co link
+    return data
+
+def parse_card(card):
+    data = {}
+    for v in card["legacy"]["binding_values"]:
+        if "choice" in v["key"] or v["key"] in [
+            "end_datetime_utc",
+            "unified_card",
+            "summary_photo_image_original"]:
+            value_name = f"{v['value']['type'].lower()}_value"
+            data[v["key"]] = v['value'].get(value_name, "")
+
+    photo = None
+    if "unified_card" in data:
+        card_data = json.loads(data["unified_card"])
+        del data["unified_card"]
+        try:
+            for v in card_data["media_entities"].values():
+                if "media_url_https" in v:
+                    photo = {
+                        "url": v["media_url_https"] + "?name=orig",
+                        "video": ""
+                    }
+                    break
+        except Exception:
+            logger.error(f"error parsing unified_card {card_data}")
+
+    if "summary_photo_image_original" in data:
+        photo = {
+            "url": data["summary_photo_image_original"]["url"],
+            "video": ""
+        }
+        del data["summary_photo_image_original"]
+
+    return data, photo
+
+def parse_tweet(tweet):
+    # with open("tweet.json", "w") as f: json.dump(tweet, f)
+    while "rest_id" not in tweet: tweet = tweet["tweet"]
+    data = {
+        "rest_id": tweet["rest_id"],
+        "name": tweet["core"]["user_results"]["result"]["legacy"]["name"],
+        "screen_name": tweet["core"]["user_results"]["result"]["legacy"]["screen_name"],
+        "profile_image": tweet["core"]["user_results"]["result"]["legacy"]["profile_image_url_https"],
+        "profile_image_shape": 
tweet["core"]["user_results"]["result"]["profile_image_shape"], + "full_text": tweet["legacy"]["full_text"], + "created_at": tweet["legacy"]["created_at"], + "timestamp": int(datetime.strptime(tweet["legacy"]["created_at"], '%a %b %d %H:%M:%S %z %Y').timestamp()), + "reply_to": "", + "media": [], + "entities": [], + "quoted": {}, + "retweeted": {}, + "card": {} + } + data["profile_image"] = data["profile_image"].replace("_normal.", ".") + + if "in_reply_to_status_id_str" in tweet["legacy"]: + data["reply_to"] = tweet["legacy"]["in_reply_to_status_id_str"] + + for m in tweet["legacy"]["entities"].get("media", []): + data["media"].append(parse_media(m)) + + for e in ["user_mentions", "hashtags", "urls"]: + for m in tweet["legacy"]["entities"].get(e, []): + data["entities"].append(parse_entities(m)) + data["entities"].sort(key=lambda x: x["indices"][0]) + + if "card" in tweet: + data["card"], _photo = parse_card(tweet["card"]) + if _photo: data["media"].append(_photo) + + return data \ No newline at end of file diff --git a/twitter.py b/twitter.py index 66a7042..9a4bf42 100644 --- a/twitter.py +++ b/twitter.py @@ -1,4 +1,5 @@ import json +import os import time from collections import defaultdict from datetime import datetime @@ -6,221 +7,9 @@ from pprint import pprint import requests from loguru import logger -from selenium import webdriver -from selenium.webdriver.common.by import By -from selenium.webdriver.support import expected_conditions as ec -from selenium.webdriver.support.wait import WebDriverWait - -USERID, USERNAME, PASSWORD = "", "", "" -def login(): - global USERID, USERNAME, PASSWORD, DRIVER - if not USERID or not USERNAME or not PASSWORD: - return None - - try: - options = webdriver.ChromeOptions() - options.set_capability("goog:loggingPrefs", {"performance": "ALL"}) - options.add_argument("--headless") - driver = webdriver.Chrome(options=options) - driver.set_page_load_timeout(30) - driver.get("https://x.com/i/flow/login") - - WebDriverWait(driver, 10).until( - ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="username"]'))) - username_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="username"]') - username_field.send_keys(USERNAME) - buttons = driver.find_elements(By.TAG_NAME, 'button') - buttons[2].click() - - WebDriverWait(driver, 10).until( - ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="on"]'))) - userid_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="on"]') - if not userid_field.get_attribute("value"): - userid_field.send_keys(USERID) - buttons = driver.find_elements(By.TAG_NAME, 'button') - buttons[1].click() - - WebDriverWait(driver, 10).until( - ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="current-password"]'))) - password_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="current-password"]') - password_field.send_keys(PASSWORD) - login_button = driver.find_element(By.CSS_SELECTOR, 'button[data-testid="LoginForm_Login_Button"]') - login_button.click() - - WebDriverWait(driver, 60).until(ec.url_contains('/home')) - cookies = driver.get_cookies() - cookie_string = "; ".join([f"{cookie['name']}={cookie['value']}" for cookie in cookies]) - logger.success(f"Twitter login success for username {USERNAME}\n{cookie_string}") - DRIVER = driver - return driver - - except Exception as e: - logger.error(f"Twitter login failed for username {USERNAME}: {e}") - driver.quit() - return None - - -ERROR_COUNT = 0 -def get_timeline(url): - global ERROR_COUNT, DRIVER - 
logger.info(f"check timeline {url}") - try: - driver = DRIVER - driver.get(url) - WebDriverWait(driver, 30).until( - ec.presence_of_element_located((By.CSS_SELECTOR, 'div[aria-label="Timeline: List"]'))) - for packet in driver.get_log("performance"): - message = json.loads(packet["message"])["message"] - if (message["method"] == "Network.responseReceived" and - "ListLatestTweetsTimeline" in message["params"]["response"]["url"]): - request_id = message["params"]["requestId"] - resp = driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': request_id}) - logger.info(f"checked") - ERROR_COUNT = 0 - return json.loads(resp["body"]) - except Exception as e: - logger.error(f"check failed: {e}") - ERROR_COUNT += 1 - if ERROR_COUNT > 5: - driver.quit() - login() - return {} - - - - -def parse_timeline(data): - entries = data["data"]["list"]["tweets_timeline"]["timeline"]["instructions"][0]["entries"] - result = [] - for entry in entries: - try: - result += parse_entry(entry) - except Exception as e: - logger.error(f"error when parsing entry: {e} {e.args}\n{entry}") - result.sort(key=lambda x: x["timestamp"], reverse=True) - return result - -def parse_entry(entry): - result = [] - entry_id = entry["entryId"] - if "list-conversation" in entry_id and not "tweet" in entry_id: - for item in entry["content"]["items"]: - data = parse_content(item["item"]) - if data: result.append(data) - elif entry["content"]["__typename"] != 'TimelineTimelineCursor': - data = parse_content(entry["content"]) - if data: result.append(data) - return result - -def parse_content(content): - tweet = content["itemContent"]["tweet_results"]["result"] - while not "rest_id" in tweet: tweet = tweet["tweet"] - try: - data = parse_tweet(tweet) - if "quoted_status_result" in tweet: - data["quoted"] = parse_tweet(tweet["quoted_status_result"]["result"]) - if "retweeted_status_result" in tweet["legacy"]: - data["retweeted"] = parse_tweet(tweet["legacy"]["retweeted_status_result"]["result"]) - return data - except Exception as e: - logger.error(f"error when parsing tweet: {e} {e.args}\n{tweet}") - return {} - -def parse_media(media): - data = { - "url": media["media_url_https"] + "?name=orig", - "video": "" - } - if media["type"] in ["video", "animated_gif"]: - variants = [i for i in media["video_info"]["variants"] if "bitrate" in i] - variants.sort(key=lambda x: x["bitrate"], reverse=True) - if variants: data["video"] = variants[0]["url"] - return data - -def parse_entities(entity): - data = { - "text": "", - "indices": entity["indices"] - } - if "name" in entity: data["text"] = "@" + entity["name"] - if "text" in entity: data["text"] = "#" + entity["text"] - if "display_url" in entity: data["text"] = entity["display_url"] - return data - -def parse_card(card): - data = {} - for v in card["legacy"]["binding_values"]: - if "choice" in v["key"] or v["key"] in [ - "end_datetime_utc", - "unified_card", - "summary_photo_image_original"]: - value_name = f"{v['value']['type'].lower()}_value" - data[v["key"]] = v['value'].get(value_name, "") - - photo = None - if "unified_card" in data: - card_data = json.loads(data["unified_card"]) - del data["unified_card"] - try: - for k, v in card_data["media_entities"].items(): - if "media_url_https" in v: - photo = { - "url": v["media_url_https"] + "?name=orig", - "video": "" - } - break - except: - logger.error(f"error parsing unified_card {card_data}") - - if "summary_photo_image_original" in data: - photo = { - "url": data["summary_photo_image_original"]["url"], - "video": "" - } - del 
data["summary_photo_image_original"] - - return data, photo - -def parse_tweet(tweet): - # with open("tweet.json", "w") as f: json.dump(tweet, f) - while not "rest_id" in tweet: tweet = tweet["tweet"] - data = { - "rest_id": tweet["rest_id"], - "name": tweet["core"]["user_results"]["result"]["legacy"]["name"], - "screen_name": tweet["core"]["user_results"]["result"]["legacy"]["screen_name"], - "profile_image": tweet["core"]["user_results"]["result"]["legacy"]["profile_image_url_https"], - "profile_image_shape": tweet["core"]["user_results"]["result"]["profile_image_shape"], - "full_text": tweet["legacy"]["full_text"], - "created_at": tweet["legacy"]["created_at"], - "timestamp": int(datetime.strptime(tweet["legacy"]["created_at"], '%a %b %d %H:%M:%S %z %Y').timestamp()), - "reply_to": "", - "media": [], - "entities": [], - "quoted": {}, - "retweeted": {}, - "card": {} - } - data["profile_image"] = data["profile_image"].replace("_normal.", ".") - - if "in_reply_to_status_id_str" in tweet["legacy"]: - data["reply_to"] = tweet["legacy"]["in_reply_to_status_id_str"] - - for m in tweet["legacy"]["entities"].get("media", []): - data["media"].append(parse_media(m)) - - for e in ["user_mentions", "hashtags", "urls"]: - for m in tweet["legacy"]["entities"].get(e, []): - data["entities"].append(parse_entities(m)) - data["entities"].sort(key=lambda x: x["indices"][0]) - - if "card" in tweet: - data["card"], _photo = parse_card(tweet["card"]) - if _photo: data["media"].append(_photo) - - return data - - +from twi_api import get_list, login +from twi_parser import parse_timeline LATEST_TWEET_ID_DICT = {} LATEST_TWEET_TS_DICT = {} @@ -253,7 +42,8 @@ def filter_tweets(tweets, filter_list): return tweets def check_timeline(config): - data = get_timeline(config["url"]) + list_id = int(config["url"].split("/")[-1]) + data = get_list(list_id) if data: tweets = parse_timeline(data) new_tweets = check_new_tweets(tweets, config["url"]) @@ -263,14 +53,12 @@ def check_timeline(config): - -def main(config): - global USERID, USERNAME, PASSWORD - USERID = config["userid"] # screenid (@后面那个) - USERNAME = config["username"] # 登录用户名或邮箱 - PASSWORD = config["password"] # 密码 - login() +if __name__ == "__main__": + if not os.path.exists("headers.json"): + login() + with open("config.json", 'r') as f: + config = json.load(f) check_list = config.get("check_list", []) check_interval = config.get("check_interval", 42) check_interval_slow = config.get("check_interval_slow", 600) @@ -301,10 +89,5 @@ def main(config): time.sleep(check_interval_slow) else: time.sleep(check_interval) - -if __name__ == "__main__": - with open("config.json", 'r') as f: - config = json.load(f) - main(config) # with open("lovelive.json", 'r', encoding="utf8") as f: pprint(parse_timeline(json.load(f)))