小编sca*_*ode的帖子

集群中作业运行期间的 Spark 流错误(纱线资源管理器)

我面临以下错误:
我编写了一个基于 Spark 流 ( Dstream )的应用程序来提取来自 PubSub 的消息。不幸的是,我在执行这项工作时遇到了错误。实际上,我正在使用由 4 个节点组成的集群来执行 spark 作业。

在没有任何特定错误的情况下运行作业 10 分钟后,我永久收到以下错误:

错误 org.apache.spark.streaming.CheckpointWriter:
无法将检查点任务提交给线程池执行程序 java.util.concurrent.RejectedExecutionException:任务 org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@68395dc9
从 java.util.concurrent被拒绝.ThreadPoolExecutor@1a1acc25
[正在运行,池大小 = 1,活动线程 = 1,排队任务 = 1000,已完成任务 = 412]

hadoop-yarn apache-spark spark-streaming dstream

6
推荐指数
0
解决办法
251
查看次数

MSCK REPAIR hive 外部表

我每天都会将数据摄入到 HDFS 中。从数据到 HDFS,我生成按日期分区的 Hive 外部表。我的问题如下,我是否应该在每次数据摄取后运行 MSCK REPAIR TABLE tablename ,在这种情况下我必须每天运行该命令。或者在表创建时运行一次就足够了。非常感谢你的回答

此致

hive

5
推荐指数
1
解决办法
1万
查看次数

Spark sql 的 from_json 返回空值

我将镶木地板文件加载到 spark 数据框中,如下所示:

val message= spark.read.parquet("gs://defenault-zdtt-devde/pubsub/part-00001-e9f8c58f-7de0-4537-a7be-a9a8556sede04a-c000.snappy.parquet")
Run Code Online (Sandbox Code Playgroud)

当我对我的数据框执行收集时,我得到以下结果:

message.collect()

Array[org.apache.spark.sql.Row] = Array([118738748835150,2018-08-20T17:44:38.742Z,{"id":"uplink-3130-85bc","device_id":60517119992794222,"group_id":69,"group":"box-2478-2555","profile_id":3,"profile":"eolane-movee","type":"uplink","timestamp":"2018-08-20T17:44:37.048Z","count":3130,"payload":[{"timestamp":"2018-08-20T17:44:37.048Z","data":{"battery":3.5975599999999996,"temperature":27}}],"payload_encrypted":"9da25e36","payload_cleartext":"fe1b01aa","device_properties":{"appeui":"7ca97df000001190","deveui":"7ca97d0000001bb0","external_id":"Product: 3.7 / HW: 3.1 / SW: 1.8.8","no_de_serie_eolane":"4904","no_emballage":"S02066","product_version":"1.3.1"},"protocol_data":{"AppNonce":"e820ef","DevAddr":"0e6c5fda","DevNonce":"85bc","NetID":"000007","best_gateway_id":"M40246","gateway.
Run Code Online (Sandbox Code Playgroud)

此数据框的架构是

message.printSchema()
root


 |-- Id: string (nullable = true)
 |-- publishTime: string (nullable = true)
 |-- data: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我的目标是处理包含 json 数据的数据列并将其展平。我写了以下代码

val schemaTotal = new StructType (
Array (StructField("id",StringType,false),StructField("device_id",StringType),StructField("group_id",LongType), StructField("group",StringType),StructField("profile_id",IntegerType),StructField("profile",StringType),StructField("type",StringType),StructField("timestamp",StringType),
StructField("count",StringType),
StructField("payload",new StructType ()
.add("timestamp",StringType)
.add("data",new ArrayType (new StructType().add("battery",LongType).add("temperature",LongType),false))),
StructField("payload_encrypted",StringType),
StructField("payload_cleartext",StringType),
StructField("device_properties", new ArrayType (new StructType().add("appeui",StringType).add("deveui",StringType).add("external_id",StringType).add("no_de_serie_eolane",LongType).add("no_emballage",StringType).add("product_version",StringType),false)),
StructField("protocol_data", new ArrayType (new StructType().add("AppNonce",StringType).add("DevAddr",StringType).add("DevNonce",StringType).add("NetID",LongType).add("best_gateway_id",StringType).add("gateways",IntegerType).add("lora_version",IntegerType).add("noise",LongType).add("port",IntegerType).add("rssi",DoubleType).add("sf",IntegerType).add("signal",DoubleType).add("snr",DoubleType),false)),
StructField("lat",StringType),
StructField("lng",StringType),
StructField("geolocation_type",StringType),
StructField("geolocation_precision",StringType),
StructField("delivered_at",StringType)))


val dataframe_extract=message.select($"Id",
$"publishTime", …
Run Code Online (Sandbox Code Playgroud)

json apache-spark-sql

3
推荐指数
1
解决办法
3842
查看次数

否则数据帧火花的情况

我写了这个:

val result = df.withColumn("Ind", when($"color" === "Green", 1).otherwise(0))
Run Code Online (Sandbox Code Playgroud)

我想将条件扩展$"color" === "Green"$"color" in ["GREEN", "RED", "YELLOW"]

知道该怎么做吗?

scala apache-spark

2
推荐指数
1
解决办法
2381
查看次数

在Scala函数中按名称调用参数

我对这两个功能之间的区别有疑问:

def getFunction(checkpointPath: String,
                  sparkConf: SparkConf,
                  creatingFunc: () => StreamingContext): StreamingContext = {
    function body
  }


def getFunction(checkpointPath: String,
                  sparkConf: SparkConf,
                  creatingFunc:  => StreamingContext): StreamingContext = {
    function body
  }
Run Code Online (Sandbox Code Playgroud)

因此,按名称调用的参数是相同的:

creatingFunc:  => StreamingContext 
Run Code Online (Sandbox Code Playgroud)

creatingFunc: () => StreamingContext
Run Code Online (Sandbox Code Playgroud)

或者没有 ?

scala

2
推荐指数
1
解决办法
55
查看次数

带有 Spark 流的多个 writeStream

我正在使用 Spark Streaming,在尝试实现多个写入流时遇到一些问题。下面是我的代码

DataWriter.writeStreamer(firstTableData,"parquet",CheckPointConf.firstCheckPoint,OutputConf.firstDataOutput)
DataWriter.writeStreamer(secondTableData,"parquet",CheckPointConf.secondCheckPoint,OutputConf.secondDataOutput)
DataWriter.writeStreamer(thirdTableData,"parquet", CheckPointConf.thirdCheckPoint,OutputConf.thirdDataOutput)
Run Code Online (Sandbox Code Playgroud)

其中 writeStreamer 定义如下:

def writeStreamer(input: DataFrame, checkPointFolder: String, output: String) = {

  val query = input
                .writeStream
                .format("orc")
                .option("checkpointLocation", checkPointFolder)
                .option("path", output)
                .outputMode(OutputMode.Append)
                .start()

  query.awaitTermination()
}
Run Code Online (Sandbox Code Playgroud)

我面临的问题是只有第一个表是用 Spark writeStream 写入的,所有其他表都没有发生任何情况。请问您对此有什么想法吗?

apache-spark spark-structured-streaming

1
推荐指数
1
解决办法
6324
查看次数

从scala中的String计算日期

您好我想使用LocalDateTime JAVA API将这些类型的字符串转换为日期.下面是输入字符串:"1672017",我想根据2017年的数字为167的日期转换为相应的日期.

请知道如何做到这一点

java datetime scala date java-8

1
推荐指数
1
解决办法
77
查看次数

使用intellij在本地运行spark

我写了这个:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object ProcessingApp extends App {
  val sparkConf = new SparkConf()
    .setAppName("er")
    .setMaster("local")
  val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  val test = sparkSession.version

  println(test)

}
Run Code Online (Sandbox Code Playgroud)

我想通过右键单击运行 ProcessingApp 在我的 Intellij IDE 本地运行它,但这不起作用,我没有在 build.sbt 文件级别提供我的 spark 依赖项。我收到此错误:

Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

1
推荐指数
1
解决办法
2437
查看次数

改善scala状况

我这样写:

if (fork == "0" || fork == "1" || fork == "3" || fork == "null"  ) {
 list2 :: List(
    Wrapper(
      Location.PL_TYPES,
      subType,
      daFuncId,
      NA,
      name,
      code)
  )
}
else list2  :: List(
  Wrapper(
    Location.PL_TYPES,
    subType,
    NA,
    NA,
    name,
    code
  )
)

}
Run Code Online (Sandbox Code Playgroud)

我想通过将if else替换为另一种模式来改善这一点

最好的祝福

scala

0
推荐指数
1
解决办法
80
查看次数