commit 0debf35541770ca233030793f268b92aaf84f15e Author: wlt233 <1486185683@qq.com> Date: Thu Jan 2 11:16:03 2025 +0800 init (v1.0) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9676e2d --- /dev/null +++ b/.gitignore @@ -0,0 +1,111 @@ + +chromedriver.exe + + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ + +# add +.idea/ \ No newline at end of file diff --git a/twitter.py b/twitter.py new file mode 100644 index 0000000..20a5e1b --- /dev/null +++ b/twitter.py @@ -0,0 +1,201 @@ +import json +import time +from datetime import datetime +from pprint import pprint + +# import pyotp +import requests +from loguru import logger +from retry import retry +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.support import expected_conditions as ec +from selenium.webdriver.support.wait import WebDriverWait + + +# def generate_authenticator_token(secret): +# totp = pyotp.TOTP(secret) +# return totp.now() + +def login(userid, username, password, authentication_secret=None): + if not username or not password: + return None + + try: + options = webdriver.ChromeOptions() + options.set_capability("goog:loggingPrefs", {"performance": "ALL"}) + options.add_argument("--headless") + driver = webdriver.Chrome(options=options) + driver.get("https://x.com/i/flow/login") + + WebDriverWait(driver, 10).until( + ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="username"]'))) + username_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="username"]') + username_field.send_keys(username) + buttons = driver.find_elements(By.TAG_NAME, 'button') + buttons[2].click() + + WebDriverWait(driver, 10).until( + ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="on"]'))) + username_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="on"]') + username_field.send_keys(userid) + buttons = driver.find_elements(By.TAG_NAME, 'button') + buttons[1].click() + + WebDriverWait(driver, 10).until( + ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="current-password"]'))) + password_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="current-password"]') + password_field.send_keys(password) + login_button = 
driver.find_element(By.CSS_SELECTOR, 'button[data-testid="LoginForm_Login_Button"]')
+        login_button.click()
+
+        # # If two-step verification is required
+        # if authentication_secret:
+        #     WebDriverWait(driver, 10).until(ec.presence_of_element_located((By.CSS_SELECTOR, 'input[inputmode="numeric"]')))
+        #     token = generate_authenticator_token(authentication_secret)  # function to be implemented (see the commented-out pyotp helper above)
+        #     auth_field = driver.find_element(By.CSS_SELECTOR, 'input[inputmode="numeric"]')
+        #     auth_field.send_keys(token)
+        #     next_button = driver.find_element(By.CSS_SELECTOR, 'button[data-testid="ocfEnterTextNextButton"]')
+        #     next_button.click()
+
+        # Wait up to 300 s for the redirect to /home, which marks a successful login
+        WebDriverWait(driver, 300).until(ec.url_contains('/home'))
+        cookies = driver.get_cookies()
+        cookie_string = "; ".join([f"{cookie['name']}={cookie['value']}" for cookie in cookies])
+        logger.success(f"Twitter login success for username {username}\n{cookie_string}")
+        return driver
+
+    except Exception as e:
+        logger.error(f"Twitter login failed for username {username}: {e}")
+        if "driver" in locals():  # driver may not exist if webdriver.Chrome() itself failed
+            driver.quit()
+        return None
+
+
+
+# Open a list timeline and capture the ListLatestTweetsTimeline GraphQL response
+# from Chrome's performance log.
+@retry(tries=10, delay=10)
+def get_timeline(driver, url):
+    logger.info(f"check timeline {url}")
+    driver.get(url)
+    WebDriverWait(driver, 60).until(
+        ec.presence_of_element_located((By.CSS_SELECTOR, 'div[aria-label="Timeline: List"]')))
+    for packet in driver.get_log("performance"):
+        message = json.loads(packet["message"])["message"]
+        if (message["method"] == "Network.responseReceived" and
+                "ListLatestTweetsTimeline" in message["params"]["response"]["url"]):
+            request_id = message["params"]["requestId"]
+            resp = driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': request_id})
+            return json.loads(resp["body"])
+    return {}
+
+
+
+
+# Flatten the timeline payload into a list of tweet dicts, newest first.
+def parse_timeline(data):
+    entries = data["data"]["list"]["tweets_timeline"]["timeline"]["instructions"][0]["entries"]
+    result = []
+    for entry in entries:
+        result += parse_entry(entry)
+    result.sort(key=lambda x: x["timestamp"], reverse=True)
+    return result
+
+def parse_entry(entry):
+    result = []
+    entry_id = entry["entryId"]
+    if "list-conversation" in entry_id and "tweet" not in entry_id:
+        for item in entry["content"]["items"]:
+            result.append(parse_content(item["item"]))
+    elif entry["content"]["__typename"] != 'TimelineTimelineCursor':
+        result.append(parse_content(entry["content"]))
+    return result
+
+def parse_content(content):
+    tweet = content["itemContent"]["tweet_results"]["result"]
+    data = parse_tweet(tweet)
+    if "quoted_status_result" in tweet:
+        data["quoted"] = parse_tweet(tweet["quoted_status_result"]["result"])
+    if "retweeted_status_result" in tweet["legacy"]:
+        data["retweeted"] = parse_tweet(tweet["legacy"]["retweeted_status_result"]["result"])
+    return data
+
+def parse_media(media):
+    data = {
+        "url": media["media_url_https"] + "?name=orig",
+        "video": ""
+    }
+    if media["type"] in ["video", "animated_gif"]:
+        variants = [i for i in media["video_info"]["variants"] if "bitrate" in i]
+        variants.sort(key=lambda x: x["bitrate"], reverse=True)
+        if variants: data["video"] = variants[0]["url"]
+    return data
+
+def parse_tweet(tweet):
+    data = {
+        "rest_id": tweet["rest_id"],
+        "name": tweet["core"]["user_results"]["result"]["legacy"]["name"],
+        "screen_name": tweet["core"]["user_results"]["result"]["legacy"]["screen_name"],
+        "profile_image": tweet["core"]["user_results"]["result"]["legacy"]["profile_image_url_https"],
+        "profile_image_shape": tweet["core"]["user_results"]["result"]["profile_image_shape"],
+        "full_text": tweet["legacy"]["full_text"],
+        "created_at": tweet["legacy"]["created_at"],
+        "timestamp": int(datetime.strptime(tweet["legacy"]["created_at"], '%a %b %d %H:%M:%S %z %Y').timestamp()),
+        "media": [],
+        "quoted": {},
+        "retweeted": {}
+    }
+    for m in tweet["legacy"]["entities"].get("media", []):
+        data["media"].append(parse_media(m))
+    return data
+
+
+
+
+# Remember the newest rest_id seen per timeline url and report only tweets newer than it.
+LATEST_TWEET_ID_DICT = {}
+def check_new_tweets(tweets, url):
+    global LATEST_TWEET_ID_DICT
+
+    if not tweets:  # empty timeline payload: nothing to compare or record
+        return []
+
+    if url in LATEST_TWEET_ID_DICT:
+        new_tweets = []
+        for tweet in tweets:
+            if tweet["rest_id"] == LATEST_TWEET_ID_DICT[url]:
+                LATEST_TWEET_ID_DICT[url] = tweets[0]["rest_id"]
+                return new_tweets
+            new_tweets.append(tweet)
+
+    # First check for this url, or the stored id is no longer on the page:
+    # record the newest id and report nothing.
+    LATEST_TWEET_ID_DICT[url] = tweets[0]["rest_id"]
+    return []
+
+def check_timeline(driver, url):
+    data = get_timeline(driver, url)
+    tweets = parse_timeline(data)
+    return check_new_tweets(tweets, url)
+
+
+
+
+def main(userid, username, password, config):
+    driver = login(userid, username, password)
+    if driver is None:
+        logger.error("Twitter login failed, aborting")
+        return
+
+    while True:
+        json_data = {}
+        for group_id, url in config.items():
+            new_tweets = check_timeline(driver, url)
+            if new_tweets:
+                json_data[group_id] = new_tweets
+
+        if json_data:
+            pprint(json_data)
+            try:
+                requests.post("http://localhost:8520/twitter", json=json_data)
+            except Exception as e:
+                logger.error(str(e))
+
+        time.sleep(55)
+
+if __name__ == "__main__":
+    userid = ""
+    username = ""
+    password = ""
+    config = {
+        "": "https://x.com/i/lists/<...>",
+    }
+    main(userid, username, password, config)
+    # with open("lovelive.json", 'r') as f: pprint(parse_timeline(json.load(f)))
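Note: main() posts each batch of new tweets as JSON to http://localhost:8520/twitter, keyed by the group_id values from config, but the receiving service is not part of this commit. Below is a minimal sketch of what such a receiver could look like, assuming Flask is available; the /twitter path and port 8520 are taken from the script above, while the handler name and what it does with the payload are purely illustrative.

# Hypothetical receiver for the POST issued by main(); not part of this commit.
# Assumes Flask. The /twitter path and port 8520 mirror the URL hard-coded in twitter.py.
from flask import Flask, request

app = Flask(__name__)

@app.route("/twitter", methods=["POST"])
def receive_tweets():
    payload = request.get_json(force=True)  # {group_id: [tweet dicts built by parse_tweet()]}
    for group_id, tweets in payload.items():
        for tweet in tweets:
            # Each dict carries rest_id, name, screen_name, full_text, media, quoted, retweeted, ...
            print(group_id, tweet["screen_name"], tweet["full_text"])
    return "ok"

if __name__ == "__main__":
    app.run(port=8520)

Any service that accepts the same JSON shape will do; the script only posts when new tweets are found, roughly once per minute (time.sleep(55) between polling rounds).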