import tweepy
import tkinter as tk
from tkinter import scrolledtext
# 替换成您的API密钥和访问令牌
consumer_key = 'YOUR_CONSUMER_KEY'
consumer_secret = 'YOUR_CONSUMER_SECRET'
access_token = 'YOUR_ACCESS_TOKEN'
access_token_secret = 'YOUR_ACCESS_TOKEN_SECRET'
# 定义一个继承自tweepy.StreamListener的自定义监听器类
class MyStreamListener(tweepy.StreamListener):
def __init__(self, output_text):
super(MyStreamListener, self).__init__()
self.output_text = output_text
def on_status(self, status):
tweet_id = status.id_str # 推文ID
tweet_text = status.full_text # 推文文本
tweet_created_at = status.created_at # 推文发布时间
tweet_info = f"Tweet ID: {tweet_id}\nTweet Text: {tweet_text}\nTweet Created At: {tweet_created_at}\n--------------\n"
# 在文本框中显示推文信息
self.output_text.insert(tk.END, tweet_info)
self.output_text.see(tk.END)
# 认证
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# 创建Stream对象,并指定监听器
my_stream_listener = MyStreamListener
my_stream = tweepy.Stream(auth=auth, listener=my_stream_listener)
# 创建Tkinter窗口
root = tk.Tk()
root.title("实时推特爬虫")
root.geometry("600x400")
# 创建一个滚动文本框
output_text = scrolledtext.ScrolledText(root, wrap=tk.WORD)
output_text.pack(expand=True, fill=tk.BOTH)
# 启动监听,并过滤特定账号的推文
username = 'TARGET_USERNAME'
my_stream.filter(track=[username], is_async=True)
# 启动Tkinter事件循环
root.mainloop()