小编duc*_*ito的帖子

如何连接到远程Redis服务器?

我有远程Redis服务器的URL和PORT.我能够从Scala写入Redis.但是我想通过使用终端连接到远程的Redis redis-server类似的东西,或者为了使好几拨hget,get等等(我可以与本地安装的Redis做没有任何问题).

redis

79
推荐指数
4
解决办法
11万
查看次数

如何自动化StructType创建以将RDD传递给DataFrame

我想保存RDD为镶木地板文件.为此,我将RDD传递给DataFrame然后使用结构保存DataFrame为镶木地板文件:

    val aStruct = new StructType(Array(StructField("id",StringType,nullable = true),
                                       StructField("role",StringType,nullable = true)))
    val newDF = sqlContext.createDataFrame(filtered, aStruct)
Run Code Online (Sandbox Code Playgroud)

问题是如何aStruct为所有列自动创建假设所有列都是StringType?另外,是什么意思nullable = true?这是否意味着所有空值都将被替换Null

scala apache-spark rdd spark-dataframe

6
推荐指数
1
解决办法
545
查看次数

serializable对象的用法:引起:java.io.NotSerializableException

我按照本教程和其他有关任务序列化的类似教程,但我的代码失败并出现Task serialization错误.我不明白为什么会这样.我在topicOutputMessages外面设置变量foreachRDD,然后我在里面读它foreachPartition.我也创建了KafkaProducerINSIDE foreachPartition.那么,这里的问题是什么?真的不能说明问题.

al topicsSet = topicInputMessages.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> metadataBrokerList_InputQueue)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)


messages.foreachRDD(rdd => {
    rdd.foreachPartition{iter =>
        UtilsDM.setMetadataBrokerList(metadataBrokerList)
        UtilsDM.setOutputTopic(topicOutputMessages)
        val producer = UtilsDM.createProducer
        iter.foreach { msg =>
              val record = new ProducerRecord[String, String](UtilsDM.getOutputTopic(), msg)
              producer.send(record)
        }
        producer.close()
    }
})
Run Code Online (Sandbox Code Playgroud)

编辑:

object UtilsDM extends Serializable {

  var topicOutputMessages: String = ""
  var metadataBrokerList: String = ""
  var …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

6
推荐指数
1
解决办法
1047
查看次数

为什么启动我的Spark Streaming应用程序会给出"容器以非零退出代码50退出"?

当我在集群中执行我的Spark代码时,该行会info.foreachRDD(rdd => if (!rdd.isEmpty()) rdd.foreach(a => println(a._1)))生成一条错误消息(参见下文).我做错了什么,错误信息表明了什么?

val info = dstreamdata.map[(String, (Long, Long, Long, List[String]))](a => {
      (a._1, (a._2._1, a._2._2, 1, a._2._3))
    }).
      reduceByKey((a, b) => {
        (Math.min(a._1, b._1), Math.max(a._2, b._2), a._3 + b._3, a._4 ++ b._4)
      }).
      updateStateByKey(Utils.updateState)

    info.foreachRDD(rdd => if (!rdd.isEmpty()) rdd.foreach(a => println(a._1)))
Run Code Online (Sandbox Code Playgroud)

错误堆栈跟踪:

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at …
Run Code Online (Sandbox Code Playgroud)

scala hadoop-yarn apache-spark spark-streaming

6
推荐指数
1
解决办法
1400
查看次数

如何在Typesafe Config中指定多行字符串?

我有以下配置文件,我想从Scala应用此库使用:

P11 {
    yes="0.0"
    no="1.0"
}

P12 {
    yes="0.01"
    no="0.99"
}

P13 {
id = "123 567 \
T 0: \
If (f 23 <= 0.0)"
}
Run Code Online (Sandbox Code Playgroud)

我是这样做的:

import com.typesafe.config.ConfigFactory

val configFileName = "/usr/develop/tests/config.conf"
val parsedConfigMCF = ConfigFactory.parseFile(new File(configFileName))
val confMCF = ConfigFactory.load(parsedConfigMCF)
Run Code Online (Sandbox Code Playgroud)

然后我得到错误:

Expecting a value but got wrong token: 'newline' (backslash followed by 'newline', this is not a valid escape sequence

它看起来像它不喜欢\(反斜线),但我需要把一些线idP13.

typesafe-config

5
推荐指数
1
解决办法
3013
查看次数

创建RDD [LabeledPoint]:java.lang.ClassCastException:无法将java.lang.Long强制转换为java.lang.Double

我编写了以下代码,以便将SQL DataFrame转换dfRDD[LabeledPoint]

val targetInd = df.columns.indexOf("myTarget")
val ignored = List("myTarget")
val featInd = df.columns.diff(ignored).map(df.columns.indexOf(_))

df.printSchema

val dfLP = df.rdd.map(r => LabeledPoint(
  r.getDouble(targetInd),
  Vectors.dense(featInd.map(r.getDouble(_)).toArray)
))
Run Code Online (Sandbox Code Playgroud)

模式如下所示:

root
 |-- myTarget: long (nullable = true)
 |-- var1: long (nullable = true)
 |-- var2: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)

当我运行时dfLP.foreach(l => l.label),会发生以下错误:

java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Double

如何将标签转换成两倍?我希望其他功能可能都是两倍或更长,不是吗?如果不正确,那么我还需要将其余功能转换成两倍。

scala apache-spark

4
推荐指数
1
解决办法
942
查看次数

如何在foreachPartition中使用SQLContext和SparkContext

我想在里面使用SparkContext和SQLContext foreachPartition,但由于序列化错误而无法执行此操作.我知道这两个对象都不是可序列化的,但我认为它foreachPartition是在master上执行的,其中Spark Context和SQLContext都可用.

符号:

`msg -> Map[String,String]`
`result -> Iterable[Seq[Row]]`
Run Code Online (Sandbox Code Playgroud)

这是我当前的代码(UtilsDM是一个对象extends Serializable).代码失败的部分开始从val schema =...,在那里我想写resultDataFrame,然后将其保存到拼花地板.也许我组织代码的方式效率低下,那么我想在此提出您的建议.谢谢.

// Here I am creating df from parquet file on S3
val exists = FileSystem.get(new URI("s3n://" + bucketNameCode), sc.hadoopConfiguration).exists(new Path("s3n://" + bucketNameCode + "/" + pathToSentMessages))
var df: DataFrame = null
if (exists) {
  df = sqlContext
    .read.parquet("s3n://bucket/pathToParquetFile")
}
UtilsDM.setDF(df)

// Here I process myDStream
myDStream.foreachRDD(rdd => {
  rdd.foreachPartition{iter =>
    val r = new RedisClient(UtilsDM.getHost, …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

3
推荐指数
1
解决办法
4561
查看次数