You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

232 lines
8.5 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import json
import time
from datetime import datetime
from pprint import pprint
# import pyotp
import requests
from loguru import logger
from retry import retry
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.wait import WebDriverWait
# def generate_authenticator_token(secret):
# totp = pyotp.TOTP(secret)
# return totp.now()
def login(userid, username, password, authentication_secret=None):
    """Log in to X through headless Chrome and return the live WebDriver.

    Args:
        userid: Text typed into the second prompt of the login flow (the
            ``input[autocomplete="on"]`` field).
        username: Text typed into the first prompt (the
            ``input[autocomplete="username"]`` field).
        password: Account password.
        authentication_secret: Currently unused; two-step verification is
            not implemented.

    Returns:
        The logged-in ``webdriver.Chrome`` instance, or ``None`` when
        ``username`` or ``password`` is empty or any step of the flow fails.
    """
    if not username or not password:
        return None

    # Bound before the try block so the error handler can tell whether a
    # browser was actually started; previously a failure while creating the
    # driver made the handler itself fail on the unbound name.
    driver = None
    try:
        options = webdriver.ChromeOptions()
        # Performance logging is what lets get_log("performance") return
        # network events later on.
        options.set_capability("goog:loggingPrefs", {"performance": "ALL"})
        options.add_argument("--headless")
        driver = webdriver.Chrome(options=options)
        driver.get("https://x.com/i/flow/login")

        wait = WebDriverWait(driver, 10)

        # Step 1: account name. WebDriverWait.until returns the located
        # element, so no second lookup is needed.
        username_field = wait.until(
            ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="username"]')))
        username_field.send_keys(username)
        # NOTE(review): the button is picked by position (presumably "Next");
        # this breaks if the page layout changes.
        driver.find_elements(By.TAG_NAME, 'button')[2].click()

        # Step 2: the flow is expected to ask for a second identifier.
        # NOTE(review): if this prompt is not shown, the wait times out and
        # the login is reported as failed.
        userid_field = wait.until(
            ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="on"]')))
        userid_field.send_keys(userid)
        driver.find_elements(By.TAG_NAME, 'button')[1].click()

        # Step 3: password.
        password_field = wait.until(
            ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="current-password"]')))
        password_field.send_keys(password)
        login_button = driver.find_element(By.CSS_SELECTOR, 'button[data-testid="LoginForm_Login_Button"]')
        login_button.click()

        # Two-step verification would be handled here (enter a TOTP code into
        # 'input[inputmode="numeric"]' and press the "ocfEnterTextNextButton"
        # button); it is not implemented.

        # Wait up to 300 seconds for the redirect to the home timeline.
        WebDriverWait(driver, 300).until(ec.url_contains('/home'))
        cookies = driver.get_cookies()
        cookie_string = "; ".join([f"{cookie['name']}={cookie['value']}" for cookie in cookies])
        logger.success(f"Twitter login success for username {username}\n{cookie_string}")
        return driver
    except Exception as e:
        logger.error(f"Twitter login failed for username {username}: {e}")
        if driver is not None:
            driver.quit()
        return None
@retry(tries=10, delay=10)
def get_timeline(driver, url):
    """Load a list timeline page and return the captured timeline response.

    Navigates ``driver`` to ``url``, waits up to 60 seconds for the timeline
    element, then scans the browser's performance log for the first
    ``Network.responseReceived`` event whose response URL contains
    ``ListLatestTweetsTimeline`` and fetches that response body.

    Args:
        driver: WebDriver providing ``get``, ``get_log`` and
            ``execute_cdp_cmd``.
        url: Address of the list timeline page.

    Returns:
        The decoded JSON body of the matching response, or ``{}`` when no
        matching event is present in the log.
    """
    logger.info(f"check timeline {url}")
    driver.get(url)
    timeline_locator = (By.CSS_SELECTOR, 'div[aria-label="Timeline: List"]')
    WebDriverWait(driver, 60).until(ec.presence_of_element_located(timeline_locator))

    for log_entry in driver.get_log("performance"):
        event = json.loads(log_entry["message"])["message"]
        if event["method"] != "Network.responseReceived":
            continue
        if "ListLatestTweetsTimeline" not in event["params"]["response"]["url"]:
            continue
        # First matching response wins; its body is fetched by request id.
        response = driver.execute_cdp_cmd(
            'Network.getResponseBody', {'requestId': event["params"]["requestId"]})
        return json.loads(response["body"])
    return {}
def parse_timeline(data):
    """Flatten a timeline response into a list of parsed tweets, newest first.

    Args:
        data: Decoded timeline response. An empty mapping (what
            ``get_timeline`` returns when no response was captured) is
            accepted and yields an empty list.

    Returns:
        The parsed tweets of every entry in the first timeline instruction,
        sorted by ``timestamp`` in descending order.

    Raises:
        KeyError, IndexError: If a non-empty ``data`` lacks the expected
            ``data.list.tweets_timeline.timeline.instructions[0].entries``
            path.
    """
    # Nothing was captured for this check: report "no tweets" instead of
    # failing on the missing "data" key.
    if not data:
        return []

    entries = data["data"]["list"]["tweets_timeline"]["timeline"]["instructions"][0]["entries"]
    tweets = []
    for entry in entries:
        tweets.extend(parse_entry(entry))
    tweets.sort(key=lambda tweet: tweet["timestamp"], reverse=True)
    return tweets
def parse_entry(entry):
    """Convert one timeline entry into a list of parsed tweets.

    An entry whose id contains ``list-conversation`` but not ``tweet`` is
    expanded into one parsed tweet per item; an entry whose content type is
    ``TimelineTimelineCursor`` yields nothing; any other entry yields a
    single parsed tweet.

    Args:
        entry: Timeline entry with ``entryId`` and ``content`` keys.

    Returns:
        A list of dicts produced by ``parse_content`` (possibly empty).
    """
    entry_id = entry["entryId"]
    is_conversation = "list-conversation" in entry_id and "tweet" not in entry_id
    if is_conversation:
        return [parse_content(module_item["item"]) for module_item in entry["content"]["items"]]
    if entry["content"]["__typename"] == 'TimelineTimelineCursor':
        return []
    return [parse_content(entry["content"])]
def parse_content(content):
    """Parse the tweet held by a timeline item, including nested tweets.

    Args:
        content: Item content whose ``itemContent.tweet_results.result``
            holds the raw tweet.

    Returns:
        The dict from ``parse_tweet`` for the tweet itself, with ``quoted``
        and/or ``retweeted`` replaced by the parsed nested tweet when the raw
        tweet carries ``quoted_status_result`` or
        ``legacy.retweeted_status_result``.
    """
    raw_tweet = content["itemContent"]["tweet_results"]["result"]
    parsed = parse_tweet(raw_tweet)

    # (output field, mapping that may hold the nested tweet, key inside it)
    nested_sources = (
        ("quoted", raw_tweet, "quoted_status_result"),
        ("retweeted", raw_tweet["legacy"], "retweeted_status_result"),
    )
    for field, container, key in nested_sources:
        if key in container:
            parsed[field] = parse_tweet(container[key]["result"])
    return parsed
def parse_media(media):
    """Extract the original-size image URL and best video URL of a media item.

    Args:
        media: Media entity with ``media_url_https`` and ``type`` keys, plus
            ``video_info.variants`` for ``video`` / ``animated_gif`` items.

    Returns:
        ``{"url": ..., "video": ...}`` where ``url`` is the image URL with
        ``?name=orig`` appended and ``video`` is the URL of the variant with
        the highest ``bitrate`` (empty string when the item is not a video or
        no variant declares a bitrate).
    """
    parsed = {"url": media["media_url_https"] + "?name=orig", "video": ""}
    if media["type"] not in ("video", "animated_gif"):
        return parsed

    # Variants without a bitrate are ignored.
    rated = [variant for variant in media["video_info"]["variants"] if "bitrate" in variant]
    if rated:
        # max() keeps the first variant on ties, as a stable descending sort does.
        parsed["video"] = max(rated, key=lambda variant: variant["bitrate"])["url"]
    return parsed
def parse_entities(entity):
    """Build the display text and position of a tweet entity.

    The text is ``@`` + ``name``, ``#`` + ``text`` or ``display_url``,
    depending on which keys the entity has; when several are present the
    later one in that order wins.

    Args:
        entity: Entity dict with an ``indices`` key.

    Returns:
        ``{"text": ..., "indices": ...}``; ``text`` is empty when none of the
        three keys is present.
    """
    indices = entity["indices"]

    label = ""
    if "name" in entity:
        label = "@" + entity["name"]
    if "text" in entity:
        label = "#" + entity["text"]
    if "display_url" in entity:
        label = entity["display_url"]

    return {"text": label, "indices": indices}
def parse_tweet(tweet):
    """Reduce a raw tweet object to the flat dict used by the rest of the file.

    Args:
        tweet: Raw tweet with ``rest_id``, ``core.user_results.result`` and
            ``legacy`` sections.

    Returns:
        A dict with the author fields, ``full_text``, ``created_at``, its
        integer Unix ``timestamp``, parsed ``media``, parsed ``entities``
        (user mentions, hashtags and URLs ordered by start index) and empty
        ``quoted`` / ``retweeted`` placeholders.
    """
    author = tweet["core"]["user_results"]["result"]
    author_legacy = author["legacy"]
    legacy = tweet["legacy"]
    created_at = legacy["created_at"]
    created = datetime.strptime(created_at, '%a %b %d %H:%M:%S %z %Y')
    raw_entities = legacy["entities"]

    media_items = [parse_media(item) for item in raw_entities.get("media", [])]

    inline_entities = []
    for kind in ("user_mentions", "hashtags", "urls"):
        inline_entities.extend(parse_entities(item) for item in raw_entities.get(kind, []))
    # Order entities by where they start in the tweet text.
    inline_entities.sort(key=lambda parsed: parsed["indices"][0])

    return {
        "rest_id": tweet["rest_id"],
        "name": author_legacy["name"],
        "screen_name": author_legacy["screen_name"],
        "profile_image": author_legacy["profile_image_url_https"],
        "profile_image_shape": author["profile_image_shape"],
        "full_text": legacy["full_text"],
        "created_at": created_at,
        "timestamp": int(created.timestamp()),
        "media": media_items,
        "entities": inline_entities,
        "quoted": {},
        "retweeted": {},
    }
# Maps each monitored timeline URL to the rest_id of the newest tweet that
# was present the last time that timeline was checked.
LATEST_TWEET_ID_DICT = {}


def check_new_tweets(tweets, url):
    """Return the tweets that appeared since the previous check of ``url``.

    ``tweets`` is expected newest first. Every call with a non-empty list
    records ``tweets[0]`` as the newest tweet seen for ``url``.

    Args:
        tweets: Parsed tweets, each with a ``rest_id`` key.
        url: Timeline URL used as the bookkeeping key.

    Returns:
        The tweets preceding the previously recorded one. An empty list is
        returned when ``tweets`` is empty, on the first check of ``url``
        (which only establishes the baseline), and when the previously
        recorded tweet is no longer present in ``tweets``.
    """
    # An empty timeline carries no information: keep the recorded baseline
    # instead of failing on tweets[0].
    if not tweets:
        return []

    is_known_url = url in LATEST_TWEET_ID_DICT
    last_seen_id = LATEST_TWEET_ID_DICT.get(url)
    LATEST_TWEET_ID_DICT[url] = tweets[0]["rest_id"]
    if not is_known_url:
        return []

    new_tweets = []
    for tweet in tweets:
        if tweet["rest_id"] == last_seen_id:
            return new_tweets
        new_tweets.append(tweet)
    # The previously recorded tweet was not found; nothing is reported.
    return []
def filter_tweets(tweets, filter_list):
    """Apply the filters named in ``filter_list`` to ``tweets``.

    Supported filter names:
        ``only_image``: keep tweets whose ``media`` is non-empty.
        ``only_origin``: keep tweets with neither ``quoted`` nor
            ``retweeted`` content.

    Args:
        tweets: Parsed tweets.
        filter_list: Container of filter names.

    Returns:
        The filtered tweets; ``tweets`` itself when no supported filter is
        requested.
    """
    predicates = []
    if "only_image" in filter_list:
        predicates.append(lambda tweet: bool(tweet["media"]))
    if "only_origin" in filter_list:
        predicates.append(lambda tweet: not (tweet["quoted"] or tweet["retweeted"]))

    for keep in predicates:
        tweets = [tweet for tweet in tweets if keep(tweet)]
    return tweets
def check_timeline(driver, config):
    """Fetch one timeline and return its new tweets after filtering.

    Args:
        driver: WebDriver passed through to ``get_timeline``.
        config: Mapping with the timeline ``url`` and the ``filter`` names
            to apply.

    Returns:
        The tweets reported as new by ``check_new_tweets`` that also pass
        ``filter_tweets``.
    """
    timeline_url = config["url"]
    parsed_tweets = parse_timeline(get_timeline(driver, timeline_url))
    unseen_tweets = check_new_tweets(parsed_tweets, timeline_url)
    return filter_tweets(unseen_tweets, config["filter"])
def main(config):
    """Log in once, then poll every configured timeline forever.

    New tweets found in a polling round are grouped by their ``check_list``
    key, pretty-printed and POSTed as JSON to ``callback_url``. A failed
    callback is logged and does not stop the loop.

    Args:
        config: Mapping with ``userid``, ``username``, ``password`` and
            ``callback_url``, plus the optional keys ``check_list``
            (group id -> timeline config), ``check_interval`` (seconds
            between rounds, default 42) and ``callback_timeout`` (seconds
            allowed for the callback request, default 30).

    Raises:
        RuntimeError: If the login fails.
    """
    userid = config["userid"]  # screen name (the part after the "@")
    username = config["username"]  # login username or e-mail address
    password = config["password"]  # account password
    driver = login(userid, username, password)
    if driver is None:
        # login() reports failure with None; polling with it could only fail.
        raise RuntimeError("Twitter login failed")

    try:
        while True:
            json_data = {}
            # The default must be a mapping because .items() is called on it.
            check_list = config.get("check_list", {})
            for group_id, group_config in check_list.items():
                new_tweets = check_timeline(driver, group_config)
                if new_tweets:
                    json_data[group_id] = new_tweets
            if json_data:
                pprint(json_data)
                try:
                    # Without a timeout an unresponsive callback endpoint
                    # would block the polling loop indefinitely.
                    requests.post(
                        config["callback_url"],
                        json=json_data,
                        timeout=config.get("callback_timeout", 30),
                    )
                except Exception as e:
                    # Best effort: a failed callback must not stop monitoring.
                    logger.error(str(e))
            time.sleep(config.get("check_interval", 42))
    finally:
        # Release the browser when the loop is interrupted or fails.
        driver.quit()
if __name__ == "__main__":
    # Script entry point: read the settings from config.json in the current
    # working directory and start monitoring. The encoding is stated
    # explicitly so non-ASCII values are decoded the same way on every
    # platform instead of with the locale's default codec.
    with open("config.json", 'r', encoding="utf-8") as f:
        config = json.load(f)
    main(config)
# with open("lovelive.json", 'r') as f: pprint(parse_timeline(json.load(f)))