我正在运行一个Storm拓扑,它正在从具有4个节点的AWS Ubuntu Server 14.04 LTS实例上获取Kafka的推文--Nimbus,Supervisor,Kafka-Zookeeper节点,Zookeeper(用于Storm集群).我的Storm UI已启动并运行,我可以提交拓扑.我有两个经纪人,但我只使用broker.id = 0.我在主题下面有推文.我的kafka服务器运行正常.
我用这种方式创建了kafka-topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic twitter1
我很困惑的是:
SpoutConfig kafkaConfig = new SpoutConfig(kafkaHosts, topicName+"-0", "/kafka", topicName+"-0");
Run Code Online (Sandbox Code Playgroud)
我认为我的错误正在从这一点萌芽.完整的代码是:
import org.apache.storm.tuple.Fields;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import java.util.Arrays;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.kafka.StringScheme;
public class TwitterTopology{
public static void main(String[] args) {
String topicName = "twitter1";
String topologyName = args[0];
String kafkaIp = "xxx.31.xxx.207"; //hiding the IPs here. This is the IP …Run Code Online (Sandbox Code Playgroud)
基本上,我有3个arraylists:项目,频率和uniqueitems.项目包含所有元素列表,例如:面包,黄油,果酱,黄油,牛奶,果酱.Uniqueitems包含独特的东西,例如:面包,黄油,果酱,牛奶.频率包含它们在项目中出现的次数,按唯一项目的顺序排列,例如:1,2,2,1.
但是我的一些元素不会出现if or else(Jam,见附图).一些元素,尽管如果进入if,不会从我的arraylist中删除(colddrink).听起来很傻,真的.但自从昨晚以来我一直在经历这个问题.Netbeans IDE.在我的代码上也使用在线编译器.它给出了与我的IDE相同的答案.那么,我的错误到底是什么?(附加输出)
int min_sup=2;
int v;
for (i=0;i<frequency.size();i++)
{
System.out.println(uniqueitems.get(i)+ " = "+ frequency.get(i));
}
for(i=0;i<frequency.size();i++)
{
v=frequency.get(i);
if(v<min_sup)
{
uniqueitems.remove(i);
frequency.remove(i);
System.out.println(uniqueitems.get(i)+ " is inside if.");
}
else
{
System.out.println(uniqueitems.get(i)+ " is not in if.");
}
}
System.out.println("\n\n");
for(i=0;i<uniqueitems.size();i++)
{
System.out.print(uniqueitems.get(i)+" ");
}
Run Code Online (Sandbox Code Playgroud)