标签: apache-spark-2.0

Spark流的动态分配

我有一个Spark Streaming作业与其他作业(Spark核心作业)一起在我们的集群上运行。我想对包括Spark Streaming在内的这些作业使用动态资源分配。根据下面的JIRA问题,不支持动态分配Spark Streaming(在1.6.1版本中)。但已在2.0.0中修复

JIRA链接

根据本期的PDF,它说应该有一个名为“ spark.streaming.dynamicAllocation.enabled=true 但是我在文档中没有看到此配置” 的配置字段 。

任何人都可以确认,

  1. 我无法在1.6.1版本中为Spark Streaming启用动态资源分配。
  2. 它在Spark 2.0.0中可用吗?如果是,应设置什么配置(spark.streaming.dynamicAllocation.enabled=truespark.dynamicAllocation.enabled=true

dynamic-allocation apache-spark spark-streaming apache-spark-1.6 apache-spark-2.0

6
推荐指数
1
解决办法
2735
查看次数

如何在spark(scala)中将WrappedArray [WrappedArray [Float]]转换为Array [Array [Float]]

我使用Spark 2.0.我的数据WrappedArray框列中包含一个Florap的WrappedArrays.

一行的示例是:

[[1.0 2.0 2.0][6.0 5.0 2.0][4.0 2.0 3.0]]
Run Code Online (Sandbox Code Playgroud)

我试图将这个专栏改造成一个Array[Array[Float]].

到目前为止我尝试的是以下内容:

dataframe.select("mycolumn").rdd.map(r => r.asInstanceOf[Array[Array[Float]]])
Run Code Online (Sandbox Code Playgroud)

但是我收到以下错误:

Caused by: java.lang.ClassCastException:
 org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to [[F
Run Code Online (Sandbox Code Playgroud)

任何想法都将受到高度赞赏.谢谢

arrays casting scala spark-dataframe apache-spark-2.0

6
推荐指数
1
解决办法
9719
查看次数

Spark Streaming - 已停止的工作程序抛出FileNotFoundException

我在由三个节点组成的集群上运行一个火花流应用程序,每个节点都有一个worker和三个执行器(所以总共有9个执行器).我正在使用spark独立模式(版本2.1.1).

应用程序使用带选项--deploy-mode client和的spark-submit命令运行--conf spark.streaming.stopGracefullyOnShutdown=true.submit命令从其中一个节点运行,我们称之为节点1.

作为容错测试,我通过调用脚本来停止节点2上的worker stop-slave.sh.

在节点2上的执行程序日志中,我可以看到在shuffle操作期间与FileNotFoundException相关的几个错误:

ERROR Executor: Exception in task 5.0 in stage 5531241.0 (TID 62488319)
java.io.FileNotFoundException: /opt/spark/spark-31c5b4b0-56e1-45d2-88dc-772b8712833f/executor-0bad0669-57fe-43f9-a77e-1b69cd284523/blockmgr-2aa295ac-78ca-4df6-ab89-51d422e8860e/1c/shuffle_2074211_5_0.index.ecb8e397-c3a3-4c1a-96ba-e153ed92b05c (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:206)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:156)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

我可以在节点2上的3个执行程序中的每个执行程序中看到同一任务中的4种此类错误.

在驱动程序日志中我可以看到:

ERROR TaskSetManager: Task 5 in stage 5531241.0 failed 4 times; aborting job
 ...
ERROR JobScheduler: Error running job streaming job 1503995015000 ms.1
org.apache.spark.SparkException: …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming apache-spark-standalone apache-spark-2.0

6
推荐指数
0
解决办法
459
查看次数

java.lang.IllegalStateException:读取增量文件时出错,使用 kafka 进行 Spark 结构化流处理

我在我们的项目中使用结构化流+ Kafka 进行实时数据分析。我使用的是 Spark 2.2,kafka 0.10.2。

我在应用程序启动时从检查点进行流式查询恢复期间遇到问题。由于有多个流查询源自单个 kafka 流点,并且每个流查询都有不同的 checkpint 目录。因此,如果作业失败,当我们重新启动作业时,会有一些流查询无法从检查点位置恢复,因此会抛出 Error Reading Delta file异常。这是日志:


Job aborted due to stage failure: Task 2 in stage 13.0 failed 4 times, most recent failure: Lost task 2.3 in stage 13.0 (TID 831, ip-172-31-10-246.us-west-2.compute.internal, executor 3): java.lang.IllegalStateException: Error reading delta file /checkpointing/wifiHealthPerUserPerMinute/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /checkpointing/wifiHealthPerUserPerMinute/state/0/2]: /checkpointing/wifiHealthPerUserPerMinute/state/0/2/1.delta does not exist
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:410)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:362)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:359)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:359)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:358)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:358)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:360) …
Run Code Online (Sandbox Code Playgroud)

checkpoint apache-kafka apache-spark-2.0 spark-structured-streaming

6
推荐指数
1
解决办法
4021
查看次数

jsontostructs to spark in spark结构化流媒体

我正在使用Spark 2.2,我正在尝试从Kafka读取JSON消息,将它们转换为DataFrame并将它们作为Row:

spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic")
    .load()
    .select(col("value").cast(StringType).as("col"))
    .writeStream()
    .format("console")
    .start();
Run Code Online (Sandbox Code Playgroud)

有了这个,我可以实现:

+--------------------+
|                 col|
+--------------------+
|{"myField":"somet...|
+--------------------+
Run Code Online (Sandbox Code Playgroud)

我想要更像这样的东西:

+--------------------+
|             myField|
+--------------------+
|"something"         |
+--------------------+
Run Code Online (Sandbox Code Playgroud)

我尝试使用from_json函数struct:

DataTypes.createStructType(
    new StructField[] {
            DataTypes.createStructField("myField", DataTypes.StringType)
    }
)
Run Code Online (Sandbox Code Playgroud)

但我只得到:

+--------------------+
|  jsontostructs(col)|
+--------------------+
|[something]         |
+--------------------+
Run Code Online (Sandbox Code Playgroud)

然后我尝试使用,explode但我只有Exception说:

cannot resolve 'explode(`col`)' due to data type mismatch: 
input to function explode should be array or map type, not 
StructType(StructField(...
Run Code Online (Sandbox Code Playgroud)

知道如何使这项工作?

java apache-spark apache-spark-sql apache-spark-2.0 spark-structured-streaming

6
推荐指数
1
解决办法
1633
查看次数

如何在其整个生命周期中加入另一个流收集的所有数据的火花实时流?

我有两个火花流,第一个是与产品相关的数据:它们对供应商的价格,货币,它们的描述,供应商ID.这些数据由类别丰富,通过对描述的分析和以美元计算的价格来猜测.然后将它们保存在镶木地板数据集中.

第二个流包含拍卖这些产品的数据,然后是销售成本和日期.

鉴于产品今天可以到达第一个流并且在一年内出售,我如何将第二个流加入第一个流的镶木地板数据集中包含的所有历史记录?

结果应该是每个价格范围的平均每日收益......

apache-spark amazon-kinesis spark-streaming pyspark apache-spark-2.0

6
推荐指数
1
解决办法
396
查看次数

Spark Executor 在将数据帧写入镶木地板时性能低下

Spark 版本:2.3 hadoop 发行版:azure Hdinsight 2.6.5 平台:Azure 存储:BLOB

集群中的节点:6 个执行程序实例:每个执行程序 6 个核心:3 每个执行程序内存:8GB

尝试通过同一存储帐户上的 Spark 数据帧将 azure blob (wasb) 中的 csv 文件(大小 4.5g - 280 col,280 万行)加载为 parquet 格式。我已将文件重新分区为不同大小,即 20、40、60、100,但遇到一个奇怪的问题,即处理非常小的记录子集(< 1%)的 6 个执行程序中的 2 个继续运行 1 小时左右并最终完成。

问题 :

1) 这 2 个执行程序正在处理的分区需要处理的记录最少(少于 1%),但需要近一个小时才能完成。这是什么原因。这与数据倾斜场景相反吗?

2) 运行这些执行程序的节点上的本地缓存文件夹已被填满 (50-60GB)。不确定这背后的原因。

3) 增加分区确实会将整体执行时间降低到 40 分钟,但只想知道这 2 个执行器的低执行时间背后的原因。

Spark 新手,因此期待一些调整此工作负载的指导。附加来自 Spark WebUi 的附加信息。

performance apache-spark parquet apache-spark-sql apache-spark-2.0

6
推荐指数
1
解决办法
1379
查看次数

任务仅在 Spark 中的一个执行器上运行

我正在使用 Java 在 Spark 中运行以下代码。

代码

测试.java

package com.sample;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.storage.StorageLevel;

import com.addition.AddTwoNumbers;

public class Test{

    private static final String APP_NAME = "Test";
    private static final String LOCAL = "local";
    private static final String MASTER_IP = "spark://10.180.181.26:7077";

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName(APP_NAME).setMaster(MASTER_IP);
        String connection = "jdbc:oracle:thin:test/test@//xyz00aie.in.oracle.com:1521/PDX2600N";
        // Create Spark Context
        SparkContext context = new SparkContext(conf);
        // Create Spark Session

        SparkSession sparkSession = …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-2.0 spark-submit

6
推荐指数
1
解决办法
1万
查看次数

Apache Spark 2.2:当您已经缓存要广播的数据帧时,广播连接不起作用

我有多个大数据帧(大约 30GB)称为 as 和 bs,一个相对较小的数据帧(大约 500MB ~ 1GB)称为 spp。我试图将 spp 缓存到内存中,以避免多次从数据库或文件中读取数据。

但是我发现如果我缓存 spp,物理计划显示它不会使用广播连接,即使 spp 被广播功能包围。但是,如果我取消持久化 spp,计划会显示它使用广播连接。

有熟悉这个的吗?

scala> spp.cache
res38: spp.type = [id: bigint, idPartner: int ... 41 more fields]

scala> val as = acs.join(broadcast(spp), $"idsegment" === $"idAdnetProductSegment")
as: org.apache.spark.sql.DataFrame = [idsegmentpartner: bigint, ssegmentsource: string ... 44 more fields]

scala> as.explain
== Physical Plan ==
*SortMergeJoin [idsegment#286L], [idAdnetProductSegment#91L], Inner
:- *Sort [idsegment#286L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(idsegment#286L, 200)
:     +- *Filter isnotnull(idsegment#286L)
:        +- HiveTableScan [idsegmentpartner#282L, …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0

6
推荐指数
1
解决办法
1637
查看次数

由于 int 和 bigint 数据类型不兼容,读取 parquet 文件时合并架构失败

尝试使用架构合并加载镶木地板文件时

   df = spark.read.option("mergeSchema", "true").parquet('some_path/partition_date')
   df.show()
Run Code Online (Sandbox Code Playgroud)

我收到以下异常:

    Py4JJavaError: An error occurred while calling o421.parquet.
: org.apache.spark.SparkException: Failed merging schema:
root
....
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$mergeSchemasInParallel$1.apply(ParquetFileFormat.scala:643)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$mergeSchemasInParallel$1.apply(ParquetFileFormat.scala:639)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:639)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:241)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:179)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:643)
    at sun.reflect.GeneratedMethodAccessor137.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to merge fields 'some_field_name' and 'some_field_name'. …
Run Code Online (Sandbox Code Playgroud)

python apache-spark parquet pyspark apache-spark-2.0

6
推荐指数
1
解决办法
4267
查看次数