我有一个Spark Streaming作业与其他作业(Spark核心作业)一起在我们的集群上运行。我想对包括Spark Streaming在内的这些作业使用动态资源分配。根据下面的JIRA问题,不支持动态分配Spark Streaming(在1.6.1版本中)。但已在2.0.0中修复
根据本期的PDF,它说应该有一个名为“ spark.streaming.dynamicAllocation.enabled=true
但是我在文档中没有看到此配置” 的配置字段
。
任何人都可以确认,
spark.streaming.dynamicAllocation.enabled=true
或spark.dynamicAllocation.enabled=true
)dynamic-allocation apache-spark spark-streaming apache-spark-1.6 apache-spark-2.0
我使用Spark 2.0.我的数据WrappedArray
框列中包含一个Florap的WrappedArrays.
一行的示例是:
[[1.0 2.0 2.0][6.0 5.0 2.0][4.0 2.0 3.0]]
Run Code Online (Sandbox Code Playgroud)
我试图将这个专栏改造成一个Array[Array[Float]]
.
到目前为止我尝试的是以下内容:
dataframe.select("mycolumn").rdd.map(r => r.asInstanceOf[Array[Array[Float]]])
Run Code Online (Sandbox Code Playgroud)
但是我收到以下错误:
Caused by: java.lang.ClassCastException:
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to [[F
Run Code Online (Sandbox Code Playgroud)
任何想法都将受到高度赞赏.谢谢
我在由三个节点组成的集群上运行一个火花流应用程序,每个节点都有一个worker和三个执行器(所以总共有9个执行器).我正在使用spark独立模式(版本2.1.1).
应用程序使用带选项--deploy-mode client
和的spark-submit命令运行--conf spark.streaming.stopGracefullyOnShutdown=true
.submit命令从其中一个节点运行,我们称之为节点1.
作为容错测试,我通过调用脚本来停止节点2上的worker stop-slave.sh
.
在节点2上的执行程序日志中,我可以看到在shuffle操作期间与FileNotFoundException相关的几个错误:
ERROR Executor: Exception in task 5.0 in stage 5531241.0 (TID 62488319)
java.io.FileNotFoundException: /opt/spark/spark-31c5b4b0-56e1-45d2-88dc-772b8712833f/executor-0bad0669-57fe-43f9-a77e-1b69cd284523/blockmgr-2aa295ac-78ca-4df6-ab89-51d422e8860e/1c/shuffle_2074211_5_0.index.ecb8e397-c3a3-4c1a-96ba-e153ed92b05c (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:206)
at java.io.FileOutputStream.<init>(FileOutputStream.java:156)
at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
我可以在节点2上的3个执行程序中的每个执行程序中看到同一任务中的4种此类错误.
在驱动程序日志中我可以看到:
ERROR TaskSetManager: Task 5 in stage 5531241.0 failed 4 times; aborting job
...
ERROR JobScheduler: Error running job streaming job 1503995015000 ms.1
org.apache.spark.SparkException: …
Run Code Online (Sandbox Code Playgroud) apache-spark spark-streaming apache-spark-standalone apache-spark-2.0
我在我们的项目中使用结构化流+ Kafka 进行实时数据分析。我使用的是 Spark 2.2,kafka 0.10.2。
我在应用程序启动时从检查点进行流式查询恢复期间遇到问题。由于有多个流查询源自单个 kafka 流点,并且每个流查询都有不同的 checkpint 目录。因此,如果作业失败,当我们重新启动作业时,会有一些流查询无法从检查点位置恢复,因此会抛出 Error Reading Delta file异常。这是日志:
Job aborted due to stage failure: Task 2 in stage 13.0 failed 4 times, most recent failure: Lost task 2.3 in stage 13.0 (TID 831, ip-172-31-10-246.us-west-2.compute.internal, executor 3): java.lang.IllegalStateException: Error reading delta file /checkpointing/wifiHealthPerUserPerMinute/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /checkpointing/wifiHealthPerUserPerMinute/state/0/2]: /checkpointing/wifiHealthPerUserPerMinute/state/0/2/1.delta does not exist
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:410)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:362)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:360) …
Run Code Online (Sandbox Code Playgroud) checkpoint apache-kafka apache-spark-2.0 spark-structured-streaming
我正在使用Spark 2.2,我正在尝试从Kafka读取JSON消息,将它们转换为DataFrame
并将它们作为Row
:
spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic")
.load()
.select(col("value").cast(StringType).as("col"))
.writeStream()
.format("console")
.start();
Run Code Online (Sandbox Code Playgroud)
有了这个,我可以实现:
+--------------------+
| col|
+--------------------+
|{"myField":"somet...|
+--------------------+
Run Code Online (Sandbox Code Playgroud)
我想要更像这样的东西:
+--------------------+
| myField|
+--------------------+
|"something" |
+--------------------+
Run Code Online (Sandbox Code Playgroud)
我尝试使用from_json
函数struct
:
DataTypes.createStructType(
new StructField[] {
DataTypes.createStructField("myField", DataTypes.StringType)
}
)
Run Code Online (Sandbox Code Playgroud)
但我只得到:
+--------------------+
| jsontostructs(col)|
+--------------------+
|[something] |
+--------------------+
Run Code Online (Sandbox Code Playgroud)
然后我尝试使用,explode
但我只有Exception说:
cannot resolve 'explode(`col`)' due to data type mismatch:
input to function explode should be array or map type, not
StructType(StructField(...
Run Code Online (Sandbox Code Playgroud)
知道如何使这项工作?
java apache-spark apache-spark-sql apache-spark-2.0 spark-structured-streaming
我有两个火花流,第一个是与产品相关的数据:它们对供应商的价格,货币,它们的描述,供应商ID.这些数据由类别丰富,通过对描述的分析和以美元计算的价格来猜测.然后将它们保存在镶木地板数据集中.
第二个流包含拍卖这些产品的数据,然后是销售成本和日期.
鉴于产品今天可以到达第一个流并且在一年内出售,我如何将第二个流加入第一个流的镶木地板数据集中包含的所有历史记录?
结果应该是每个价格范围的平均每日收益......
apache-spark amazon-kinesis spark-streaming pyspark apache-spark-2.0
Spark 版本:2.3 hadoop 发行版:azure Hdinsight 2.6.5 平台:Azure 存储:BLOB
集群中的节点:6 个执行程序实例:每个执行程序 6 个核心:3 每个执行程序内存:8GB
尝试通过同一存储帐户上的 Spark 数据帧将 azure blob (wasb) 中的 csv 文件(大小 4.5g - 280 col,280 万行)加载为 parquet 格式。我已将文件重新分区为不同大小,即 20、40、60、100,但遇到一个奇怪的问题,即处理非常小的记录子集(< 1%)的 6 个执行程序中的 2 个继续运行 1 小时左右并最终完成。
问题 :
1) 这 2 个执行程序正在处理的分区需要处理的记录最少(少于 1%),但需要近一个小时才能完成。这是什么原因。这与数据倾斜场景相反吗?
2) 运行这些执行程序的节点上的本地缓存文件夹已被填满 (50-60GB)。不确定这背后的原因。
3) 增加分区确实会将整体执行时间降低到 40 分钟,但只想知道这 2 个执行器的低执行时间背后的原因。
Spark 新手,因此期待一些调整此工作负载的指导。附加来自 Spark WebUi 的附加信息。
performance apache-spark parquet apache-spark-sql apache-spark-2.0
我正在使用 Java 在 Spark 中运行以下代码。
代码
测试.java
package com.sample;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.storage.StorageLevel;
import com.addition.AddTwoNumbers;
public class Test{
private static final String APP_NAME = "Test";
private static final String LOCAL = "local";
private static final String MASTER_IP = "spark://10.180.181.26:7077";
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName(APP_NAME).setMaster(MASTER_IP);
String connection = "jdbc:oracle:thin:test/test@//xyz00aie.in.oracle.com:1521/PDX2600N";
// Create Spark Context
SparkContext context = new SparkContext(conf);
// Create Spark Session
SparkSession sparkSession = …
Run Code Online (Sandbox Code Playgroud) 我有多个大数据帧(大约 30GB)称为 as 和 bs,一个相对较小的数据帧(大约 500MB ~ 1GB)称为 spp。我试图将 spp 缓存到内存中,以避免多次从数据库或文件中读取数据。
但是我发现如果我缓存 spp,物理计划显示它不会使用广播连接,即使 spp 被广播功能包围。但是,如果我取消持久化 spp,计划会显示它使用广播连接。
有熟悉这个的吗?
scala> spp.cache
res38: spp.type = [id: bigint, idPartner: int ... 41 more fields]
scala> val as = acs.join(broadcast(spp), $"idsegment" === $"idAdnetProductSegment")
as: org.apache.spark.sql.DataFrame = [idsegmentpartner: bigint, ssegmentsource: string ... 44 more fields]
scala> as.explain
== Physical Plan ==
*SortMergeJoin [idsegment#286L], [idAdnetProductSegment#91L], Inner
:- *Sort [idsegment#286L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(idsegment#286L, 200)
: +- *Filter isnotnull(idsegment#286L)
: +- HiveTableScan [idsegmentpartner#282L, …
Run Code Online (Sandbox Code Playgroud) apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0
尝试使用架构合并加载镶木地板文件时
df = spark.read.option("mergeSchema", "true").parquet('some_path/partition_date')
df.show()
Run Code Online (Sandbox Code Playgroud)
我收到以下异常:
Py4JJavaError: An error occurred while calling o421.parquet.
: org.apache.spark.SparkException: Failed merging schema:
root
....
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$mergeSchemasInParallel$1.apply(ParquetFileFormat.scala:643)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$mergeSchemasInParallel$1.apply(ParquetFileFormat.scala:639)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:639)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:241)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:179)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:643)
at sun.reflect.GeneratedMethodAccessor137.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to merge fields 'some_field_name' and 'some_field_name'. …
Run Code Online (Sandbox Code Playgroud) apache-spark-2.0 ×10
apache-spark ×8
parquet ×2
pyspark ×2
apache-kafka ×1
arrays ×1
casting ×1
checkpoint ×1
java ×1
performance ×1
python ×1
scala ×1
spark-submit ×1