import json
import traceback
from datetime import datetime
from pprint import pformat

from loguru import logger


def parse_timeline(data):
    entries = data["data"]["list"]["tweets_timeline"]["timeline"]["instructions"][0]["entries"]
    result = []
    for entry in entries:
        try:
            result += parse_entry(entry)
        except Exception as e:
            logger.error(f"Error when parsing entry: {e} {e.args}")
            logger.error(f"\n{traceback.format_exc()}")
            logger.error(f"\n{pformat(entry)}")
    # Deduplicate by rest_id, then sort newest first.
    result = list({t["rest_id"]: t for t in result}.values())
    result.sort(key=lambda x: x["timestamp"], reverse=True)
    return result


def parse_entry(entry):
    result = []
    entry_id = entry["entryId"]
    if "promoted" in entry_id:
        return []
    elif ("list-conversation" in entry_id
            or ("conversationthread" in entry_id and "tweet" not in entry_id)):
        for item in entry["content"]["items"]:
            if "promoted" in item["entryId"]:
                continue
            data = parse_content(item["item"])
            if data:
                result.append(data)
    elif "cursor" in entry_id or "bottom" in entry_id:
        pass
    elif entry["content"]["__typename"] != "TimelineTimelineCursor":
        if "items" in entry["content"]:
            for item in entry["content"]["items"]:
                data = parse_content(item["item"])
                if data:
                    result.append(data)
        else:
            data = parse_content(entry["content"])
            if data:
                result.append(data)
    return result


def parse_content(content):
    tweet = content["itemContent"]["tweet_results"]["result"]
    # Some results wrap the actual tweet one level deeper (e.g. TweetWithVisibilityResults).
    while "rest_id" not in tweet:
        tweet = tweet["tweet"]
    try:
        data = parse_tweet(tweet)
        if "quoted_status_result" in tweet:
            data["quoted"] = parse_tweet(tweet["quoted_status_result"]["result"])
        if "retweeted_status_result" in tweet["legacy"]:
            data["retweeted"] = parse_tweet(tweet["legacy"]["retweeted_status_result"]["result"])
        return data
    except Exception as e:
        logger.error(f"Error when parsing tweet: {e} {e.args}")
        logger.error(f"\n{traceback.format_exc()}")
        logger.error(f"\n{pformat(tweet)}")
        return {}


def parse_media(media):
    data = {"url": media["media_url_https"] + "?name=orig", "video": ""}
    if media["type"] in ["video", "animated_gif"]:
        # Pick the highest-bitrate variant among those that report a bitrate.
        variants = [i for i in media["video_info"]["variants"] if "bitrate" in i]
        variants.sort(key=lambda x: x["bitrate"], reverse=True)
        if variants:
            data["video"] = variants[0]["url"]
    return data


def parse_entities(entity):
    data = {"text": "", "indices": entity["indices"]}
    if "name" in entity:  # user mention
        data["text"] = "@" + entity["name"]
    if "text" in entity:  # hashtag
        data["text"] = "#" + entity["text"]
    if "display_url" in entity:  # url
        data["text"] = entity["display_url"]
    return data


def parse_card(card):
    data = {}
    for v in card["legacy"]["binding_values"]:
        if "choice" in v["key"] or v["key"] in [
                "end_datetime_utc", "unified_card", "summary_photo_image_original"]:
            value_name = f"{v['value']['type'].lower()}_value"
            data[v["key"]] = v["value"].get(value_name, "")
    photo = None
    if "unified_card" in data:
        card_data = json.loads(data["unified_card"])
        del data["unified_card"]
        try:
            for v in card_data["media_entities"].values():
                if "media_url_https" in v:
                    photo = {"url": v["media_url_https"] + "?name=orig", "video": ""}
                    break
        except Exception as e:
            logger.error(f"Error when parsing unified_card: {e} {e.args}")
            logger.error(f"\n{traceback.format_exc()}")
            logger.error(f"\n{pformat(card_data)}")
    if "summary_photo_image_original" in data:
        photo = {"url": data["summary_photo_image_original"]["url"], "video": ""}
        del data["summary_photo_image_original"]
    return data, photo
def parse_user(result):
    # User fields may live at the top level or under "core" / "legacy" / "avatar";
    # merge into a copy so the caller's dict is not mutated.
    user_result = dict(result)
    user_result.update(result.get("core", {}))
    user_result.update(result.get("legacy", {}))
    user_result.update(result.get("avatar", {}))
    user = {
        "name": user_result["name"],
        "screen_name": user_result["screen_name"],
        "profile_image": user_result.get("profile_image_url_https") or user_result.get("image_url"),
        "profile_image_shape": user_result.get("profile_image_shape"),
    }
    if user["profile_image"]:
        # Swap the "_normal" thumbnail suffix for the full-size image.
        user["profile_image"] = user["profile_image"].replace("_normal.", ".")
    return user


def parse_tweet(tweet):
    # with open("tweet.json", "w") as f: json.dump(tweet, f)
    while "rest_id" not in tweet:
        tweet = tweet["tweet"]
    data = {
        "rest_id": tweet["rest_id"],
        "name": "",
        "screen_name": "",
        "profile_image": "",
        "profile_image_shape": "",
        "full_text": tweet["legacy"]["full_text"],
        "created_at": tweet["legacy"]["created_at"],
        "timestamp": int(datetime.strptime(
            tweet["legacy"]["created_at"], "%a %b %d %H:%M:%S %z %Y").timestamp()),
        "reply_to": "",
        "media": [],
        "entities": [],
        "quoted": {},
        "retweeted": {},
        "card": {},
    }
    user = parse_user(tweet["core"]["user_results"]["result"])
    data.update(user)
    if "in_reply_to_status_id_str" in tweet["legacy"]:
        data["reply_to"] = tweet["legacy"]["in_reply_to_status_id_str"]
    for m in tweet["legacy"]["entities"].get("media", []):
        data["media"].append(parse_media(m))
    for e in ["user_mentions", "hashtags", "urls"]:
        for m in tweet["legacy"]["entities"].get(e, []):
            data["entities"].append(parse_entities(m))
    # Order entities by their position in the tweet text.
    data["entities"].sort(key=lambda x: x["indices"][0])
    if "card" in tweet:
        data["card"], _photo = parse_card(tweet["card"])
        if _photo:
            data["media"].append(_photo)
    return data


def parse_detail(data):
    # with open("detail.json", "w", encoding="utf-8") as f:
    #     json.dump(data, f, ensure_ascii=False, indent=4)
    entries = []
    for i in data["data"]["threaded_conversation_with_injections_v2"]["instructions"]:
        if i["type"] == "TimelineAddEntries":
            entries += i["entries"]
    result = []
    for entry in entries:
        try:
            result += parse_entry(entry)
        except Exception as e:
            logger.error(f"Error when parsing entry: {e} {e.args}")
            logger.error(f"\n{traceback.format_exc()}")
            logger.error(f"\n{pformat(entry)}")
    # Drop Premium upsell entries, deduplicate by rest_id, sort oldest first.
    result = [r for r in result if r["screen_name"] not in ["premium"]]
    result = list({t["rest_id"]: t for t in result}.values())
    result.sort(key=lambda x: x["timestamp"])
    return result
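

# A minimal usage sketch, assuming a captured GraphQL list-timeline response
# saved to "timeline.json" (the file name is a placeholder; any JSON payload
# with the tweets_timeline shape expected by parse_timeline would work).
if __name__ == "__main__":
    with open("timeline.json", encoding="utf-8") as f:
        payload = json.load(f)
    for tweet in parse_timeline(payload):
        logger.info(f'{tweet["screen_name"]}: {tweet["full_text"][:80]}')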