import json
import time
from collections import defaultdict
from datetime import datetime
from pprint import pprint

import requests
from loguru import logger
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.wait import WebDriverWait

USERID, USERNAME, PASSWORD = "", "", ""  # filled from config.json by main()
DRIVER = None  # set by login() once a Selenium session is established


def login():
    """Log in to x.com with Selenium and keep the driver for timeline polling."""
    global USERID, USERNAME, PASSWORD, DRIVER
    if not USERID or not USERNAME or not PASSWORD:
        return None

    driver = None
    try:
        options = webdriver.ChromeOptions()
        # Enable CDP performance logging so network responses can be read later.
        options.set_capability("goog:loggingPrefs", {"performance": "ALL"})
        options.add_argument("--headless")
        driver = webdriver.Chrome(options=options)
        driver.set_page_load_timeout(30)
        driver.get("https://x.com/i/flow/login")

        # Step 1: username (or email).
        WebDriverWait(driver, 10).until(
            ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="username"]')))
        username_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="username"]')
        username_field.send_keys(USERNAME)
        # The "Next" button has no stable selector; index 2 matches the current page layout.
        buttons = driver.find_elements(By.TAG_NAME, 'button')
        buttons[2].click()

        # Step 2: the extra identity check may ask for the @ handle.
        WebDriverWait(driver, 10).until(
            ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="on"]')))
        userid_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="on"]')
        if not userid_field.get_attribute("value"):
            userid_field.send_keys(USERID)
        buttons = driver.find_elements(By.TAG_NAME, 'button')
        buttons[1].click()

        # Step 3: password.
        WebDriverWait(driver, 10).until(
            ec.presence_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="current-password"]')))
        password_field = driver.find_element(By.CSS_SELECTOR, 'input[autocomplete="current-password"]')
        password_field.send_keys(PASSWORD)
        login_button = driver.find_element(By.CSS_SELECTOR, 'button[data-testid="LoginForm_Login_Button"]')
        login_button.click()

        # Login is done once we land on the home timeline.
        WebDriverWait(driver, 60).until(ec.url_contains('/home'))
        cookies = driver.get_cookies()
        cookie_string = "; ".join(f"{cookie['name']}={cookie['value']}" for cookie in cookies)
        logger.success(f"Twitter login success for username {USERNAME}\n{cookie_string}")
        DRIVER = driver
        return driver

    except Exception as e:
        logger.error(f"Twitter login failed for username {USERNAME}: {e}")
        if driver is not None:
            driver.quit()
        return None


ERROR_COUNT = 0


def get_timeline(url):
    """Open a list URL and pull the ListLatestTweetsTimeline JSON from the CDP network log."""
    global ERROR_COUNT, DRIVER
    logger.info(f"check timeline {url}")
    try:
        driver = DRIVER
        driver.get(url)
        WebDriverWait(driver, 30).until(
            ec.presence_of_element_located((By.CSS_SELECTOR, 'div[aria-label="Timeline: List"]')))
        # Each performance-log packet wraps one CDP event, e.g.
        # {"message": {"method": "Network.responseReceived",
        #              "params": {"requestId": ..., "response": {"url": ...}}}}
        for packet in driver.get_log("performance"):
            message = json.loads(packet["message"])["message"]
            if (message["method"] == "Network.responseReceived" and
                    "ListLatestTweetsTimeline" in message["params"]["response"]["url"]):
                request_id = message["params"]["requestId"]
                resp = driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': request_id})
                logger.info("checked")
                ERROR_COUNT = 0
                return json.loads(resp["body"])
    except Exception as e:
        logger.error(f"check failed: {e}")
        ERROR_COUNT += 1
        if ERROR_COUNT > 5:
            # Too many consecutive failures: assume the session is dead and log in again.
            ERROR_COUNT = 0
            if DRIVER is not None:
                DRIVER.quit()
            login()
    return {}


def parse_timeline(data):
    """Flatten timeline entries into a list of parsed tweets, newest first."""
    # The entries list lives in the first instruction of the timeline payload.
    entries = data["data"]["list"]["tweets_timeline"]["timeline"]["instructions"][0]["entries"]
    result = []
    for entry in entries:
        try:
            result += parse_entry(entry)
        except Exception as e:
            logger.error(f"error when parsing entry: {e} {e.args}\n{entry}")
    result.sort(key=lambda x: x["timestamp"], reverse=True)
    return result


def parse_entry(entry):
    """Parse one timeline entry; conversation modules contain several tweets."""
    result = []
    entry_id = entry["entryId"]
    if "list-conversation" in entry_id and "tweet" not in entry_id:
        for item in entry["content"]["items"]:
            data = parse_content(item["item"])
            if data:
                result.append(data)
    elif entry["content"]["__typename"] != 'TimelineTimelineCursor':
        data = parse_content(entry["content"])
        if data:
            result.append(data)
    return result


def parse_content(content):
    """Unwrap a tweet result and attach quoted/retweeted tweets when present."""
    tweet = content["itemContent"]["tweet_results"]["result"]
    # Visibility-limited tweets are wrapped in an extra {"tweet": ...} layer.
    while "rest_id" not in tweet:
        tweet = tweet["tweet"]
    try:
        data = parse_tweet(tweet)
        if "quoted_status_result" in tweet:
            data["quoted"] = parse_tweet(tweet["quoted_status_result"]["result"])
        if "retweeted_status_result" in tweet["legacy"]:
            data["retweeted"] = parse_tweet(tweet["legacy"]["retweeted_status_result"]["result"])
        return data
    except Exception as e:
        logger.error(f"error when parsing tweet: {e} {e.args}\n{tweet}")
        return {}


def parse_media(media):
    """Extract the original-size image URL and, for videos, the highest-bitrate variant."""
    data = {
        "url": media["media_url_https"] + "?name=orig",
        "video": ""
    }
    if media["type"] in ["video", "animated_gif"]:
        variants = [i for i in media["video_info"]["variants"] if "bitrate" in i]
        variants.sort(key=lambda x: x["bitrate"], reverse=True)
        if variants:
            data["video"] = variants[0]["url"]
    return data
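
# For reference, a video media entity from the "legacy" payload looks roughly
# like this (fields abridged; shape assumed from the code paths above):
#   {"type": "video",
#    "media_url_https": "https://pbs.twimg.com/ext_tw_video_thumb/...",
#    "video_info": {"variants": [
#        {"bitrate": 832000, "content_type": "video/mp4", "url": "...mp4"},
#        {"content_type": "application/x-mpegURL", "url": "...m3u8"}]}}
# The "bitrate" filter above drops the HLS playlist entry, which has none.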


def parse_entities(entity):
    """Normalize a mention/hashtag/url entity to display text plus its indices."""
    data = {
        "text": "",
        "indices": entity["indices"]
    }
    # Mentions carry "name", hashtags "text", urls "display_url".
    if "name" in entity:
        data["text"] = "@" + entity["name"]
    if "text" in entity:
        data["text"] = "#" + entity["text"]
    if "display_url" in entity:
        data["text"] = entity["display_url"]
    return data


def parse_card(card):
    """Collect the interesting binding values from a card and recover a preview photo."""
    data = {}
    for v in card["legacy"]["binding_values"]:
        if "choice" in v["key"] or v["key"] in [
                "end_datetime_utc",
                "unified_card",
                "summary_photo_image_original"]:
            # Values are typed, e.g. {"type": "STRING", "string_value": ...}.
            value_name = f"{v['value']['type'].lower()}_value"
            data[v["key"]] = v['value'].get(value_name, "")

    photo = None
    if "unified_card" in data:
        card_data = json.loads(data["unified_card"])
        del data["unified_card"]
        try:
            for v in card_data["media_entities"].values():
                if "media_url_https" in v:
                    photo = {
                        "url": v["media_url_https"] + "?name=orig",
                        "video": ""
                    }
                    break
        except Exception:
            logger.error(f"error parsing unified_card {card_data}")

    if "summary_photo_image_original" in data:
        photo = {
            "url": data["summary_photo_image_original"]["url"],
            "video": ""
        }
        del data["summary_photo_image_original"]

    return data, photo
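
# Poll cards expose their options under keys like "choice1_label", which the
# "choice" substring match above collects together with "end_datetime_utc";
# the exact key set depends on the card type.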


def parse_tweet(tweet):
    """Flatten one tweet result into the dict shape posted to the callback."""
    # with open("tweet.json", "w") as f: json.dump(tweet, f)
    while "rest_id" not in tweet:
        tweet = tweet["tweet"]
    user = tweet["core"]["user_results"]["result"]
    data = {
        "rest_id": tweet["rest_id"],
        "name": user["legacy"]["name"],
        "screen_name": user["legacy"]["screen_name"],
        "profile_image": user["legacy"]["profile_image_url_https"],
        "profile_image_shape": user["profile_image_shape"],
        "full_text": tweet["legacy"]["full_text"],
        "created_at": tweet["legacy"]["created_at"],
        "timestamp": int(datetime.strptime(tweet["legacy"]["created_at"], '%a %b %d %H:%M:%S %z %Y').timestamp()),
        "reply_to": "",
        "media": [],
        "entities": [],
        "quoted": {},
        "retweeted": {},
        "card": {}
    }
    # Ask for the full-size avatar instead of the small "_normal" variant.
    data["profile_image"] = data["profile_image"].replace("_normal.", ".")

    if "in_reply_to_status_id_str" in tweet["legacy"]:
        data["reply_to"] = tweet["legacy"]["in_reply_to_status_id_str"]

    for m in tweet["legacy"]["entities"].get("media", []):
        data["media"].append(parse_media(m))

    for e in ["user_mentions", "hashtags", "urls"]:
        for m in tweet["legacy"]["entities"].get(e, []):
            data["entities"].append(parse_entities(m))
    data["entities"].sort(key=lambda x: x["indices"][0])

    if "card" in tweet:
        data["card"], _photo = parse_card(tweet["card"])
        if _photo:
            data["media"].append(_photo)

    return data


LATEST_TWEET_ID_DICT = {}
LATEST_TWEET_TS_DICT = {}


def check_new_tweets(tweets, url):
    """Return tweets newer than the last seen one for this timeline URL."""
    global LATEST_TWEET_ID_DICT, LATEST_TWEET_TS_DICT

    if not tweets:
        return []

    new_tweets = []
    if url in LATEST_TWEET_ID_DICT:
        for tweet in tweets:
            # Stop at the last reported tweet, at anything older than it,
            # or at anything more than 20 minutes old (stale entries).
            if tweet["rest_id"] == LATEST_TWEET_ID_DICT[url]:
                break
            if tweet["timestamp"] < LATEST_TWEET_TS_DICT[url]:
                break
            if time.time() - tweet["timestamp"] > 1200:
                break
            new_tweets.append(tweet)

    LATEST_TWEET_ID_DICT[url] = tweets[0]["rest_id"]
    LATEST_TWEET_TS_DICT[url] = tweets[0]["timestamp"]
    return new_tweets
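
# Worked example: if the last reported id for a URL is "B" and the parsed
# timeline is [D, C, B, A] (newest first), the loop collects [D, C], stops at
# "B", then records "D" as the new high-water mark. A URL seen for the first
# time reports nothing but still primes the two dicts.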


def filter_tweets(tweets, filter_list):
    """Apply the per-group filters from config."""
    if "only_image" in filter_list:
        tweets = [t for t in tweets if t["media"]]

    if "only_origin" in filter_list:
        tweets = [t for t in tweets if (not t["quoted"]) and (not t["retweeted"])]

    return tweets


def check_timeline(config):
    """Fetch, parse, and filter one configured timeline; return only new tweets."""
    data = get_timeline(config["url"])
    if data:
        tweets = parse_timeline(data)
        new_tweets = check_new_tweets(tweets, config["url"])
        return filter_tweets(new_tweets, config["filter"])
    else:
        return []


def main(config):
    global USERID, USERNAME, PASSWORD
    USERID = config["userid"]      # screen id (the handle after the @)
    USERNAME = config["username"]  # login username or email
    PASSWORD = config["password"]  # password
    login()

    check_list = config.get("check_list", {})
    check_interval = config.get("check_interval", 42)
    check_interval_slow = config.get("check_interval_slow", 600)
    slow_hours = config.get("slow_hours", [0, 1, 2, 3, 4, 5, 6])
    last_check_time = defaultdict(float)

    while True:
        json_data = {}
        for group_id, group_config in check_list.items():
            group_interval = group_config.get("interval", check_interval)

            if time.time() - last_check_time[group_id] > group_interval:
                new_tweets = check_timeline(group_config)
                if new_tweets:
                    json_data[group_id] = new_tweets
                last_check_time[group_id] = time.time()

        if json_data:
            pprint(json_data)
            try:
                resp = requests.post(config["callback_url"],
                                     json=json_data, timeout=10)
                logger.info(resp.content)
            except Exception as e:
                logger.error(str(e))

        # Poll less aggressively during the configured quiet hours.
        if datetime.now().hour in slow_hours:
            time.sleep(check_interval_slow)
        else:
            time.sleep(check_interval)
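
# A minimal config.json sketch, inferred from the keys read above; all values
# are illustrative. Each check_list group needs the list "url" to watch, and
# "filter" may contain "only_image" and/or "only_origin".
#
# {
#     "userid": "example_handle",
#     "username": "user@example.com",
#     "password": "...",
#     "callback_url": "http://localhost:8000/new_tweets",
#     "check_interval": 42,
#     "check_interval_slow": 600,
#     "slow_hours": [0, 1, 2, 3, 4, 5, 6],
#     "check_list": {
#         "group1": {
#             "url": "https://x.com/i/lists/1234567890",
#             "interval": 60,
#             "filter": ["only_image"]
#         }
#     }
# }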


if __name__ == "__main__":
    with open("config.json", 'r') as f:
        config = json.load(f)
    main(config)

    # Offline test: parse a saved timeline capture instead of hitting the network.
    # with open("lovelive.json", 'r', encoding="utf8") as f: pprint(parse_timeline(json.load(f)))