I am new to Kafka and trying to set up an environment for it. I tried to run a single-node Kafka instance, but I am getting an error.
The steps I followed on a Mac:

1. brew install zookeeper
2. brew install kafka
3. zkServer start
4. kafka-server-start.sh /usr/local/etc/kafka/server.properties
5. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
6. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
But I got the error below. Please let me know if I am missing something.
[2015-10-19 15:48:46,632] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2015-10-19 15:48:46,637] WARN Error while fetching metadata [{TopicMetadata for topic test ->
No partition metadata for topic …

I have a list of objects, List<A> list1. A has two members, id and name. I also have another list, List<Integer> list2, that contains only ids. I need to remove from list1 every A object whose id does not appear in list2.
What I have tried so far:
void removeUnwanted(List<A> a, List<Integer> b) {
    // Iterate with an explicit Iterator so elements can be removed from 'a'
    // without a ConcurrentModificationException
    Iterator<A> it = a.iterator();
    while (it.hasNext()) {
        if (!b.contains(it.next().id)) {
            it.remove();
        }
    }
}
Can anyone suggest the most efficient way to do this?
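For reference, one way to make the membership test O(1) is to copy the ids into a HashSet and use List.removeIf (Java 8+). A sketch, with a minimal A class and sample data assumed for illustration:

```java
import java.util.*;

class A {
    int id;
    String name;
    A(int id, String name) { this.id = id; this.name = name; }
}

public class RemoveUnwanted {
    // Remove every A whose id is not in list2; the HashSet makes each
    // contains() check O(1) instead of O(n) on a plain List
    static void removeUnwanted(List<A> list1, List<Integer> list2) {
        Set<Integer> ids = new HashSet<>(list2);
        list1.removeIf(obj -> !ids.contains(obj.id));
    }

    public static void main(String[] args) {
        List<A> list1 = new ArrayList<>(Arrays.asList(
                new A(1, "one"), new A(2, "two"), new A(3, "three")));
        removeUnwanted(list1, Arrays.asList(1, 3));
        System.out.println(list1.size());      // 2
        System.out.println(list1.get(1).name); // three
    }
}
```

This brings the overall cost down to roughly O(|list1| + |list2|), versus O(|list1| × |list2|) for the nested contains() loop.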
I am writing a Spark application. After processing the logs, I save the output in Parquet format (using the DataFrame.saveAsParquetFile() API), but the save sometimes fails with the error below. If I rerun the save, the error goes away.

Please let me know the root cause of this.
java.io.IOException: Could not read footer: java.io.IOException: Could not read footer for file org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus@6233d82f
at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:238)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:369)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$metadataCache$lzycompute(newParquet.scala:154)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$metadataCache(newParquet.scala:152)
at org.apache.spark.sql.parquet.ParquetRelation2.refresh(newParquet.scala:197)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:134)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:114)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
at org.apache.spark.sql.DataFrame.saveAsParquetFile(DataFrame.scala:1494)
at ParquetWriter$.main(ParquetWriter.scala:182)
at ParquetWriter.main(ParquetWriter.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: …

I am broadcasting a value in a Spark Streaming application, but I don't know how to access that variable from a class other than the one that broadcast it.
My code is as follows:
object AppMain {
  def main(args: Array[String]) {
    //...
    val broadcastA = sc.broadcast(a)
    //..
    lines.foreachRDD(rdd => {
      val obj = AppObject1
      rdd.filter(p => obj.apply(p))
      rdd.count
    })
  }
}

object AppObject1 {
  def apply(str: String): Boolean = {
    AnotherObject.process(str)
  }
}

object AnotherObject {
  // I want to use the broadcast variable in this object
  val B = broadcastA.Value // compilation error here
  def process(str: String): Boolean = {
    // need to use B inside this method
  }
}
Can anyone suggest how to access the broadcast variable in this case?
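One common pattern (a sketch, not the asker's code): rather than having AnotherObject reference broadcastA directly, which cannot compile because broadcastA is only in scope inside AppMain, pass the value in as a parameter from the closure where the broadcast variable is visible. The Java sketch below mimics that structure without Spark; the class names and the string payload standing in for the broadcast value are assumptions for illustration:

```java
import java.util.function.Predicate;

// Stand-in for the helper object: instead of reaching for a variable
// declared in another class, it receives the broadcast value as a parameter.
class BroadcastHelper {
    static boolean process(String str, String broadcastValue) {
        // use the broadcast payload here
        return str.contains(broadcastValue);
    }
}

public class BroadcastAccess {
    public static void main(String[] args) {
        // In Spark this would be broadcastA.value, read inside the RDD
        // closure where broadcastA is in scope; a plain string stands in here.
        String broadcastValue = "error";
        Predicate<String> filter =
                line -> BroadcastHelper.process(line, broadcastValue);
        System.out.println(filter.test("an error line")); // true
        System.out.println(filter.test("a clean line"));  // false
    }
}
```

The same shape works in the Scala code above: call AnotherObject.process(str, broadcastA.value) from inside foreachRDD, so the helper object never needs to see the broadcast variable itself.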
Tags: apache-spark ×3, java ×2, algorithm, apache-kafka, arraylist, collections, hadoop, hdfs, jvm, parquet, scala