import json
import time
from collections import defaultdict
from datetime import datetime
from pprint import pprint

import requests
from loguru import logger
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.wait import WebDriverWait

USERID, USERNAME, PASSWORD = "", "", ""
DRIVER = None  # shared Selenium driver, set by login()

def login():
    """Log in to x.com via Selenium and store the driver in the DRIVER global."""
    global USERID, USERNAME, PASSWORD, DRIVER
    if not USERID or not USERNAME or not PASSWORD:
        return None
    driver = None
    try:
        options = webdriver.ChromeOptions()
        # Enable performance logging so network responses can be read via CDP later.
        options.set_capability("goog:loggingPrefs", {"performance": "ALL"})
        # options.add_argument("--headless")
        driver = webdriver.Chrome(options=options)
        driver.get("https://x.com/i/flow/login")

        # Step 1: username or e-mail.
        WebDriverWait(driver, 10).until(
            ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="username"]')))
        username_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="username"]')
        username_field.send_keys(USERNAME)
        buttons = driver.find_elements(By.TAG_NAME, 'button')
        buttons[2].click()

        # Step 2: the extra identity prompt asking for the @handle.
        WebDriverWait(driver, 10).until(
            ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="on"]')))
        userid_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="on"]')
        userid_field.send_keys(USERID)
        buttons = driver.find_elements(By.TAG_NAME, 'button')
        buttons[1].click()

        # Step 3: password, then submit.
        WebDriverWait(driver, 10).until(
            ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="current-password"]')))
        password_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="current-password"]')
        password_field.send_keys(PASSWORD)
        login_button = driver.find_element(By.CSS_SELECTOR, 'button[data-testid="LoginForm_Login_Button"]')
        login_button.click()
        WebDriverWait(driver, 60).until(ec.url_contains('/home'))

        cookies = driver.get_cookies()
        cookie_string = "; ".join(f"{cookie['name']}={cookie['value']}" for cookie in cookies)
        logger.success(f"Twitter login success for username {USERNAME}\n{cookie_string}")
        DRIVER = driver
        return driver
    except Exception as e:
        logger.error(f"Twitter login failed for username {USERNAME}: {e}")
        if driver is not None:  # driver may never have been created
            driver.quit()
        return None

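# Hypothetical sketch: the cookie string logged above could be reused for plain
# HTTP calls outside the browser, e.g.
#     requests.get(url, headers={"Cookie": cookie_string})
# This script itself keeps driving the browser instead.
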
ERROR_COUNT = 0  # consecutive get_timeline() failures

def get_timeline(url):
    """Open a list timeline and grab the ListLatestTweetsTimeline response body via CDP."""
    global ERROR_COUNT, DRIVER
    logger.info(f"check timeline {url}")
    try:
        driver = DRIVER
        driver.get(url)
        WebDriverWait(driver, 30).until(
            ec.presence_of_element_located((By.CSS_SELECTOR, 'div[aria-label="Timeline: List"]')))
        # Scan the performance log for the GraphQL timeline response.
        for packet in driver.get_log("performance"):
            message = json.loads(packet["message"])["message"]
            if (message["method"] == "Network.responseReceived" and
                    "ListLatestTweetsTimeline" in message["params"]["response"]["url"]):
                request_id = message["params"]["requestId"]
                resp = driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': request_id})
                logger.info("checked")
                ERROR_COUNT = 0
                return json.loads(resp["body"])
    except Exception as e:
        logger.error(f"check failed: {e}")
        ERROR_COUNT += 1
        # After several consecutive failures, assume the session died and log in again.
        if ERROR_COUNT > 5:
            if driver is not None:
                driver.quit()
            login()
    return {}

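# The captured body is expected to look like (this is the path parse_timeline
# below walks):
#     {"data": {"list": {"tweets_timeline": {"timeline": {"instructions": [{"entries": [...]}]}}}}}
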
def parse_timeline(data):
    """Flatten one timeline response into a list of parsed tweets, newest first."""
    entries = data["data"]["list"]["tweets_timeline"]["timeline"]["instructions"][0]["entries"]
    result = []
    for entry in entries:
        try:
            result += parse_entry(entry)
        except Exception:
            logger.error(f"error when parsing entry: {entry}")
    result.sort(key=lambda x: x["timestamp"], reverse=True)
    return result

def parse_entry(entry):
    result = []
    entry_id = entry["entryId"]
    if "list-conversation" in entry_id and "tweet" not in entry_id:
        # A conversation module: parse every tweet item inside it.
        for item in entry["content"]["items"]:
            data = parse_content(item["item"])
            if data:
                result.append(data)
    elif entry["content"]["__typename"] != 'TimelineTimelineCursor':
        # A plain tweet entry; cursor entries are skipped.
        data = parse_content(entry["content"])
        if data:
            result.append(data)
    return result

def parse_content(content):
    tweet = content["itemContent"]["tweet_results"]["result"]
    # Visibility-limited results wrap the actual tweet in another "tweet" layer.
    while "rest_id" not in tweet:
        tweet = tweet["tweet"]
    try:
        data = parse_tweet(tweet)
        if "quoted_status_result" in tweet:
            data["quoted"] = parse_tweet(tweet["quoted_status_result"]["result"])
        if "retweeted_status_result" in tweet["legacy"]:
            data["retweeted"] = parse_tweet(tweet["legacy"]["retweeted_status_result"]["result"])
        return data
    except Exception:
        logger.error(f"error when parsing tweet: {tweet}")
        return {}

def parse_media(media):
    data = {
        "url": media["media_url_https"] + "?name=orig",  # request the original resolution
        "video": ""
    }
    if media["type"] in ["video", "animated_gif"]:
        # Keep the highest-bitrate variant among those that report a bitrate.
        variants = [i for i in media["video_info"]["variants"] if "bitrate" in i]
        variants.sort(key=lambda x: x["bitrate"], reverse=True)
        if variants:
            data["video"] = variants[0]["url"]
    return data

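# Illustrative result (the URL is a made-up example):
#     {"url": "https://pbs.twimg.com/media/abc123.jpg?name=orig", "video": ""}
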
def parse_entities(entity):
    data = {
        "text": "",
        "indices": entity["indices"]
    }
    # Mentions carry "name", hashtags "text", URLs "display_url".
    if "name" in entity:
        data["text"] = "@" + entity["name"]
    if "text" in entity:
        data["text"] = "#" + entity["text"]
    if "display_url" in entity:
        data["text"] = entity["display_url"]
    return data

def parse_card(card):
    data = {}
    for v in card["legacy"]["binding_values"]:
        # Keep only poll choices and the poll end time.
        if "choice" in v["key"] or v["key"] in ["end_datetime_utc"]:
            # The value sits under a key named after its type, e.g. "string_value".
            value_name = f"{v['value']['type'].lower()}_value"
            data[v["key"]] = v['value'].get(value_name, "")
    return data

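# Illustrative poll-card entry (shape assumed from observed GraphQL cards):
#     {"key": "choice1_label", "value": {"string_value": "Yes", "type": "STRING"}}
# would be stored here as data["choice1_label"] = "Yes".
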
def parse_tweet(tweet):
    # with open("tweet.json", "w") as f: json.dump(tweet, f)
    while "rest_id" not in tweet:
        tweet = tweet["tweet"]
    user = tweet["core"]["user_results"]["result"]
    data = {
        "rest_id": tweet["rest_id"],
        "name": user["legacy"]["name"],
        "screen_name": user["legacy"]["screen_name"],
        "profile_image": user["legacy"]["profile_image_url_https"],
        "profile_image_shape": user["profile_image_shape"],
        "full_text": tweet["legacy"]["full_text"],
        "created_at": tweet["legacy"]["created_at"],
        "timestamp": int(datetime.strptime(tweet["legacy"]["created_at"], '%a %b %d %H:%M:%S %z %Y').timestamp()),
        "media": [],
        "entities": [],
        "quoted": {},
        "retweeted": {},
        "card": {}
    }
    # Swap the "_normal" thumbnail for the full-size avatar.
    data["profile_image"] = data["profile_image"].replace("_normal.", ".")

    for m in tweet["legacy"]["entities"].get("media", []):
        data["media"].append(parse_media(m))
    for e in ["user_mentions", "hashtags", "urls"]:
        for m in tweet["legacy"]["entities"].get(e, []):
            data["entities"].append(parse_entities(m))
    data["entities"].sort(key=lambda x: x["indices"][0])
    if "card" in tweet:
        data["card"] = parse_card(tweet["card"])
    return data

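# Example of the date handling above: created_at "Wed Oct 10 20:19:24 +0000 2018"
# (the sample string from the Twitter API docs) parses to timestamp 1539202764.
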
LATEST_TWEET_ID_DICT = {}  # url -> rest_id of the newest tweet seen there

def check_new_tweets(tweets, url):
    """Return the tweets newer than the last seen id for this url (tweets are newest first)."""
    global LATEST_TWEET_ID_DICT
    if not tweets:  # defensive guard: an empty batch has no newest id to record
        return []
    if url in LATEST_TWEET_ID_DICT:
        new_tweets = []
        for tweet in tweets:
            if tweet["rest_id"] == LATEST_TWEET_ID_DICT[url]:
                LATEST_TWEET_ID_DICT[url] = tweets[0]["rest_id"]
                return new_tweets
            new_tweets.append(tweet)
    # First check for this url, or the remembered tweet vanished: record the
    # newest id and report nothing rather than re-sending the whole page.
    LATEST_TWEET_ID_DICT[url] = tweets[0]["rest_id"]
    return []

def filter_tweets(tweets, filter_list):
    if "only_image" in filter_list:
        tweets = [t for t in tweets if t["media"]]
    if "only_origin" in filter_list:
        tweets = [t for t in tweets if (not t["quoted"]) and (not t["retweeted"])]
    return tweets

def check_timeline(config):
    data = get_timeline(config["url"])
    if data:
        tweets = parse_timeline(data)
        new_tweets = check_new_tweets(tweets, config["url"])
        return filter_tweets(new_tweets, config["filter"])
    else:
        return []

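# Each polling cycle in main() below POSTs {group_id: [parsed tweets]} to
# callback_url, so the receiver can tell which watch-list group produced them.
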
def main(config):
    global USERID, USERNAME, PASSWORD
    USERID = config["userid"]      # screen id (the part of the handle after the @)
    USERNAME = config["username"]  # login username or e-mail
    PASSWORD = config["password"]  # password
    login()

    check_list = config.get("check_list", {})
    check_interval = config.get("check_interval", 42)
    check_interval_slow = config.get("check_interval_slow", 600)
    slow_hours = config.get("slow_hours", [0, 1, 2, 3, 4, 5, 6])
    last_check_time = defaultdict(lambda: 0.0)

    while True:
        json_data = {}
        for group_id, group_config in check_list.items():
            group_interval = group_config.get("interval", check_interval)
            if time.time() - last_check_time[group_id] > group_interval:
                new_tweets = check_timeline(group_config)
                if new_tweets:
                    json_data[group_id] = new_tweets
                last_check_time[group_id] = time.time()

        if json_data:
            pprint(json_data)
            try:
                resp = requests.post(config["callback_url"], json=json_data)
                logger.info(resp.content)
            except Exception as e:
                logger.error(str(e))

        # Poll less often during the configured quiet hours.
        if datetime.now().hour in slow_hours:
            time.sleep(check_interval_slow)
        else:
            time.sleep(check_interval)

if __name__ == "__main__":
    with open("config.json", 'r') as f:
        config = json.load(f)
    main(config)

# with open("lovelive.json", 'r', encoding="utf8") as f: pprint(parse_timeline(json.load(f)))
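
# A sample config.json, inferred from the keys this script reads; every value
# below is a placeholder:
# {
#     "userid": "your_handle",
#     "username": "you@example.com",
#     "password": "your_password",
#     "callback_url": "http://127.0.0.1:8000/tweets",
#     "check_interval": 42,
#     "check_interval_slow": 600,
#     "slow_hours": [0, 1, 2, 3, 4, 5, 6],
#     "check_list": {
#         "group1": {
#             "url": "https://x.com/i/lists/1234567890123456789",
#             "filter": ["only_image", "only_origin"],
#             "interval": 60
#         }
#     }
# }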