小编mar*_*ios的帖子

为什么kafka生成器在初始化时采用代理端点而不是zk

如果我有多个经纪人,我的生产者应该使用哪个经纪人?我是否需要手动切换代理以平衡负载?另外,为什么消费者只需要一个zookeeper端点而不是代理端点?

教程中的快速示例:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Run Code Online (Sandbox Code Playgroud)

apache-kafka

36
推荐指数
2
解决办法
2万
查看次数

Spark 2.0缺少火花暗示

使用Spark 2.0,我发现有可能将行的数据帧转换为案例类的数据框.当我尝试这样做的时候,我打招呼说要导入spark.implicits._.我遇到的问题是Intellij没有认识到这是一个有效的导入语句,我想知道是否已经移动并且消息没有更新,或者我的构建设置中没有正确的包,这里是我的build.sbt

libraryDependencies ++= Seq(
  "org.mongodb.spark" % "mongo-spark-connector_2.11" % "2.0.0-rc0",
  "org.apache.spark" % "spark-core_2.11" % "2.0.0",
  "org.apache.spark" % "spark-sql_2.11" % "2.0.0"
)
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-dataframe

29
推荐指数
2
解决办法
2万
查看次数

Spark:测试RDD是否为空的有效方法

isEmptyRDD上没有方法,那么如果RDD为空,最有效的测试方法是什么?

scala apache-spark rdd

26
推荐指数
1
解决办法
2万
查看次数

SBT测试错误:java.lang.NoSuchMethodError:net.jpountz.lz4.LZ4BlockInputStream

获得以下异常,当我尝试使用scalatest在SBT窗口上执行我的火花流代码的单元测试时.

sbt testOnly <<ClassName>>

*
*
*
*
*
*

2018-06-18 02:39:00 ERROR执行程序:91 - 阶段3.0(TID 11)中的任务1.0中的异常java.lang.NoSuchMethodError:net.jpountz.lz4.LZ4BlockInputStream.(Ljava/io/InputStream; Z)V位于org.apache.spark.serializer.SerializerManager.wrapStream的org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)at org.apache.spark.serializer.SerializerManager.wrapStream (SerializerManager.scala:124)在org.apache.spark.shuffle.BlockStoreShuffleReader $$ anonfun $ 2.适用(BlockStoreShuffleReader.scala:50)在org.apache.spark.shuffle.BlockStoreShuffleReader $$ anonfun $ 2.适用(BlockStoreShuffleReader.scala :50)在org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:417)在org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)在scala.collection.Iterator $$匿名在scala上的scala.collection.Iterator $$ anon $ 12.hasNext(Iterator.scala:441)$ 12.nextCur(Iterator.scala:435).collection.Iterator $$匿名$ 11.hasNext(Iterator.scala:409)在org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)在org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala: 37)在scala.collection.Iterator $$不久$ 11.hasNext(Iterator.scala:409)在org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIteratorForCodegenStage1.sort_addToSorter $(来源不明)在org.apache.spark .sql.catalyst.expressions.GeneratedClass $ GeneratedIteratorForCodegenStage1.processNext(来源不明)在org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)在org.apache.spark.sql.execution.WholeStageCodegenExec $ $ anonfun 10 $ $$匿名$ 1.hasNext(WholeStageCodegenExec.scala:614):在org.apache.spark.sql.execution.streaming在org.apache.spark.sql.execution.GroupedIterator $.适用(29 GroupedIterator.scala) .FlatMapGroupsWithStateExec $ StateStoreUpdater.updateStateForKeysWithData(FlatMapGroupsWithStateExec.scala:176)**

尝试了几件事来排除net.jpountz.lz4 jar(来自其他帖子的建议)但输出中的同样错误.

目前使用spark 2.3,scalatest 3.0.5,Scala 2.11版本.我在升级到spark 2.3和scalatest 3.0.5后才看到这个问题

有什么建议 ?

scala sbt scalatest apache-spark spark-streaming

13
推荐指数
1
解决办法
6106
查看次数

如何为'sbt console'设置彩色REPL?

从Scala 2.11.4开始,您可以通过调用获得彩色REPLscala -Dscala.color.我的问题是,当我sbt console在SBT项目中调用时是否可以获得相同颜色的REPL ?

scala sbt

12
推荐指数
2
解决办法
2004
查看次数

Spark Kryo:注册自定义序列化程序

我有一个类通过实现read()write()方法实现自定义Kryo序列化器com.esotericsoftware.kryo.Serializer(参见下面的示例).如何在Spark中注册此自定义序列化程序?

这是我所拥有的伪代码示例:

class A() 

CustomASerializer extends com.esotericsoftware.kryo.Serializer[A]{
    override def write(kryo: Kryo, output: Output, a: A): Unit = ???
    override def read(kryo: Kryo, input: Input, t: Class[A]): A = ???
}

val kryo: Kryo = ... 
kryo.register(classOf[A], new CustomASerializer()); // I can register my serializer
Run Code Online (Sandbox Code Playgroud)

现在在Spark:

val sparkConf = new SparkConf()
sparkConf.registerKryoClasses(Array(classOf[A]))
Run Code Online (Sandbox Code Playgroud)

不幸的是,Spark没有给我选择注册我的自定义序列化程序.知道是否有办法做到这一点?

scala kryo apache-spark

11
推荐指数
1
解决办法
7728
查看次数

YARN:Spark中执行者数量和执行者核心之间有什么区别?

我在AWS EMR上学习Spark.在这个过程中,我试图理解执行者数量( - num-executors)和执行者核心(--executor-cores)之间的区别.请问有人请告诉我吗?

此外,当我试图提交以下工作时,我收到错误:

spark-submit --deploy-mode cluster --master yarn --num-executors 1 --executor-cores 5   --executor-memory 1g -–conf spark.yarn.submit.waitAppCompletion=false wordcount.py s3://test/spark-example/input/input.txt s3://test/spark-example/output21

Error: Unrecognized option: -–conf
Run Code Online (Sandbox Code Playgroud)

emr hadoop-yarn apache-spark

9
推荐指数
1
解决办法
4786
查看次数

Spark的Row和InternalRow类型之间的差异

目前Spark有两个Row实现:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
Run Code Online (Sandbox Code Playgroud)

什么需要两个?它们是代表相同的编码实体,但一个在内部使用(内部API),另一个与外部API一起使用?

apache-spark spark-dataframe apache-spark-dataset

9
推荐指数
1
解决办法
475
查看次数

Scalatest和Spark给出"java.io.NotSerializableException:org.scalatest.Assertions $ AssertionsHelper"

我正在" com.holdenkarau.spark-testing-base "和scalatest的帮助下测试Spark Streaming应用程序.

import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.apache.spark.rdd.RDD
import org.scalatest.{ BeforeAndAfter, FunSuite }

class Test extends FunSuite with BeforeAndAfter with StreamingSuiteBase {

  var delim: String = ","

  before {
    System.clearProperty("spark.driver.port")
   }

  test(“This Fails“) {

    val source = scala.io.Source.fromURL(getClass.getResource(“/some_logs.csv"))
    val input = source.getLines.toList

    val rowRDDOut = Calculator.do(sc.parallelize(input))   //Returns DataFrame

    val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + delim + row.getAs[String](1))

    source.close
  }
}
Run Code Online (Sandbox Code Playgroud)

我得到字段' delim '的序列化异常:

org.apache.spark.SparkException: Task not serializable
[info]   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
[info]   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
[info]   at …
Run Code Online (Sandbox Code Playgroud)

serialization scala scalatest apache-spark rdd

9
推荐指数
1
解决办法
2303
查看次数

Scala/Spark版本兼容性

我正在构建我的第一个spark应用程序.

http://spark.apache.org/downloads.html告诉我Spark 2.x是针对Scala 2.11构建的.

在Scala网站https://www.scala-lang.org/download/all.html我看到的版本来自2.11.0 - 2.11.11

所以这是我的问题:Spark网站上的2.11究竟是什么意思.它是2.11.0 - 2.11.11范围内的任何Scala版本吗?

另一个问题:我可以使用最新的Scala 2.12.2构建我的Spark应用程序吗?我假设Scala是向后兼容的,因此使用Scala构建的Spark库可以在Scala 2.12.1应用程序中使用/调用2.11.x.我对么?

scala apache-spark

9
推荐指数
1
解决办法
9577
查看次数