I have the URL and PORT of a remote Redis server. I am able to write to Redis from Scala, but I would also like to connect to the remote Redis from the terminal (something like redis-cli) in order to run a few hget, get, etc. commands (I can do this with a locally installed Redis without any problem).
I want to save an RDD as a Parquet file. To do this, I convert the RDD to a DataFrame and then save the DataFrame as a Parquet file using a schema:
val aStruct = new StructType(Array(
  StructField("id", StringType, nullable = true),
  StructField("role", StringType, nullable = true)))
val newDF = sqlContext.createDataFrame(filtered, aStruct)
The question is: how can aStruct be created automatically for all columns, assuming all columns are StringType? Also, what does nullable = true mean? Does it mean that all empty values will be replaced by null?
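One way to build the schema automatically is to map over the column names. This is a sketch (it assumes all columns should be strings, and reuses the names df, filtered, and sqlContext from the question). Note that nullable = true only declares that the column may contain nulls; it does not replace or rewrite any values.

```scala
import org.apache.spark.sql.types.{StructField, StructType, StringType}

// Build a StructField for every column name, all typed as StringType.
def stringSchema(columnNames: Seq[String]): StructType =
  StructType(columnNames.map(name => StructField(name, StringType, nullable = true)))

// Usage (names assumed from the question):
// val aStruct = stringSchema(df.columns)
// val newDF = sqlContext.createDataFrame(filtered, aStruct)
```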
I followed this tutorial and other similar tutorials on task serialization, but my code fails with a Task serialization error. I don't understand why this happens. I set the variable topicOutputMessages outside foreachRDD, and then read it inside foreachPartition. I also create the KafkaProducer INSIDE foreachPartition. So, what is the problem here? I really cannot figure it out.
val topicsSet = topicInputMessages.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> metadataBrokerList_InputQueue)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)
messages.foreachRDD(rdd => {
  rdd.foreachPartition { iter =>
    UtilsDM.setMetadataBrokerList(metadataBrokerList)
    UtilsDM.setOutputTopic(topicOutputMessages)
    val producer = UtilsDM.createProducer
    iter.foreach { msg =>
      val record = new ProducerRecord[String, String](UtilsDM.getOutputTopic(), msg)
      producer.send(record)
    }
    producer.close()
  }
})
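A common cause of this error is that the closure passed to foreachRDD captures a non-serializable field of the enclosing class (a streaming context, a logger, an unserializable producer reference), not necessarily the producer created inside foreachPartition. One defensive pattern is to hold the producer as a @transient lazy val in the object, so each executor JVM builds its own instance on first use and nothing is shipped over the wire. This is a sketch only: it assumes the new org.apache.kafka.clients producer API, and UtilsDM and its fields are taken from the question.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

object UtilsDM extends Serializable {
  @volatile var metadataBrokerList: String = ""
  @volatile var topicOutputMessages: String = ""

  // @transient lazy: never serialized into the task closure;
  // each executor JVM creates its own producer when first touched.
  @transient lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, metadataBrokerList)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }
}
```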
Edit:
object UtilsDM extends Serializable {
var topicOutputMessages: String = ""
var metadataBrokerList: String = ""
var …

When I execute my Spark code in the cluster, the line info.foreachRDD(rdd => if (!rdd.isEmpty()) rdd.foreach(a => println(a._1))) produces an error message (see below). What am I doing wrong, and what does the error message indicate?
val info = dstreamdata.map[(String, (Long, Long, Long, List[String]))](a => {
  (a._1, (a._2._1, a._2._2, 1, a._2._3))
}).reduceByKey((a, b) => {
  (Math.min(a._1, b._1), Math.max(a._2, b._2), a._3 + b._3, a._4 ++ b._4)
}).updateStateByKey(Utils.updateState)
info.foreachRDD(rdd => if (!rdd.isEmpty()) rdd.foreach(a => println(a._1)))
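One thing to keep in mind with the line above: println inside rdd.foreach runs on the executors, so its output goes to the executor logs, not to the driver console, and the closure must be fully serializable. To inspect a few keys on the driver instead, a sketch (the sample size 10 is arbitrary):

```scala
info.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // take(n) pulls a small sample back to the driver, where println is visible
    rdd.take(10).foreach(a => println(a._1))
  }
}
```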
The error stack trace:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at …

I have the following configuration file, which I want to use from Scala via the Typesafe Config library:
P11 {
yes="0.0"
no="1.0"
}
P12 {
yes="0.01"
no="0.99"
}
P13 {
id = "123 567 \
T 0: \
If (f 23 <= 0.0)"
}
This is how I load it:
import java.io.File
import com.typesafe.config.ConfigFactory

val configFileName = "/usr/develop/tests/config.conf"
val parsedConfigMCF = ConfigFactory.parseFile(new File(configFileName))
val confMCF = ConfigFactory.load(parsedConfigMCF)
Then I get the error:
Expecting a value but got wrong token: 'newline' (backslash followed by 'newline', this is not a valid escape sequence
It looks like it does not like the \ (backslash), but I need a multi-line value for id in P13.
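HOCON does not support backslash line continuations inside ordinary quoted strings, but it does support multi-line strings with triple quotes (as in Scala, with no escape processing). A sketch of how the P13 block could be rewritten:

```hocon
P13 {
  id = """123 567
T 0:
If (f 23 <= 0.0)"""
}
```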
I wrote the following code to convert a SQL DataFrame df into an RDD[LabeledPoint]:
val targetInd = df.columns.indexOf("myTarget")
val ignored = List("myTarget")
val featInd = df.columns.diff(ignored).map(df.columns.indexOf(_))
df.printSchema
val dfLP = df.rdd.map(r => LabeledPoint(
  r.getDouble(targetInd),
  Vectors.dense(featInd.map(r.getDouble(_)).toArray)
))
The schema looks like this:
root
|-- myTarget: long (nullable = true)
|-- var1: long (nullable = true)
|-- var2: double (nullable = true)
When I run dfLP.foreach(l => l.label), the following error occurs:
java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Double
How can I convert the label to Double? I expected the other features might be either Double or Long; if that is not the case, I also need to convert the remaining features to Double.
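The exception occurs because myTarget is a long column, and r.getDouble cannot cast a boxed java.lang.Long to java.lang.Double. One defensive fix is to read each cell as Any and convert through java.lang.Number, which covers both Long and Double columns. Here anyToDouble is a hypothetical helper name (not from the question):

```scala
// Hypothetical helper: convert any numeric cell (Long, Int, Double, Float, ...)
// to Double, instead of relying on an exact runtime type match.
def anyToDouble(v: Any): Double = v match {
  case n: java.lang.Number => n.doubleValue()
  case other =>
    throw new IllegalArgumentException(s"Cannot convert $other to Double")
}

// In the question's map this would be used as (sketch):
// val dfLP = df.rdd.map(r => LabeledPoint(
//   anyToDouble(r.get(targetInd)),
//   Vectors.dense(featInd.map(i => anyToDouble(r.get(i))))
// ))
```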
I want to use SparkContext and SQLContext inside foreachPartition, but I cannot do so due to a serialization error. I know that neither object is serializable, but I thought foreachPartition was executed on the master, where both the Spark context and SQLContext are available.
Notation:
`msg -> Map[String,String]`
`result -> Iterable[Seq[Row]]`
Here is my current code (UtilsDM is an object that extends Serializable). The failing part starts from val schema =..., where I want to write result into a DataFrame and then save it to Parquet. Perhaps the way I have organized the code is inefficient, so I would also welcome your advice on that. Thanks.
// Here I am creating df from parquet file on S3
val exists = FileSystem.get(new URI("s3n://" + bucketNameCode), sc.hadoopConfiguration)
  .exists(new Path("s3n://" + bucketNameCode + "/" + pathToSentMessages))
var df: DataFrame = null
if (exists) {
  df = sqlContext.read.parquet("s3n://bucket/pathToParquetFile")
}
UtilsDM.setDF(df)
// Here I process myDStream
myDStream.foreachRDD(rdd => {
  rdd.foreachPartition { iter =>
    val r = new RedisClient(UtilsDM.getHost, …
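For context on why this fails: SparkContext and SQLContext exist only on the driver, while foreachPartition closures run on the executors, so neither object can be used (or serialized) there. A common workaround is to produce plain, serializable data on the executors and build the DataFrame back on the driver. This is a sketch only; processPartition is a hypothetical function standing in for the per-partition logic, and rows/schema are assumed names:

```scala
myDStream.foreachRDD { rdd =>
  // Runs on executors: transform records into serializable values only
  // (no SQLContext, no SparkContext in this closure).
  val rows = rdd.mapPartitions(iter => iter.map(processPartition))

  // Back on the driver: SQLContext is available here, so the DataFrame
  // can be created and written out.
  val df = sqlContext.createDataFrame(rows, schema)
  df.write.mode("append").parquet("s3n://bucket/pathToParquetFile")
}
```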