diff --git a/twitter.py b/twitter.py index f623166..ee2bff1 100644 --- a/twitter.py +++ b/twitter.py @@ -4,80 +4,69 @@ import time from datetime import datetime from pprint import pprint -# import pyotp import requests from loguru import logger -from retry import retry from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as ec from selenium.webdriver.support.wait import WebDriverWait -# def generate_authenticator_token(secret): -# totp = pyotp.TOTP(secret) -# return totp.now() - -def login(userid, username, password, authentication_secret=None): - if not username or not password: +USERID, USERNAME, PASSWORD = "", "", "" +def login(): + global USERID, USERNAME, PASSWORD, DRIVER + if not USERID or not USERNAME or not PASSWORD: return None try: options = webdriver.ChromeOptions() options.set_capability("goog:loggingPrefs", {"performance": "ALL"}) - options.add_argument("--headless") + # options.add_argument("--headless") driver = webdriver.Chrome(options=options) driver.get("https://x.com/i/flow/login") WebDriverWait(driver, 10).until( ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="username"]'))) username_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="username"]') - username_field.send_keys(username) + username_field.send_keys(USERNAME) buttons = driver.find_elements(By.TAG_NAME, 'button') buttons[2].click() WebDriverWait(driver, 10).until( ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="on"]'))) username_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="on"]') - username_field.send_keys(userid) + username_field.send_keys(USERID) buttons = driver.find_elements(By.TAG_NAME, 'button') buttons[1].click() WebDriverWait(driver, 10).until( ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="current-password"]'))) password_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="current-password"]') - password_field.send_keys(password) + password_field.send_keys(PASSWORD) login_button = driver.find_element(By.CSS_SELECTOR, 'button[data-testid="LoginForm_Login_Button"]') login_button.click() - # # 如果需要两步验证 - # if authentication_secret: - # WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, 'input[inputmode="numeric"]'))) - # token = generate_authenticator_token(authentication_secret) # 需要实现的函数 - # auth_field = driver.find_element(By.CSS_SELECTOR, 'input[inputmode="numeric"]') - # auth_field.send_keys(token) - # next_button = driver.find_element(By.CSS_SELECTOR, 'button[data-testid="ocfEnterTextNextButton"]') - # next_button.click() - - WebDriverWait(driver, 300).until(ec.url_contains('/home')) + WebDriverWait(driver, 60).until(ec.url_contains('/home')) cookies = driver.get_cookies() cookie_string = "; ".join([f"{cookie['name']}={cookie['value']}" for cookie in cookies]) - logger.success(f"Twitter login success for username {username}\n{cookie_string}") + logger.success(f"Twitter login success for username {USERNAME}\n{cookie_string}") + DRIVER = driver return driver except Exception as e: - logger.error(f"Twitter login failed for username {username}: {e}") + logger.error(f"Twitter login failed for username {USERNAME}: {e}") driver.quit() return None - -# @retry(tries=10, delay=10) -def get_timeline(driver, url): +ERROR_COUNT = 0 +def get_timeline(url): + global ERROR_COUNT, DRIVER logger.info(f"check timeline {url}") try: + driver = DRIVER driver.get(url) - WebDriverWait(driver, 60).until( + WebDriverWait(driver, 30).until( ec.presence_of_element_located((By.CSS_SELECTOR, 'div[aria-label="Timeline: List"]'))) for packet in driver.get_log("performance"): message = json.loads(packet["message"])["message"] @@ -86,9 +75,14 @@ def get_timeline(driver, url): request_id = message["params"]["requestId"] resp = driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': request_id}) logger.info(f"checked") + ERROR_COUNT = 0 return json.loads(resp["body"]) except Exception as e: logger.error(f"check failed: {e}") + ERROR_COUNT += 1 + if ERROR_COUNT > 5: + driver.quit() + login() return {} @@ -98,7 +92,10 @@ def parse_timeline(data): entries = data["data"]["list"]["tweets_timeline"]["timeline"]["instructions"][0]["entries"] result = [] for entry in entries: - result += parse_entry(entry) + try: + result += parse_entry(entry) + except: + logger.error(f"error when parsing entry: {entry}") result.sort(key=lambda x: x["timestamp"], reverse=True) return result @@ -107,20 +104,26 @@ def parse_entry(entry): entry_id = entry["entryId"] if "list-conversation" in entry_id and not "tweet" in entry_id: for item in entry["content"]["items"]: - result.append(parse_content(item["item"])) + data = parse_content(item["item"]) + if data: result.append(data) elif entry["content"]["__typename"] != 'TimelineTimelineCursor': - result.append(parse_content(entry["content"])) + data = parse_content(entry["content"]) + if data: result.append(data) return result def parse_content(content): tweet = content["itemContent"]["tweet_results"]["result"] while not "rest_id" in tweet: tweet = tweet["tweet"] - data = parse_tweet(tweet) - if "quoted_status_result" in tweet: - data["quoted"] = parse_tweet(tweet["quoted_status_result"]["result"]) - if "retweeted_status_result" in tweet["legacy"]: - data["retweeted"] = parse_tweet(tweet["legacy"]["retweeted_status_result"]["result"]) - return data + try: + data = parse_tweet(tweet) + if "quoted_status_result" in tweet: + data["quoted"] = parse_tweet(tweet["quoted_status_result"]["result"]) + if "retweeted_status_result" in tweet["legacy"]: + data["retweeted"] = parse_tweet(tweet["legacy"]["retweeted_status_result"]["result"]) + return data + except: + logger.error(f"error when parsing tweet: {tweet}") + return {} def parse_media(media): data = { @@ -212,8 +215,8 @@ def filter_tweets(tweets, filter_list): return tweets -def check_timeline(driver, config): - data = get_timeline(driver, config["url"]) +def check_timeline(config): + data = get_timeline(config["url"]) if data: tweets = parse_timeline(data) new_tweets = check_new_tweets(tweets, config["url"]) @@ -225,10 +228,11 @@ def check_timeline(driver, config): def main(config): - userid = config["userid"] # screenid (@后面那个) - username = config["username"] # 登录用户名或邮箱 - password = config["password"] # 密码 - driver = login(userid, username, password) + global USERID, USERNAME, PASSWORD + USERID = config["userid"] # screenid (@后面那个) + USERNAME = config["username"] # 登录用户名或邮箱 + PASSWORD = config["password"] # 密码 + login() check_list = config.get("check_list", []) check_interval = config.get("check_interval", 42) @@ -242,7 +246,7 @@ def main(config): group_interval = group_config.get("interval", check_interval) if time.time() - last_check_time[group_id] > group_interval: - new_tweets = check_timeline(driver, group_config) + new_tweets = check_timeline(group_config) if new_tweets: json_data[group_id] = new_tweets last_check_time[group_id] = time.time() @@ -250,7 +254,8 @@ def main(config): if json_data: pprint(json_data) try: - requests.post(config["callback_url"], json=json_data) + resp = requests.post(config["callback_url"], json=json_data) + logger.info(resp.content) except Exception as e: logger.error(str(e))