Ton*_*ven 2 java twitter multithreading twitter4j
我有一组关键字(超过600),我想使用流式api来跟踪它们的推文.Twitter api将允许跟踪的关键字数量限制为200.因此,我决定使用多个OAuth令牌来完成此操作.我是这样做的:
String[] dbKeywords = KeywordImpl.listKeywords();
List<String[]> keywords = ditributeKeywords(dbKeywords);
for (String[] subList : keywords) {
StreamCrawler streamCrawler = new StreamCrawler();
streamCrawler.setKeywords(subList);
Thread crawlerThread = new Thread(streamCrawler);
crawlerThread.start();
}
Run Code Online (Sandbox Code Playgroud)
这是在线程之间分配单词的方式.每个线程接收不超过200个单词.这是StreamCrawler的实现:
public class StreamCrawler extends Crawler implements Runnable {
...
private String[] keywords;
public void setKeywords(String[] keywords) {
this.keywords = keywords;
}
@Override
public void run() {
TwitterStream twitterStream = getTwitterInstance();
StatusListener listener = new StatusListener() {
ArrayDeque<Tweet> tweetbuffer = new ArrayDeque<Tweet>();
ArrayDeque<TwitterUser> userbuffer = new ArrayDeque<TwitterUser>();
@Override
public void onException(Exception arg0) {
System.out.println(arg0);
}
@Override
public void onDeletionNotice(StatusDeletionNotice arg0) {
System.out.println(arg0);
}
@Override
public void onScrubGeo(long arg0, long arg1) {
System.out.println(arg1);
}
@Override
public void onStatus(Status status) {
...Doing something with message
}
@Override
public void onTrackLimitationNotice(int arg0) {
System.out.println(arg0);
try {
Thread.sleep(5 * 60 * 1000);
System.out.println("Will sleep for 5 minutes!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onStallWarning(StallWarning arg0) {
System.out.println(arg0);
}
};
FilterQuery fq = new FilterQuery();
String keywords[] = getKeywords();
System.out.println(keywords.length);
System.out.println("Listening for " + Arrays.toString(keywords));
fq.track(keywords);
twitterStream.addListener(listener);
twitterStream.filter(fq);
}
private long getCurrentThreadId() {
return Thread.currentThread().getId();
}
private TwitterStream getTwitterInstance() {
TwitterConfiguration configuration = null;
TwitterStream twitterStream = null;
while (configuration == null) {
configuration = TokenFactory.getAvailableToken();
if (configuration != null) {
System.out
.println("Token was obtained " + getCurrentThreadId());
System.out.println(configuration.getTwitterAccount());
setToken(configuration);
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true);
cb.setOAuthConsumerKey(configuration.getConsumerKey());
cb.setOAuthConsumerSecret(configuration.getConsumerSecret());
cb.setOAuthAccessToken(configuration.getAccessToken());
cb.setOAuthAccessTokenSecret(configuration.getAccessSecret());
twitterStream = new TwitterStreamFactory(cb.build())
.getInstance();
} else {
// If there is no available configuration, wait for 2 minutes
// and try again
try {
System.out
.println("There were no available tokens, sleeping for 2 minutes.");
Thread.sleep(2 * 60 * 1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
return twitterStream;
}
}
Run Code Online (Sandbox Code Playgroud)
所以我的问题是,当我开始例如2个线程时,我得到通知,它们都是打开流并获取它.但实际上只有第一个真正得到流并分别调用OnStatus方法.在第二个线程中使用的数组不为空; Twitter配置也是有效且独特的.所以我不明白这种行为可能是什么原因.为什么唯一的第一个线程返回推文?
小智 10
据我所知,你试图从同一个IP同时连接到公共流端点(也就是通用流或stream.twitter.com).
更具体地说,我认为您需要两个活动连接来自同一IP的stream.twitter.com/1.1/statuses/filter.json.
虽然Twitter流媒体apis文档没有明确说明只有一个与公共端点的连接,但Twitter员工在开发网站上澄清了这一点https://dev.twitter.com/discussions/7542
对于常规流,您应该只从同一个IP建立一个连接.
这意味着您使用两个不同的Twitter应用程序/帐户连接到公共流并不重要; 只要你从同一个IP地址连接,你就只能有一个站点连接到公共流.你说你有两个流连接,这个行为的答案是由Twitter员工给出的:https://dev.twitter.com/discussions/14935
您可能会发现有时stream.twitter.com可以让您在这里或那里获得更多开放的连接,但不应该依赖这种行为.
例如,如果您尝试在第二个线程中连接到用户流(twitter4j TwitterStream user()方法),那么您将真正开始获取过滤器和用户流.
关于200轨道关键字限制,twitter4j.org javadoc可能有点过时了.以下是twitter api文档的内容
默认访问级别最多允许400个跟踪关键字,5,000个跟随用户ID和25个0.1-360度位置框.如果您需要提升对Streaming API的访问权限,您应该探索我们的Twitter数据合作伙伴提供商......
因此,如果您需要超越400,您可能想要求Twitter提高您的Twitter帐户应用程序的跟踪访问级别,或者与Twitter数据的认证合作伙伴提供商合作.
你不一定需要的另一件事是启动获取流的新线程,因为twitter4j过滤器(或用户)"方法在内部创建一个操作TwitterStream并连续调用足够的侦听器方法的线程"(引自Yusuke的示例代码)山本).
我希望这有帮助.(我无法发布更多链接,因为我得到这个"你需要至少10个声望才能发布2个以上的链接")
| 归档时间: |
|
| 查看次数: |
2166 次 |
| 最近记录: |