|  |  |  | @ -1,5 +1,7 @@ | 
			
		
	
		
			
				
					|  |  |  |  | import json | 
			
		
	
		
			
				
					|  |  |  |  | from datetime import datetime | 
			
		
	
		
			
				
					|  |  |  |  | import traceback | 
			
		
	
		
			
				
					|  |  |  |  | from pprint import pformat | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | from loguru import logger | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  |  | @ -11,7 +13,9 @@ def parse_timeline(data): | 
			
		
	
		
			
				
					|  |  |  |  |         try: | 
			
		
	
		
			
				
					|  |  |  |  |             result += parse_entry(entry) | 
			
		
	
		
			
				
					|  |  |  |  |         except Exception as e: | 
			
		
	
		
			
				
					|  |  |  |  |             logger.error(f"error when parsing entry: {e} {e.args}\n{entry}") | 
			
		
	
		
			
				
					|  |  |  |  |             logger.error(f"error when parsing entry: {e} {e.args}") | 
			
		
	
		
			
				
					|  |  |  |  |             logger.error(f"\n{traceback.format_exc()}") | 
			
		
	
		
			
				
					|  |  |  |  |             logger.error(f"\n{pformat(entry)}") | 
			
		
	
		
			
				
					|  |  |  |  |     result.sort(key=lambda x: x["timestamp"], reverse=True) | 
			
		
	
		
			
				
					|  |  |  |  |     return result | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  |  | @ -20,10 +24,13 @@ def parse_entry(entry): | 
			
		
	
		
			
				
					|  |  |  |  |     entry_id = entry["entryId"] | 
			
		
	
		
			
				
					|  |  |  |  |     if "promoted" in entry_id: | 
			
		
	
		
			
				
					|  |  |  |  |         return [] | 
			
		
	
		
			
				
					|  |  |  |  |     elif "list-conversation" in entry_id and not "tweet" in entry_id: | 
			
		
	
		
			
				
					|  |  |  |  |     elif ("list-conversation" in entry_id or "conversationthread" in entry_id \ | 
			
		
	
		
			
				
					|  |  |  |  |         and not "tweet" in entry_id): | 
			
		
	
		
			
				
					|  |  |  |  |         for item in entry["content"]["items"]: | 
			
		
	
		
			
				
					|  |  |  |  |             data = parse_content(item["item"]) | 
			
		
	
		
			
				
					|  |  |  |  |             if data: result.append(data) | 
			
		
	
		
			
				
					|  |  |  |  |     elif "cursor" in entry_id or "bottom" in entry_id: | 
			
		
	
		
			
				
					|  |  |  |  |         pass | 
			
		
	
		
			
				
					|  |  |  |  |     elif entry["content"]["__typename"] != 'TimelineTimelineCursor': | 
			
		
	
		
			
				
					|  |  |  |  |         data = parse_content(entry["content"]) | 
			
		
	
		
			
				
					|  |  |  |  |         if data: result.append(data) | 
			
		
	
	
		
			
				
					|  |  |  | @ -40,7 +47,9 @@ def parse_content(content): | 
			
		
	
		
			
				
					|  |  |  |  |             data["retweeted"] = parse_tweet(tweet["legacy"]["retweeted_status_result"]["result"]) | 
			
		
	
		
			
				
					|  |  |  |  |         return data | 
			
		
	
		
			
				
					|  |  |  |  |     except Exception as e: | 
			
		
	
		
			
				
					|  |  |  |  |         logger.error(f"error when parsing tweet: {e} {e.args}\n{tweet}") | 
			
		
	
		
			
				
					|  |  |  |  |         logger.error(f"error when parsing tweet: {e} {e.args}") | 
			
		
	
		
			
				
					|  |  |  |  |         logger.error(f"\n{traceback.format_exc()}") | 
			
		
	
		
			
				
					|  |  |  |  |         logger.error(f"\n{pformat(tweet)}") | 
			
		
	
		
			
				
					|  |  |  |  |         return {} | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def parse_media(media): | 
			
		
	
	
		
			
				
					|  |  |  | @ -86,8 +95,10 @@ def parse_card(card): | 
			
		
	
		
			
				
					|  |  |  |  |                         "video": "" | 
			
		
	
		
			
				
					|  |  |  |  |                     } | 
			
		
	
		
			
				
					|  |  |  |  |                 break | 
			
		
	
		
			
				
					|  |  |  |  |         except: | 
			
		
	
		
			
				
					|  |  |  |  |             logger.error(f"error parsing unified_card {card_data}") | 
			
		
	
		
			
				
					|  |  |  |  |         except Exception as e: | 
			
		
	
		
			
				
					|  |  |  |  |             logger.error(f"error when parsing unified_card: {e} {e.args}") | 
			
		
	
		
			
				
					|  |  |  |  |             logger.error(f"\n{traceback.format_exc()}") | 
			
		
	
		
			
				
					|  |  |  |  |             logger.error(f"\n{pformat(card_data)}") | 
			
		
	
		
			
				
					|  |  |  |  |      | 
			
		
	
		
			
				
					|  |  |  |  |     if "summary_photo_image_original" in data: | 
			
		
	
		
			
				
					|  |  |  |  |         photo = { | 
			
		
	
	
		
			
				
					|  |  |  | @ -98,15 +109,32 @@ def parse_card(card): | 
			
		
	
		
			
				
					|  |  |  |  |      | 
			
		
	
		
			
				
					|  |  |  |  |     return data, photo | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def parse_user(result): | 
			
		
	
		
			
				
					|  |  |  |  |     user_result = result | 
			
		
	
		
			
				
					|  |  |  |  |     user_result.update(result.get("core", {})) | 
			
		
	
		
			
				
					|  |  |  |  |     user_result.update(result.get("legacy", {})) | 
			
		
	
		
			
				
					|  |  |  |  |     user_result.update(result.get("avatar", {})) | 
			
		
	
		
			
				
					|  |  |  |  |     user = { | 
			
		
	
		
			
				
					|  |  |  |  |         "name": user_result["name"], | 
			
		
	
		
			
				
					|  |  |  |  |         "screen_name": user_result["screen_name"], | 
			
		
	
		
			
				
					|  |  |  |  |         "profile_image": user_result.get("profile_image_url_https") or user_result.get("image_url"), | 
			
		
	
		
			
				
					|  |  |  |  |         "profile_image_shape": user_result.get("profile_image_shape"), | 
			
		
	
		
			
				
					|  |  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  |  |     if user["profile_image"]: | 
			
		
	
		
			
				
					|  |  |  |  |         user["profile_image"] = user["profile_image"].replace("_normal.", ".") | 
			
		
	
		
			
				
					|  |  |  |  |     return user | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def parse_tweet(tweet): | 
			
		
	
		
			
				
					|  |  |  |  |     # with open("tweet.json", "w") as f: json.dump(tweet, f) | 
			
		
	
		
			
				
					|  |  |  |  |     while not "rest_id" in tweet: tweet = tweet["tweet"] | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     data = { | 
			
		
	
		
			
				
					|  |  |  |  |         "rest_id": tweet["rest_id"], | 
			
		
	
		
			
				
					|  |  |  |  |         "name": tweet["core"]["user_results"]["result"]["legacy"]["name"], | 
			
		
	
		
			
				
					|  |  |  |  |         "screen_name": tweet["core"]["user_results"]["result"]["legacy"]["screen_name"], | 
			
		
	
		
			
				
					|  |  |  |  |         "profile_image": tweet["core"]["user_results"]["result"]["legacy"]["profile_image_url_https"], | 
			
		
	
		
			
				
					|  |  |  |  |         "profile_image_shape": tweet["core"]["user_results"]["result"]["profile_image_shape"], | 
			
		
	
		
			
				
					|  |  |  |  |         "name": "", | 
			
		
	
		
			
				
					|  |  |  |  |         "screen_name": "", | 
			
		
	
		
			
				
					|  |  |  |  |         "profile_image": "", | 
			
		
	
		
			
				
					|  |  |  |  |         "profile_image_shape": "", | 
			
		
	
		
			
				
					|  |  |  |  |         "full_text": tweet["legacy"]["full_text"], | 
			
		
	
		
			
				
					|  |  |  |  |         "created_at": tweet["legacy"]["created_at"], | 
			
		
	
		
			
				
					|  |  |  |  |         "timestamp": int(datetime.strptime(tweet["legacy"]["created_at"], '%a %b %d %H:%M:%S %z %Y').timestamp()), | 
			
		
	
	
		
			
				
					|  |  |  | @ -117,7 +145,8 @@ def parse_tweet(tweet): | 
			
		
	
		
			
				
					|  |  |  |  |         "retweeted": {}, | 
			
		
	
		
			
				
					|  |  |  |  |         "card": {} | 
			
		
	
		
			
				
					|  |  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  |  |     data["profile_image"] = data["profile_image"].replace("_normal.", ".") | 
			
		
	
		
			
				
					|  |  |  |  |     user = parse_user(tweet["core"]["user_results"]["result"]) | 
			
		
	
		
			
				
					|  |  |  |  |     data.update(user) | 
			
		
	
		
			
				
					|  |  |  |  |      | 
			
		
	
		
			
				
					|  |  |  |  |     if "in_reply_to_status_id_str" in tweet["legacy"]: | 
			
		
	
		
			
				
					|  |  |  |  |         data["reply_to"] = tweet["legacy"]["in_reply_to_status_id_str"] | 
			
		
	
	
		
			
				
					|  |  |  | @ -134,4 +163,19 @@ def parse_tweet(tweet): | 
			
		
	
		
			
				
					|  |  |  |  |         data["card"], _photo = parse_card(tweet["card"]) | 
			
		
	
		
			
				
					|  |  |  |  |         if _photo: data["media"].append(_photo) | 
			
		
	
		
			
				
					|  |  |  |  |      | 
			
		
	
		
			
				
					|  |  |  |  |     return data | 
			
		
	
		
			
				
					|  |  |  |  |     return data | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def parse_detail(data): | 
			
		
	
		
			
				
					|  |  |  |  |     entries = data["data"]["threaded_conversation_with_injections_v2"]["instructions"][0]["entries"] | 
			
		
	
		
			
				
					|  |  |  |  |     result = [] | 
			
		
	
		
			
				
					|  |  |  |  |     for entry in entries: | 
			
		
	
		
			
				
					|  |  |  |  |         try: | 
			
		
	
		
			
				
					|  |  |  |  |             result += parse_entry(entry) | 
			
		
	
		
			
				
					|  |  |  |  |         except Exception as e: | 
			
		
	
		
			
				
					|  |  |  |  |             logger.error(f"error when parsing entry: {e} {e.args}") | 
			
		
	
		
			
				
					|  |  |  |  |             logger.error(f"\n{traceback.format_exc()}") | 
			
		
	
		
			
				
					|  |  |  |  |             logger.error(f"\n{pformat(entry)}") | 
			
		
	
		
			
				
					|  |  |  |  |     result.sort(key=lambda x: x["timestamp"]) | 
			
		
	
		
			
				
					|  |  |  |  |     return result |