标签: apache-spark-2.0

如何在Spark 2中的数据框中允许接受无值?

我有一个类似的架构 -

val schema = StructType( Seq (
StructField( "a", StringType, true),
StructField( "b", StringType, true),
StructField( "c", StringType, true)
))

and making a dataframe like - 

import scala.collection.JavaConverters._

val listrow: List[Row] = List(Row("E-001", "P-001", None), Row("E-001", "P-001", "Attending"))
val rdd = sqlContext.sparkContext.parallelize(listrow)
val df = sqlContext.createDataFrame(rdd, scm)
Run Code Online (Sandbox Code Playgroud)

现在,当我执行df.first()时,我得到一个错误,其实质是 - java.lang.RuntimeException:编码时出错:java.lang.RuntimeException:scala.None $不是有效的外部类型字符串的模式

请注意,我只需要以这种方式创建df.以上只是一个例子,但生产中的代码有点复杂.我认为它曾经在Spark 1.6中工作但在Spark 2.0.1中开始失败.这与编码器有关吗?请注意,某些数据将始终为None,df需要处理它.有没有办法处理这个或数据不能没有?

这里的任何人都可以看到这个并且可能知道解决方案是什么吗?

感谢期待!

scala apache-spark spark-dataframe apache-spark-2.0

2
推荐指数
1
解决办法
1158
查看次数

从多线程驱动程序启动Apache Spark SQL作业

我想使用Spark从约1500个远程Oracle表中提取数据,并且我想要一个多线程应用程序,该应用程序每个线程选择一个表,或者每个线程选择10个表,并启动一个Spark作业以从各自的表中读取数据。

从官方Spark网站https://spark.apache.org/docs/latest/job-scheduling.html来看,很明显它可以工作...

...运行Spark的集群管理器为跨应用程序调度提供了便利。其次,在每个Spark应用程序中,如果多个“作业”(Spark操作)是由不同的线程提交的,则它们可能同时运行。如果您的应用程序通过网络处理请求,则这很常见。Spark包含一个公平的调度程序,用于调度每个SparkContext中的资源。

但是,您可能已经在Spark中的此类SO 并发作业执行中注意到,该相似问题没有被接受的答案,而最受支持的答案始于

这实际上不是Spark的精神

  1. 每个人都知道这不是Spark的“精神”
  2. 谁在乎Spark的精神是什么?这实际上没有任何意义

有人以前有这样的东西可以工作吗?你有什么特别的事吗?在我浪费大量工作时间进行原型设计之前,只想提供一些建议。我真的很感谢任何帮助!

java multithreading scala apache-spark apache-spark-2.0

2
推荐指数
2
解决办法
2307
查看次数

Spark 2.0.1 java.lang.NegativeArraySizeException

我开始玩Spark 2.0.1了.新的数据集API非常干净,但我遇到了非常简单的操作问题.

也许我错过了什么,希望有人可以提供帮助.

这些说明

SparkConf conf = new SparkConf().setAppName("myapp").setMaster("local[*]");
SparkSession spark = SparkSession
        .builder()
        .config(conf)
        .getOrCreate();

Dataset<Info> infos = spark.read().json("data.json").as(Encoders.bean(Info.class));

System.out.println(infos.rdd().count());
Run Code Online (Sandbox Code Playgroud)

产生一个

 java.lang.NegativeArraySizeException
Run Code Online (Sandbox Code Playgroud)

和JVM(1.8)检测到的致命错误.

使用数据集api处理数据(即,选择,依靠信息对象)可以正常工作.

如何在数据集和RDD之间切换?

java apache-spark apache-spark-2.0

1
推荐指数
1
解决办法
3007
查看次数

预构建 Spark 2.1.0 在启动 spark-shell 时创建 metastore_db 文件夹和 derby.log

我刚刚从 Spark 2.0.2 升级到 Spark 2.1.0(通过下载 Hadoop 2.7 及更高版本的预构建版本)。没有安装 Hive。

spark-shell 启动后,会在启动位置创建metastore_db/文件夹和derby.log文件,以及一堆警告日志(在以前的版本中没有打印出来)。

仔细检查调试日志显示 Spark 2.1.0 尝试初始化 a HiveMetastoreConnection

17/01/13 09:14:44 INFO HiveUtils: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.

Spark 2.0.2 的类似调试日志没有显示HiveMetastoreConnection.

这是预期的行为吗?是否与spark.sql.warehouse.dir现在会话之间共享的静态配置有关?我如何避免这种情况,因为我没有安装 Hive?

提前致谢!

apache-spark apache-spark-2.0

1
推荐指数
2
解决办法
5836
查看次数

Spark 2.0套装罐子

我在play-scala应用程序中从1.6升级到spark 2.0,我不太确定如何设置我想要的jar文件.以前会定义一个SparkConf,我可以调用的方法之一是setJars,它允许我指定我想要的所有jar文件.现在我使用SparkSession构建器构建我的spark conf和spark上下文,我没有看到任何类似的方法来指定jar文件?我怎样才能做到这一点?

以下是我之前创建sparkconf的方法:

val sparkConf = new SparkConf().setMaster(sparkMaster).setAppName(sparkAppName).
  set("spark.yarn.jar", "hdfs:///user/hadoop/spark-assembly-1.6.1-hadoop2.7.2.jar").
  set("spark.eventLog.dir", "hdfs:///var/log/spark/apps").
  set("spark.eventLog.enabled", "true").
  set("spark.executorEnv.JAVA_HOME", "/usr/lib/jvm/jre-1.8.0-openjdk").
  setJars(Seq(
    "ALL JAR FILES LISTED HERE"
  ))
Run Code Online (Sandbox Code Playgroud)

使用sparksession构建器完成与"setJars"相同的操作我能做些什么?

scala jar apache-spark apache-spark-2.0

1
推荐指数
1
解决办法
4793
查看次数

如何在Spark中编写有效的json

我需要编写有效的json,但spark允许一次写入单行,例如:

{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
{"name":"Michael", "address":{"city":null, "state":"California"}}
Run Code Online (Sandbox Code Playgroud)

以上Json无效。相反,我需要这个:

{
{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}},
{"name":"Michael", "address":{"city":null, "state":"California"}}
}
Run Code Online (Sandbox Code Playgroud)

如何在Java中实现?

json apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0

1
推荐指数
1
解决办法
2733
查看次数

在独立群集上运行spark时出错

我正在尝试在独立群集上运行简单的Spark代码.以下是代码:

from pyspark import SparkConf,SparkContext

if __name__ == "__main__":
    conf = SparkConf().setAppName("even-numbers").setMaster("spark://sumit-Inspiron-N5110:7077")
    sc = SparkContext(conf)
    inp = sc.parallelize([1,2,3,4,5])
    even = inp.filter(lambda x: (x % 2 == 0)).collect()
    for i in even:
        print(i)
Run Code Online (Sandbox Code Playgroud)

但是,我收到错误,指出"无法解析主URL":

py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Could not parse Master URL: '<pyspark.conf.SparkConf object at 0x7fb27e864850>'
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2760)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:501)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:236)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748) …
Run Code Online (Sandbox Code Playgroud)

pyspark apache-spark-2.0

1
推荐指数
1
解决办法
742
查看次数

在 Spark 的 where 子句中将多个条件作为字符串传递

我正在使用 DataFrame API 在 Spark 中编写以下代码。

val cond = "col("firstValue") >= 0.5 & col("secondValue") >= 0.5 & col("thirdValue") >= 0.5"
val Output1 = InputDF.where(cond)
Run Code Online (Sandbox Code Playgroud)

我将所有条件作为来自外部参数的字符串传递,但它会抛出一个解析错误,因为它cond应该是类型Column

例如:

col("firstValue") >= 0.5 & col("secondValue") >= 0.5 & col("thirdValue") >= 0.5
Run Code Online (Sandbox Code Playgroud)

由于我想动态传递多个条件,如何将 a 转换String为 a Column

编辑

有什么东西可以让我从外部读取条件列表 as Column,因为我没有找到任何可以使用 Scala 代码将 a 转换String为 a 的东西Column

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0

1
推荐指数
1
解决办法
8795
查看次数

如何同时使用两个功能对RDD条目进行排序?

我有一个 Spark RDD,我想以有组织的方式对其条目进行排序。假设条目是一个包含 3 个元素的元组(name,phonenumber,timestamp)。我想首先根据 的值对条目进行排序phonenumber,然后根据 的值进行timestamp排序,同时尊重而不是更改基于phonenumber. (所以timestamp只根据phonenumber排序重新排列)。是否有 Spark 函数来执行此操作?

(我在 Scala 中使用 Spark 2.x)

scala apache-spark rdd apache-spark-2.0

0
推荐指数
1
解决办法
572
查看次数

如何使用Spark 2屏蔽列?

我有一些表需要掩盖其某些列。每个表要屏蔽的列各不相同,我正在从application.conf文件中读取这些列。

例如,对于雇员表,如下所示

+----+------+-----+---------+
| id | name | age | address |
+----+------+-----+---------+
| 1  | abcd | 21  | India   |
+----+------+-----+---------+
| 2  | qazx | 42  | Germany |
+----+------+-----+---------+
Run Code Online (Sandbox Code Playgroud)

如果我们要屏蔽名称和年龄列,那么我将按顺序获取这些列。

val mask = Seq("name", "age")
Run Code Online (Sandbox Code Playgroud)

屏蔽后的期望值为:

+----+----------------+----------------+---------+
| id | name           | age            | address |
+----+----------------+----------------+---------+
| 1  | *** Masked *** | *** Masked *** | India   |
+----+----------------+----------------+---------+
| 2  | *** Masked *** | *** Masked *** | Germany | …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-2.0

0
推荐指数
1
解决办法
2873
查看次数