I am facing the following error:
I wrote an application based on Spark Streaming (DStream) to pull messages from PubSub. Unfortunately, I am running into errors while running this job. I am using a cluster of 4 nodes to run the Spark job.
After the job has been running for 10 minutes without any particular error, I keep getting the following error:
ERROR org.apache.spark.streaming.CheckpointWriter:
Could not submit checkpoint task to the thread pool executor
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@68395dc9
rejected from java.util.concurrent.ThreadPoolExecutor@1a1acc25
[Running, pool size = 1, active threads = 1, queued tasks = 1000, completed tasks = 412]
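For context, a minimal sketch of how checkpointing is typically wired into such a DStream job (the app name, directory, and intervals below are assumptions, not details from the job above). The error itself shows a single-threaded writer with a bounded queue (pool size = 1, queued tasks = 1000), so checkpoints are being produced faster than they can be written; spacing checkpoints further apart is one common mitigation:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumed names and intervals, for illustration only.
val conf = new SparkConf().setAppName("pubsub-ingest")
val ssc = new StreamingContext(conf, Seconds(10)) // 10s batch interval (assumed)
ssc.checkpoint("hdfs:///tmp/checkpoints")         // checkpoint directory (assumed)

// For stateful streams, DStream.checkpoint can stretch the data-checkpoint
// interval (the Spark docs suggest 5-10x the batch interval), giving the
// single CheckpointWriter thread time to drain its queue:
// stream.checkpoint(Seconds(50))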
I ingest data into HDFS every day. From this data I generate Hive external tables partitioned by date. My question is the following: should I run MSCK REPAIR TABLE tablename after every ingestion, in which case I would have to run the command every day? Or is running it once at table creation enough? Thank you very much for your answer.
Regards
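For reference, a hedged sketch of both options as run from Spark SQL (the database, table, and partition values are made up). Because the Hive metastore only knows about partitions that have been registered, new date directories written by a daily ingestion stay invisible until one of these is run, which is why registration is typically repeated after each ingestion rather than done once at table creation:

// Option 1: rescan the table location and register any new partitions.
spark.sql("MSCK REPAIR TABLE mydb.events")

// Option 2: register only the partition just ingested; cheaper than a
// full scan on tables with many partitions.
spark.sql("ALTER TABLE mydb.events ADD IF NOT EXISTS PARTITION (dt = '2018-08-20')")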
I load a parquet file into a Spark dataframe as follows:
val message= spark.read.parquet("gs://defenault-zdtt-devde/pubsub/part-00001-e9f8c58f-7de0-4537-a7be-a9a8556sede04a-c000.snappy.parquet")
When I run collect on my dataframe, I get the following result:
message.collect()
Array[org.apache.spark.sql.Row] = Array([118738748835150,2018-08-20T17:44:38.742Z,{"id":"uplink-3130-85bc","device_id":60517119992794222,"group_id":69,"group":"box-2478-2555","profile_id":3,"profile":"eolane-movee","type":"uplink","timestamp":"2018-08-20T17:44:37.048Z","count":3130,"payload":[{"timestamp":"2018-08-20T17:44:37.048Z","data":{"battery":3.5975599999999996,"temperature":27}}],"payload_encrypted":"9da25e36","payload_cleartext":"fe1b01aa","device_properties":{"appeui":"7ca97df000001190","deveui":"7ca97d0000001bb0","external_id":"Product: 3.7 / HW: 3.1 / SW: 1.8.8","no_de_serie_eolane":"4904","no_emballage":"S02066","product_version":"1.3.1"},"protocol_data":{"AppNonce":"e820ef","DevAddr":"0e6c5fda","DevNonce":"85bc","NetID":"000007","best_gateway_id":"M40246","gateway.
The schema of this dataframe is:
message.printSchema()
root
|-- Id: string (nullable = true)
|-- publishTime: string (nullable = true)
|-- data: string (nullable = true)
My goal is to process the data column, which holds JSON, and flatten it. I wrote the following code:
val schemaTotal = new StructType(Array(
  StructField("id", StringType, false),
  StructField("device_id", StringType),
  StructField("group_id", LongType),
  StructField("group", StringType),
  StructField("profile_id", IntegerType),
  StructField("profile", StringType),
  StructField("type", StringType),
  StructField("timestamp", StringType),
  StructField("count", StringType),
  StructField("payload", new StructType()
    .add("timestamp", StringType)
    .add("data", new ArrayType(new StructType().add("battery", LongType).add("temperature", LongType), false))),
  StructField("payload_encrypted", StringType),
  StructField("payload_cleartext", StringType),
  StructField("device_properties", new ArrayType(new StructType().add("appeui", StringType).add("deveui", StringType).add("external_id", StringType).add("no_de_serie_eolane", LongType).add("no_emballage", StringType).add("product_version", StringType), false)),
  StructField("protocol_data", new ArrayType(new StructType().add("AppNonce", StringType).add("DevAddr", StringType).add("DevNonce", StringType).add("NetID", LongType).add("best_gateway_id", StringType).add("gateways", IntegerType).add("lora_version", IntegerType).add("noise", LongType).add("port", IntegerType).add("rssi", DoubleType).add("sf", IntegerType).add("signal", DoubleType).add("snr", DoubleType), false)),
  StructField("lat", StringType),
  StructField("lng", StringType),
  StructField("geolocation_type", StringType),
  StructField("geolocation_precision", StringType),
  StructField("delivered_at", StringType)))
val dataframe_extract = message.select($"Id",
  $"publishTime", …
I wrote this:

val result = df.withColumn("Ind", when($"color" === "Green", 1).otherwise(0))
I would like to extend the condition $"color" === "Green" to $"color" in ["GREEN", "RED", "YELLOW"].
Any idea how to do this?
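One way to do this is Column.isin; a minimal sketch below (the upper() normalization is my own addition, since the original compares against "Green" while the target list is uppercase):

import org.apache.spark.sql.functions.{upper, when}

// isin builds an IN predicate over the listed values; upper() guards
// against the mixed casing ("Green" vs "GREEN") seen in the question.
val result = df.withColumn(
  "Ind",
  when(upper($"color").isin("GREEN", "RED", "YELLOW"), 1).otherwise(0))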
I have a question about the difference between these two functions:
def getFunction(checkpointPath: String,
                sparkConf: SparkConf,
                creatingFunc: () => StreamingContext): StreamingContext = {
  // function body
}

def getFunction(checkpointPath: String,
                sparkConf: SparkConf,
                creatingFunc: => StreamingContext): StreamingContext = {
  // function body
}
So, is the call-by-name parameter:
creatingFunc: => StreamingContext
the same as
creatingFunc: () => StreamingContext
or not?
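They are not the same, although both defer evaluation. A minimal sketch of the difference (the names are mine): => StreamingContext is a by-name parameter, call-site sugar that re-evaluates the expression at every reference and cannot be stored as a value, while () => StreamingContext is an ordinary function value that must be applied explicitly and can be saved, forwarded, or put in a collection.

// By-name: the caller writes a bare expression; every reference to s
// inside the body re-evaluates that expression.
def twiceByName(s: => String): String = s + s

// Function value: a first-class () => String, applied explicitly with f().
def twiceFunction(f: () => String): String = f() + f()

twiceByName { println("built"); "a" }          // prints "built" twice
twiceFunction(() => { println("built"); "a" }) // same effect, but the caller
                                               // must pass a function literal

// Only the function version is a value that can be stored or forwarded:
val saved: () => String = () => "a"
twiceFunction(saved)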
I am using Spark Streaming and I am running into some problems when trying to implement multiple write streams. Below is my code:
DataWriter.writeStreamer(firstTableData,"parquet",CheckPointConf.firstCheckPoint,OutputConf.firstDataOutput)
DataWriter.writeStreamer(secondTableData,"parquet",CheckPointConf.secondCheckPoint,OutputConf.secondDataOutput)
DataWriter.writeStreamer(thirdTableData,"parquet", CheckPointConf.thirdCheckPoint,OutputConf.thirdDataOutput)
where writeStreamer is defined as follows:
def writeStreamer(input: DataFrame, checkPointFolder: String, output: String) = {
  val query = input
    .writeStream
    .format("orc")
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()
  query.awaitTermination()
}
The problem I am facing is that only the first table is written by the Spark writeStream; nothing happens for any of the other tables. Do you have any idea about this, please?
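A hedged sketch of the likely fix: query.awaitTermination() inside writeStreamer blocks forever on the first query, so the second and third calls are never reached. Starting all queries first and blocking once at the end (here via the session's StreamingQueryManager) avoids this. The calls below drop the "parquet" argument, since the posted definition takes three parameters and hard-codes the orc format, and sparkSession is assumed to be in scope:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}

// Return the running query instead of blocking inside the helper.
def writeStreamer(input: DataFrame, checkPointFolder: String, output: String): StreamingQuery =
  input.writeStream
    .format("orc")
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()

writeStreamer(firstTableData, CheckPointConf.firstCheckPoint, OutputConf.firstDataOutput)
writeStreamer(secondTableData, CheckPointConf.secondCheckPoint, OutputConf.secondDataOutput)
writeStreamer(thirdTableData, CheckPointConf.thirdCheckPoint, OutputConf.thirdDataOutput)

// Block only after every stream has been started.
sparkSession.streams.awaitAnyTermination()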
Hello, I want to convert strings of this type into dates using the LocalDateTime Java API. Here is an input string: "1672017". I want to convert it into the corresponding date, where 167 is the day number within the year 2017.
Please let me know how to do this.
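A hedged sketch using java.time (written in Scala, like the rest of this page): the question mentions LocalDateTime, but since only a date is involved, LocalDate.ofYearDay is the natural fit; splitting "1672017" into day-of-year 167 and year 2017 follows the question's own reading:

import java.time.LocalDate

// "1672017" = day-of-year 167 followed by the year 2017 (per the question).
val input = "1672017"
val dayOfYear = input.dropRight(4).toInt // 167
val year = input.takeRight(4).toInt      // 2017

val date = LocalDate.ofYearDay(year, dayOfYear)
println(date) // 2017-06-16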
I wrote this:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object ProcessingApp extends App {
  val sparkConf = new SparkConf()
    .setAppName("er")
    .setMaster("local")
  val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
  val test = sparkSession.version
  println(test)
}
I want to run it locally in my IntelliJ IDE by right-clicking and running ProcessingApp, but this does not work; I have not provided my Spark dependencies at the build.sbt file level. I get this error:
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass
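A hedged sketch of the usual cause and fix: this ClassNotFoundException means no Spark classes are on the IDE's runtime classpath, which happens when the Spark artifacts are missing from build.sbt or are declared with "provided" scope. A minimal build.sbt that lets the right-click run work (the versions are assumptions):

// build.sbt -- minimal sketch; Scala and Spark versions are assumptions.
name := "processing-app"
scalaVersion := "2.11.12"

// For an IDE run the dependency must be on the compile/runtime classpath;
// marking it "provided" here would reproduce the ClassNotFoundException.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"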
I wrote this:
if (fork == "0" || fork == "1" || fork == "3" || fork == "null") {
  list2 :: List(
    Wrapper(
      Location.PL_TYPES,
      subType,
      daFuncId,
      NA,
      name,
      code)
  )
} else list2 :: List(
  Wrapper(
    Location.PL_TYPES,
    subType,
    NA,
    NA,
    name,
    code
  )
)
}
I would like to improve this by replacing the if/else with another pattern; see the sketch below.
Best regards
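A hedged sketch of one such refactoring (using fork, list2, and the Wrapper fields from the snippet above): the two branches differ only in the third Wrapper argument, so computing that difference up front removes the duplication:

// The branches build the same Wrapper except for the third field,
// so compute only that difference conditionally.
val funcId = if (Set("0", "1", "3", "null").contains(fork)) daFuncId else NA

list2 :: List(
  Wrapper(
    Location.PL_TYPES,
    subType,
    funcId,
    NA,
    name,
    code))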