import json
import time
from datetime import datetime
from pprint import pprint

# import pyotp
import requests
from loguru import logger
from retry import retry
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.wait import WebDriverWait


# def generate_authenticator_token(secret):
#     totp = pyotp.TOTP(secret)
#     return totp.now()


def _type_into(driver, css_selector, text, timeout=10):
    """Wait for the element matching *css_selector* to exist, then type *text* into it."""
    WebDriverWait(driver, timeout).until(
        ec.presence_of_element_located((By.CSS_SELECTOR, css_selector)))
    driver.find_element(By.CSS_SELECTOR, css_selector).send_keys(text)


def login(userid, username, password, authentication_secret=None):
    """Log in to x.com with a headless Chrome session.

    The flow types ``username`` into the first form, ``userid`` into the
    follow-up ``autocomplete="on"`` field, then ``password``, and finally waits
    (up to 300 s) for the URL to contain ``/home``.

    Args:
        userid: Text typed into the second input of the login flow.
        username: Text typed into the first (username) input.
        password: Account password.
        authentication_secret: Currently unused; reserved for the commented-out
            two-step verification branch below.

    Returns:
        The logged-in ``webdriver.Chrome`` instance, or ``None`` when
        credentials are missing or any step of the flow fails.
    """
    if not username or not password:
        return None

    # Bound before the try so the error handler can tell whether a browser was
    # actually started (webdriver.Chrome() itself may raise).
    driver = None
    try:
        options = webdriver.ChromeOptions()
        # Performance logging is what get_timeline() later reads via get_log().
        options.set_capability("goog:loggingPrefs", {"performance": "ALL"})
        options.add_argument("--headless")
        driver = webdriver.Chrome(options=options)
        driver.get("https://x.com/i/flow/login")

        _type_into(driver, 'input[autocomplete="username"]', username)
        # NOTE(review): buttons are picked by position on the page; this
        # presumably targets the "next" button -- confirm against the live page.
        driver.find_elements(By.TAG_NAME, 'button')[2].click()

        _type_into(driver, 'input[autocomplete="on"]', userid)
        driver.find_elements(By.TAG_NAME, 'button')[1].click()

        _type_into(driver, 'input[autocomplete="current-password"]', password)
        login_button = driver.find_element(
            By.CSS_SELECTOR, 'button[data-testid="LoginForm_Login_Button"]')
        login_button.click()

        # # If two-step verification is required
        # if authentication_secret:
        #     WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, 'input[inputmode="numeric"]')))
        #     token = generate_authenticator_token(authentication_secret)  # function that still needs to be implemented
        #     auth_field = driver.find_element(By.CSS_SELECTOR, 'input[inputmode="numeric"]')
        #     auth_field.send_keys(token)
        #     next_button = driver.find_element(By.CSS_SELECTOR, 'button[data-testid="ocfEnterTextNextButton"]')
        #     next_button.click()

        WebDriverWait(driver, 300).until(ec.url_contains('/home'))
        cookies = driver.get_cookies()
        cookie_string = "; ".join([f"{cookie['name']}={cookie['value']}" for cookie in cookies])
        # NOTE(review): this writes the session cookies to the log -- confirm
        # that is intended before shipping logs anywhere shared.
        logger.success(f"Twitter login success for username {username}\n{cookie_string}")
        return driver
    except Exception as e:
        logger.error(f"Twitter login failed for username {username}: {e}")
        if driver is not None:
            driver.quit()
        return None


@retry(tries=10, delay=10)
def get_timeline(driver, url):
    """Load a list page and return the captured ``ListLatestTweetsTimeline`` JSON.

    The page is opened in *driver*, then the Chrome performance log is scanned
    for a ``Network.responseReceived`` event whose URL contains
    ``ListLatestTweetsTimeline``; that response body is fetched through CDP and
    decoded. Any exception triggers the ``retry`` decorator (10 tries, 10 s
    apart).

    Args:
        driver: A Chrome webdriver created with performance logging enabled.
        url: URL of the list page to open.

    Returns:
        The decoded JSON body, or ``{}`` when no matching response was logged.
    """
    logger.info(f"check timeline {url}")
    driver.get(url)
    WebDriverWait(driver, 60).until(
        ec.presence_of_element_located((By.CSS_SELECTOR, 'div[aria-label="Timeline: List"]')))
    for packet in driver.get_log("performance"):
        message = json.loads(packet["message"])["message"]
        if (message["method"] == "Network.responseReceived" and
                "ListLatestTweetsTimeline" in message["params"]["response"]["url"]):
            request_id = message["params"]["requestId"]
            resp = driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': request_id})
            return json.loads(resp["body"])
    return {}


def parse_timeline(data):
    """Flatten a timeline payload into a list of tweet dicts, newest first.

    Args:
        data: Payload as returned by ``get_timeline``.

    Returns:
        Tweet dicts (see ``parse_tweet``) sorted by ``timestamp`` descending.
        An empty payload -- which ``get_timeline`` returns when nothing was
        captured -- yields an empty list instead of raising ``KeyError``.
    """
    if not data:
        return []
    entries = data["data"]["list"]["tweets_timeline"]["timeline"]["instructions"][0]["entries"]
    result = []
    for entry in entries:
        result.extend(parse_entry(entry))
    result.sort(key=lambda x: x["timestamp"], reverse=True)
    return result


def parse_entry(entry):
    """Return the tweets contained in one timeline entry as a list.

    Conversation entries (``entryId`` contains ``list-conversation`` but not
    ``tweet``) contribute every item they hold; cursor entries contribute
    nothing; any other entry contributes its single content.
    """
    result = []
    entry_id = entry["entryId"]
    if "list-conversation" in entry_id and "tweet" not in entry_id:
        for item in entry["content"]["items"]:
            result.append(parse_content(item["item"]))
    elif entry["content"]["__typename"] != 'TimelineTimelineCursor':
        result.append(parse_content(entry["content"]))
    return result


def parse_content(content):
    """Parse one content item, attaching quoted and retweeted tweets when present."""
    tweet = content["itemContent"]["tweet_results"]["result"]
    data = parse_tweet(tweet)
    if "quoted_status_result" in tweet:
        data["quoted"] = parse_tweet(tweet["quoted_status_result"]["result"])
    if "retweeted_status_result" in tweet["legacy"]:
        data["retweeted"] = parse_tweet(tweet["legacy"]["retweeted_status_result"]["result"])
    return data


def parse_media(media):
    """Return ``{"url", "video"}`` for one media entity.

    ``url`` is the media URL with ``?name=orig`` appended. ``video`` is the URL
    of the highest-bitrate variant for videos and animated GIFs, or ``""`` when
    the media is not a video or no variant carries a bitrate.
    """
    data = {
        "url": media["media_url_https"] + "?name=orig",
        "video": "",
    }
    if media["type"] in ["video", "animated_gif"]:
        # Variants without a "bitrate" key are skipped so the sort key exists.
        variants = [i for i in media["video_info"]["variants"] if "bitrate" in i]
        variants.sort(key=lambda x: x["bitrate"], reverse=True)
        if variants:
            data["video"] = variants[0]["url"]
    return data


def parse_tweet(tweet):
    """Extract author, text, creation time and media from a raw tweet result.

    ``quoted`` and ``retweeted`` are initialised to empty dicts; they are
    filled in by ``parse_content`` when applicable.
    """
    user = tweet["core"]["user_results"]["result"]
    legacy = tweet["legacy"]
    data = {
        "rest_id": tweet["rest_id"],
        "name": user["legacy"]["name"],
        "screen_name": user["legacy"]["screen_name"],
        "profile_image": user["legacy"]["profile_image_url_https"],
        "profile_image_shape": user["profile_image_shape"],
        "full_text": legacy["full_text"],
        "created_at": legacy["created_at"],
        "timestamp": int(datetime.strptime(legacy["created_at"], '%a %b %d %H:%M:%S %z %Y').timestamp()),
        "media": [],
        "quoted": {},
        "retweeted": {},
    }
    for m in legacy["entities"].get("media", []):
        data["media"].append(parse_media(m))
    return data


# Maps each list URL to the rest_id of the newest tweet seen on the last poll.
LATEST_TWEET_ID_DICT = {}


def check_new_tweets(tweets, url):
    """Return the tweets that appeared before the last-seen tweet for *url*.

    Args:
        tweets: Tweet dicts ordered newest first.
        url: Key under which the last-seen tweet id is remembered.

    Returns:
        The tweets preceding the remembered one in *tweets*. An empty list is
        returned on the first poll for *url*, when the remembered tweet is no
        longer in *tweets*, or when *tweets* is empty (in which case the
        remembered id is left untouched rather than raising ``IndexError``).
    """
    if not tweets:
        return []

    latest_id = tweets[0]["rest_id"]
    if url in LATEST_TWEET_ID_DICT:
        new_tweets = []
        for tweet in tweets:
            if tweet["rest_id"] == LATEST_TWEET_ID_DICT[url]:
                LATEST_TWEET_ID_DICT[url] = latest_id
                return new_tweets
            new_tweets.append(tweet)
    LATEST_TWEET_ID_DICT[url] = latest_id
    return []


def check_timeline(driver, url):
    """Fetch, parse and diff the timeline at *url*; return its new tweets."""
    data = get_timeline(driver, url)
    tweets = parse_timeline(data)
    return check_new_tweets(tweets, url)


def main(userid, username, password, config):
    """Log in, then poll every configured list and push new tweets to the local endpoint.

    Args:
        userid: Passed through to ``login``.
        username: Passed through to ``login``.
        password: Passed through to ``login``.
        config: Mapping of group id to list URL. New tweets are posted as
            ``{group_id: [tweet, ...]}`` to ``http://localhost:8520/twitter``.
    """
    driver = login(userid, username, password)
    if driver is None:
        # login() returns None on missing credentials or a failed flow; there
        # is nothing to poll with, so stop instead of failing inside the loop.
        logger.error("Twitter login failed; aborting timeline monitoring")
        return

    try:
        while True:
            json_data = {}
            for group_id, url in config.items():
                new_tweets = check_timeline(driver, url)
                if new_tweets:
                    json_data[group_id] = new_tweets
            if json_data:
                pprint(json_data)
                try:
                    # A timeout keeps a stalled receiver from blocking polling forever.
                    requests.post("http://localhost:8520/twitter", json=json_data, timeout=30)
                except Exception as e:
                    logger.error(str(e))
            time.sleep(55)
    finally:
        # Always release the browser, including on Ctrl-C or an unhandled error.
        driver.quit()


if __name__ == "__main__":
    userid = ""
    username = ""
    password = ""
    config = {
        "": "https://x.com/i/lists/<...>",
    }
    main(userid, username, password, config)
    # with open("lovelive.json", 'r') as f:
    #     pprint(parse_timeline(json.load(f)))