feat: refact api (v2.0.0)

master
wlt233 2 days ago
parent 802d5c391c
commit 227bed961d

5
.gitignore vendored

@ -1,7 +1,6 @@
config.json
*.json
!config_template.json
chromedriver.exe
lovelive.json
# Byte-compiled / optimized / DLL files
__pycache__/

@ -1,8 +1,9 @@
{
"userid": "foobar123",
"username": "114514@1919.com",
"email": "114514@1919.com",
"password": "810810",
"callback_url": "http://localhost:114514/xxx",
"proxy": "socks5://localhost:7890",
"check_interval": 42,
"check_interval_slow": 600,
"slow_hours": [0, 1, 2, 3, 4, 5, 6],

@ -1,4 +1,4 @@
requests
httpx
loguru
retry
selenium

@ -0,0 +1,101 @@
import json
import traceback
import httpx
from loguru import logger
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.wait import WebDriverWait
def login():
with open("config.json", "r", encoding="utf-8") as f:
config = json.load(f)
options = webdriver.ChromeOptions()
options.set_capability("goog:loggingPrefs", {"performance": "ALL"})
#options.add_argument("--headless")
driver = webdriver.Chrome(options=options)
try:
driver.set_page_load_timeout(30)
driver.get("https://x.com/i/flow/login")
WebDriverWait(driver, 10).until(
ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="username"]')))
username_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="username"]')
username_field.send_keys(config["email"])
buttons = driver.find_elements(By.TAG_NAME, 'button')
buttons[2].click()
WebDriverWait(driver, 10).until(
ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="on"]')))
userid_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="on"]')
if not userid_field.get_attribute("value"):
userid_field.send_keys(config["userid"])
buttons = driver.find_elements(By.TAG_NAME, 'button')
buttons[1].click()
WebDriverWait(driver, 10).until(
ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="current-password"]')))
password_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="current-password"]')
password_field.send_keys(config["password"])
login_button = driver.find_element(By.CSS_SELECTOR, 'button[data-testid="LoginForm_Login_Button"]')
login_button.click()
WebDriverWait(driver, 60).until(ec.url_contains('/home'))
cookies = driver.get_cookies()
cookie_string = "; ".join([f"{cookie['name']}={cookie['value']}" for cookie in cookies])
logger.success(f"Twitter login success for username {config['email']}\n{cookie_string}")
driver.get("https://x.com/i/lists/205877981")
WebDriverWait(driver, 30).until(
ec.presence_of_element_located((By.CSS_SELECTOR, 'div[aria-label="Timeline: List"]')))
logs = driver.get_log("performance")
#with open("log.json", "w", encoding="utf-8") as f: json.dump(logs, f, ensure_ascii=False, indent=4)
for packet in logs:
message = json.loads(packet["message"])["message"]
if (message["method"] == "Network.requestWillBeSentExtraInfo" and
":path" in message["params"]["headers"] and
"ListLatestTweetsTimeline" in message["params"]["headers"][":path"]):
headers = message["params"]["headers"]
headers = {k: v for k, v in headers.items() if k not in [":authority", ":method", ":path", ":scheme"]}
logger.success(f"Got request Headers: {headers}")
with open("headers.json", "w", encoding="utf-8") as f:
json.dump(headers, f, ensure_ascii=False, indent=4)
return headers
logger.error(f"Twitter login failed for username {config['email']}: No request found")
except Exception as e:
logger.error(f"Twitter login failed for username {config['email']}: {e}")
traceback.print_exc()
finally:
driver.quit()
def get_list(list_id):
logger.info(f"Check list https://x.com/i/lists/{list_id}")
with open("config.json", "r", encoding="utf-8") as f:
config = json.load(f)
with open("headers.json", "r", encoding="utf-8") as f:
headers = json.load(f)
headers["referer"] = f"https://x.com/i/lists/{list_id}"
params = {
'variables': '{"listId":"' + str(list_id) + '","count":20}',
'features': '{"rweb_video_screen_enabled":false,"profile_label_improvements_pcf_label_in_post_enabled":true,"rweb_tipjar_consumption_enabled":true,"verified_phone_label_enabled":false,"creator_subscriptions_tweet_preview_api_enabled":true,"responsive_web_graphql_timeline_navigation_enabled":true,"responsive_web_graphql_skip_user_profile_image_extensions_enabled":false,"premium_content_api_read_enabled":false,"communities_web_enable_tweet_community_results_fetch":true,"c9s_tweet_anatomy_moderator_badge_enabled":true,"responsive_web_grok_analyze_button_fetch_trends_enabled":false,"responsive_web_grok_analyze_post_followups_enabled":true,"responsive_web_jetfuel_frame":false,"responsive_web_grok_share_attachment_enabled":true,"articles_preview_enabled":true,"responsive_web_edit_tweet_api_enabled":true,"graphql_is_translatable_rweb_tweet_is_translatable_enabled":true,"view_counts_everywhere_api_enabled":true,"longform_notetweets_consumption_enabled":true,"responsive_web_twitter_article_tweet_consumption_enabled":true,"tweet_awards_web_tipping_enabled":false,"responsive_web_grok_show_grok_translated_post":false,"responsive_web_grok_analysis_button_from_backend":true,"creator_subscriptions_quote_tweet_preview_enabled":false,"freedom_of_speech_not_reach_fetch_enabled":true,"standardized_nudges_misinfo":true,"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled":true,"longform_notetweets_rich_text_read_enabled":true,"longform_notetweets_inline_media_enabled":true,"responsive_web_grok_image_annotation_enabled":true,"responsive_web_enhance_cards_enabled":false}',
}
resp = httpx.get(
'https://x.com/i/api/graphql/XYC5oRL-TmZ4zwomyY6T-g/ListLatestTweetsTimeline',
params=params,
headers=headers,
proxy=config["proxy"] if "proxy" in config else None,
)
if resp.status_code != 200:
logger.error(f"Error fetching list {list_id}: {resp.status_code} {resp.text}")
return None
logger.info(f"Checked {list_id}")
return resp.json()

@ -0,0 +1,135 @@
import json
from datetime import datetime
from loguru import logger
def parse_timeline(data):
entries = data["data"]["list"]["tweets_timeline"]["timeline"]["instructions"][0]["entries"]
result = []
for entry in entries:
try:
result += parse_entry(entry)
except Exception as e:
logger.error(f"error when parsing entry: {e} {e.args}\n{entry}")
result.sort(key=lambda x: x["timestamp"], reverse=True)
return result
def parse_entry(entry):
result = []
entry_id = entry["entryId"]
if "list-conversation" in entry_id and not "tweet" in entry_id:
for item in entry["content"]["items"]:
data = parse_content(item["item"])
if data: result.append(data)
elif entry["content"]["__typename"] != 'TimelineTimelineCursor':
data = parse_content(entry["content"])
if data: result.append(data)
return result
def parse_content(content):
tweet = content["itemContent"]["tweet_results"]["result"]
while not "rest_id" in tweet: tweet = tweet["tweet"]
try:
data = parse_tweet(tweet)
if "quoted_status_result" in tweet:
data["quoted"] = parse_tweet(tweet["quoted_status_result"]["result"])
if "retweeted_status_result" in tweet["legacy"]:
data["retweeted"] = parse_tweet(tweet["legacy"]["retweeted_status_result"]["result"])
return data
except Exception as e:
logger.error(f"error when parsing tweet: {e} {e.args}\n{tweet}")
return {}
def parse_media(media):
data = {
"url": media["media_url_https"] + "?name=orig",
"video": ""
}
if media["type"] in ["video", "animated_gif"]:
variants = [i for i in media["video_info"]["variants"] if "bitrate" in i]
variants.sort(key=lambda x: x["bitrate"], reverse=True)
if variants: data["video"] = variants[0]["url"]
return data
def parse_entities(entity):
data = {
"text": "",
"indices": entity["indices"]
}
if "name" in entity: data["text"] = "@" + entity["name"]
if "text" in entity: data["text"] = "#" + entity["text"]
if "display_url" in entity: data["text"] = entity["display_url"]
return data
def parse_card(card):
data = {}
for v in card["legacy"]["binding_values"]:
if "choice" in v["key"] or v["key"] in [
"end_datetime_utc",
"unified_card",
"summary_photo_image_original"]:
value_name = f"{v['value']['type'].lower()}_value"
data[v["key"]] = v['value'].get(value_name, "")
photo = None
if "unified_card" in data:
card_data = json.loads(data["unified_card"])
del data["unified_card"]
try:
for k, v in card_data["media_entities"].items():
if "media_url_https" in v:
photo = {
"url": v["media_url_https"] + "?name=orig",
"video": ""
}
break
except:
logger.error(f"error parsing unified_card {card_data}")
if "summary_photo_image_original" in data:
photo = {
"url": data["summary_photo_image_original"]["url"],
"video": ""
}
del data["summary_photo_image_original"]
return data, photo
def parse_tweet(tweet):
# with open("tweet.json", "w") as f: json.dump(tweet, f)
while not "rest_id" in tweet: tweet = tweet["tweet"]
data = {
"rest_id": tweet["rest_id"],
"name": tweet["core"]["user_results"]["result"]["legacy"]["name"],
"screen_name": tweet["core"]["user_results"]["result"]["legacy"]["screen_name"],
"profile_image": tweet["core"]["user_results"]["result"]["legacy"]["profile_image_url_https"],
"profile_image_shape": tweet["core"]["user_results"]["result"]["profile_image_shape"],
"full_text": tweet["legacy"]["full_text"],
"created_at": tweet["legacy"]["created_at"],
"timestamp": int(datetime.strptime(tweet["legacy"]["created_at"], '%a %b %d %H:%M:%S %z %Y').timestamp()),
"reply_to": "",
"media": [],
"entities": [],
"quoted": {},
"retweeted": {},
"card": {}
}
data["profile_image"] = data["profile_image"].replace("_normal.", ".")
if "in_reply_to_status_id_str" in tweet["legacy"]:
data["reply_to"] = tweet["legacy"]["in_reply_to_status_id_str"]
for m in tweet["legacy"]["entities"].get("media", []):
data["media"].append(parse_media(m))
for e in ["user_mentions", "hashtags", "urls"]:
for m in tweet["legacy"]["entities"].get(e, []):
data["entities"].append(parse_entities(m))
data["entities"].sort(key=lambda x: x["indices"][0])
if "card" in tweet:
data["card"], _photo = parse_card(tweet["card"])
if _photo: data["media"].append(_photo)
return data

@ -1,4 +1,5 @@
import json
import os
import time
from collections import defaultdict
from datetime import datetime
@ -6,221 +7,9 @@ from pprint import pprint
import requests
from loguru import logger
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.wait import WebDriverWait
USERID, USERNAME, PASSWORD = "", "", ""
def login():
global USERID, USERNAME, PASSWORD, DRIVER
if not USERID or not USERNAME or not PASSWORD:
return None
try:
options = webdriver.ChromeOptions()
options.set_capability("goog:loggingPrefs", {"performance": "ALL"})
options.add_argument("--headless")
driver = webdriver.Chrome(options=options)
driver.set_page_load_timeout(30)
driver.get("https://x.com/i/flow/login")
WebDriverWait(driver, 10).until(
ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="username"]')))
username_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="username"]')
username_field.send_keys(USERNAME)
buttons = driver.find_elements(By.TAG_NAME, 'button')
buttons[2].click()
WebDriverWait(driver, 10).until(
ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="on"]')))
userid_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="on"]')
if not userid_field.get_attribute("value"):
userid_field.send_keys(USERID)
buttons = driver.find_elements(By.TAG_NAME, 'button')
buttons[1].click()
WebDriverWait(driver, 10).until(
ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="current-password"]')))
password_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="current-password"]')
password_field.send_keys(PASSWORD)
login_button = driver.find_element(By.CSS_SELECTOR, 'button[data-testid="LoginForm_Login_Button"]')
login_button.click()
WebDriverWait(driver, 60).until(ec.url_contains('/home'))
cookies = driver.get_cookies()
cookie_string = "; ".join([f"{cookie['name']}={cookie['value']}" for cookie in cookies])
logger.success(f"Twitter login success for username {USERNAME}\n{cookie_string}")
DRIVER = driver
return driver
except Exception as e:
logger.error(f"Twitter login failed for username {USERNAME}: {e}")
driver.quit()
return None
ERROR_COUNT = 0
def get_timeline(url):
global ERROR_COUNT, DRIVER
logger.info(f"check timeline {url}")
try:
driver = DRIVER
driver.get(url)
WebDriverWait(driver, 30).until(
ec.presence_of_element_located((By.CSS_SELECTOR, 'div[aria-label="Timeline: List"]')))
for packet in driver.get_log("performance"):
message = json.loads(packet["message"])["message"]
if (message["method"] == "Network.responseReceived" and
"ListLatestTweetsTimeline" in message["params"]["response"]["url"]):
request_id = message["params"]["requestId"]
resp = driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': request_id})
logger.info(f"checked")
ERROR_COUNT = 0
return json.loads(resp["body"])
except Exception as e:
logger.error(f"check failed: {e}")
ERROR_COUNT += 1
if ERROR_COUNT > 5:
driver.quit()
login()
return {}
def parse_timeline(data):
entries = data["data"]["list"]["tweets_timeline"]["timeline"]["instructions"][0]["entries"]
result = []
for entry in entries:
try:
result += parse_entry(entry)
except Exception as e:
logger.error(f"error when parsing entry: {e} {e.args}\n{entry}")
result.sort(key=lambda x: x["timestamp"], reverse=True)
return result
def parse_entry(entry):
result = []
entry_id = entry["entryId"]
if "list-conversation" in entry_id and not "tweet" in entry_id:
for item in entry["content"]["items"]:
data = parse_content(item["item"])
if data: result.append(data)
elif entry["content"]["__typename"] != 'TimelineTimelineCursor':
data = parse_content(entry["content"])
if data: result.append(data)
return result
def parse_content(content):
tweet = content["itemContent"]["tweet_results"]["result"]
while not "rest_id" in tweet: tweet = tweet["tweet"]
try:
data = parse_tweet(tweet)
if "quoted_status_result" in tweet:
data["quoted"] = parse_tweet(tweet["quoted_status_result"]["result"])
if "retweeted_status_result" in tweet["legacy"]:
data["retweeted"] = parse_tweet(tweet["legacy"]["retweeted_status_result"]["result"])
return data
except Exception as e:
logger.error(f"error when parsing tweet: {e} {e.args}\n{tweet}")
return {}
def parse_media(media):
data = {
"url": media["media_url_https"] + "?name=orig",
"video": ""
}
if media["type"] in ["video", "animated_gif"]:
variants = [i for i in media["video_info"]["variants"] if "bitrate" in i]
variants.sort(key=lambda x: x["bitrate"], reverse=True)
if variants: data["video"] = variants[0]["url"]
return data
def parse_entities(entity):
data = {
"text": "",
"indices": entity["indices"]
}
if "name" in entity: data["text"] = "@" + entity["name"]
if "text" in entity: data["text"] = "#" + entity["text"]
if "display_url" in entity: data["text"] = entity["display_url"]
return data
def parse_card(card):
data = {}
for v in card["legacy"]["binding_values"]:
if "choice" in v["key"] or v["key"] in [
"end_datetime_utc",
"unified_card",
"summary_photo_image_original"]:
value_name = f"{v['value']['type'].lower()}_value"
data[v["key"]] = v['value'].get(value_name, "")
photo = None
if "unified_card" in data:
card_data = json.loads(data["unified_card"])
del data["unified_card"]
try:
for k, v in card_data["media_entities"].items():
if "media_url_https" in v:
photo = {
"url": v["media_url_https"] + "?name=orig",
"video": ""
}
break
except:
logger.error(f"error parsing unified_card {card_data}")
if "summary_photo_image_original" in data:
photo = {
"url": data["summary_photo_image_original"]["url"],
"video": ""
}
del data["summary_photo_image_original"]
return data, photo
def parse_tweet(tweet):
# with open("tweet.json", "w") as f: json.dump(tweet, f)
while not "rest_id" in tweet: tweet = tweet["tweet"]
data = {
"rest_id": tweet["rest_id"],
"name": tweet["core"]["user_results"]["result"]["legacy"]["name"],
"screen_name": tweet["core"]["user_results"]["result"]["legacy"]["screen_name"],
"profile_image": tweet["core"]["user_results"]["result"]["legacy"]["profile_image_url_https"],
"profile_image_shape": tweet["core"]["user_results"]["result"]["profile_image_shape"],
"full_text": tweet["legacy"]["full_text"],
"created_at": tweet["legacy"]["created_at"],
"timestamp": int(datetime.strptime(tweet["legacy"]["created_at"], '%a %b %d %H:%M:%S %z %Y').timestamp()),
"reply_to": "",
"media": [],
"entities": [],
"quoted": {},
"retweeted": {},
"card": {}
}
data["profile_image"] = data["profile_image"].replace("_normal.", ".")
if "in_reply_to_status_id_str" in tweet["legacy"]:
data["reply_to"] = tweet["legacy"]["in_reply_to_status_id_str"]
for m in tweet["legacy"]["entities"].get("media", []):
data["media"].append(parse_media(m))
for e in ["user_mentions", "hashtags", "urls"]:
for m in tweet["legacy"]["entities"].get(e, []):
data["entities"].append(parse_entities(m))
data["entities"].sort(key=lambda x: x["indices"][0])
if "card" in tweet:
data["card"], _photo = parse_card(tweet["card"])
if _photo: data["media"].append(_photo)
return data
from twi_api import get_list, login
from twi_parser import parse_timeline
LATEST_TWEET_ID_DICT = {}
LATEST_TWEET_TS_DICT = {}
@ -253,7 +42,8 @@ def filter_tweets(tweets, filter_list):
return tweets
def check_timeline(config):
data = get_timeline(config["url"])
list_id = int(config["url"].split("/")[-1])
data = get_list(list_id)
if data:
tweets = parse_timeline(data)
new_tweets = check_new_tweets(tweets, config["url"])
@ -263,14 +53,12 @@ def check_timeline(config):
def main(config):
global USERID, USERNAME, PASSWORD
USERID = config["userid"] # screenid @后面那个)
USERNAME = config["username"] # 登录用户名或邮箱
PASSWORD = config["password"] # 密码
if __name__ == "__main__":
if not os.path.exists("headers.json"):
login()
with open("config.json", 'r') as f:
config = json.load(f)
check_list = config.get("check_list", [])
check_interval = config.get("check_interval", 42)
check_interval_slow = config.get("check_interval_slow", 600)
@ -302,9 +90,4 @@ def main(config):
else:
time.sleep(check_interval)
if __name__ == "__main__":
with open("config.json", 'r') as f:
config = json.load(f)
main(config)
# with open("lovelive.json", 'r', encoding="utf8") as f: pprint(parse_timeline(json.load(f)))

Loading…
Cancel
Save