我有一个类似的架构 -
val schema = StructType( Seq (
StructField( "a", StringType, true),
StructField( "b", StringType, true),
StructField( "c", StringType, true)
))
and making a dataframe like -
import scala.collection.JavaConverters._
val listrow: List[Row] = List(Row("E-001", "P-001", None), Row("E-001", "P-001", "Attending"))
val rdd = sqlContext.sparkContext.parallelize(listrow)
val df = sqlContext.createDataFrame(rdd, scm)
Run Code Online (Sandbox Code Playgroud)
现在,当我执行df.first()时,我得到一个错误,其实质是 - java.lang.RuntimeException:编码时出错:java.lang.RuntimeException:scala.None $不是有效的外部类型字符串的模式
请注意,我只需要以这种方式创建df.以上只是一个例子,但生产中的代码有点复杂.我认为它曾经在Spark 1.6中工作但在Spark 2.0.1中开始失败.这与编码器有关吗?请注意,某些数据将始终为None,df需要处理它.有没有办法处理这个或数据不能没有?
这里的任何人都可以看到这个并且可能知道解决方案是什么吗?
感谢期待!
我想使用Spark从约1500个远程Oracle表中提取数据,并且我想要一个多线程应用程序,该应用程序每个线程选择一个表,或者每个线程选择10个表,并启动一个Spark作业以从各自的表中读取数据。
从官方Spark网站https://spark.apache.org/docs/latest/job-scheduling.html来看,很明显它可以工作...
...运行Spark的集群管理器为跨应用程序调度提供了便利。其次,在每个Spark应用程序中,如果多个“作业”(Spark操作)是由不同的线程提交的,则它们可能同时运行。如果您的应用程序通过网络处理请求,则这很常见。Spark包含一个公平的调度程序,用于调度每个SparkContext中的资源。
但是,您可能已经在Spark中的此类SO 并发作业执行中注意到,该相似问题没有被接受的答案,而最受支持的答案始于
这实际上不是Spark的精神
有人以前有这样的东西可以工作吗?你有什么特别的事吗?在我浪费大量工作时间进行原型设计之前,只想提供一些建议。我真的很感谢任何帮助!
我开始玩Spark 2.0.1了.新的数据集API非常干净,但我遇到了非常简单的操作问题.
也许我错过了什么,希望有人可以提供帮助.
这些说明
SparkConf conf = new SparkConf().setAppName("myapp").setMaster("local[*]");
SparkSession spark = SparkSession
.builder()
.config(conf)
.getOrCreate();
Dataset<Info> infos = spark.read().json("data.json").as(Encoders.bean(Info.class));
System.out.println(infos.rdd().count());
Run Code Online (Sandbox Code Playgroud)
产生一个
java.lang.NegativeArraySizeException
Run Code Online (Sandbox Code Playgroud)
和JVM(1.8)检测到的致命错误.
使用数据集api处理数据(即,选择,依靠信息对象)可以正常工作.
如何在数据集和RDD之间切换?
我刚刚从 Spark 2.0.2 升级到 Spark 2.1.0(通过下载 Hadoop 2.7 及更高版本的预构建版本)。没有安装 Hive。
spark-shell 启动后,会在启动位置创建metastore_db/
文件夹和derby.log
文件,以及一堆警告日志(在以前的版本中没有打印出来)。
仔细检查调试日志显示 Spark 2.1.0 尝试初始化 a HiveMetastoreConnection
:
17/01/13 09:14:44 INFO HiveUtils: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
Spark 2.0.2 的类似调试日志没有显示HiveMetastoreConnection
.
这是预期的行为吗?是否与spark.sql.warehouse.dir
现在会话之间共享的静态配置有关?我如何避免这种情况,因为我没有安装 Hive?
提前致谢!
我在play-scala应用程序中从1.6升级到spark 2.0,我不太确定如何设置我想要的jar文件.以前会定义一个SparkConf,我可以调用的方法之一是setJars,它允许我指定我想要的所有jar文件.现在我使用SparkSession构建器构建我的spark conf和spark上下文,我没有看到任何类似的方法来指定jar文件?我怎样才能做到这一点?
以下是我之前创建sparkconf的方法:
val sparkConf = new SparkConf().setMaster(sparkMaster).setAppName(sparkAppName).
set("spark.yarn.jar", "hdfs:///user/hadoop/spark-assembly-1.6.1-hadoop2.7.2.jar").
set("spark.eventLog.dir", "hdfs:///var/log/spark/apps").
set("spark.eventLog.enabled", "true").
set("spark.executorEnv.JAVA_HOME", "/usr/lib/jvm/jre-1.8.0-openjdk").
setJars(Seq(
"ALL JAR FILES LISTED HERE"
))
Run Code Online (Sandbox Code Playgroud)
使用sparksession构建器完成与"setJars"相同的操作我能做些什么?
我需要编写有效的json,但spark允许一次写入单行,例如:
{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
{"name":"Michael", "address":{"city":null, "state":"California"}}
Run Code Online (Sandbox Code Playgroud)
以上Json无效。相反,我需要这个:
{
{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}},
{"name":"Michael", "address":{"city":null, "state":"California"}}
}
Run Code Online (Sandbox Code Playgroud)
如何在Java中实现?
json apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0
我正在尝试在独立群集上运行简单的Spark代码.以下是代码:
from pyspark import SparkConf,SparkContext
if __name__ == "__main__":
conf = SparkConf().setAppName("even-numbers").setMaster("spark://sumit-Inspiron-N5110:7077")
sc = SparkContext(conf)
inp = sc.parallelize([1,2,3,4,5])
even = inp.filter(lambda x: (x % 2 == 0)).collect()
for i in even:
print(i)
Run Code Online (Sandbox Code Playgroud)
但是,我收到错误,指出"无法解析主URL":
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Could not parse Master URL: '<pyspark.conf.SparkConf object at 0x7fb27e864850>'
at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2760)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:501)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:236)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748) …
Run Code Online (Sandbox Code Playgroud) 我正在使用 DataFrame API 在 Spark 中编写以下代码。
val cond = "col("firstValue") >= 0.5 & col("secondValue") >= 0.5 & col("thirdValue") >= 0.5"
val Output1 = InputDF.where(cond)
Run Code Online (Sandbox Code Playgroud)
我将所有条件作为来自外部参数的字符串传递,但它会抛出一个解析错误,因为它cond
应该是类型Column
。
例如:
col("firstValue") >= 0.5 & col("secondValue") >= 0.5 & col("thirdValue") >= 0.5
Run Code Online (Sandbox Code Playgroud)
由于我想动态传递多个条件,如何将 a 转换String
为 a Column
?
有什么东西可以让我从外部读取条件列表 as Column
,因为我没有找到任何可以使用 Scala 代码将 a 转换String
为 a 的东西Column
。
scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0
我有一个 Spark RDD,我想以有组织的方式对其条目进行排序。假设条目是一个包含 3 个元素的元组(name,phonenumber,timestamp)
。我想首先根据 的值对条目进行排序phonenumber
,然后根据 的值进行timestamp
排序,同时尊重而不是更改基于phonenumber
. (所以timestamp
只根据phonenumber
排序重新排列)。是否有 Spark 函数来执行此操作?
(我在 Scala 中使用 Spark 2.x)
我有一些表需要掩盖其某些列。每个表要屏蔽的列各不相同,我正在从application.conf
文件中读取这些列。
例如,对于雇员表,如下所示
+----+------+-----+---------+
| id | name | age | address |
+----+------+-----+---------+
| 1 | abcd | 21 | India |
+----+------+-----+---------+
| 2 | qazx | 42 | Germany |
+----+------+-----+---------+
Run Code Online (Sandbox Code Playgroud)
如果我们要屏蔽名称和年龄列,那么我将按顺序获取这些列。
val mask = Seq("name", "age")
Run Code Online (Sandbox Code Playgroud)
屏蔽后的期望值为:
+----+----------------+----------------+---------+
| id | name | age | address |
+----+----------------+----------------+---------+
| 1 | *** Masked *** | *** Masked *** | India |
+----+----------------+----------------+---------+
| 2 | *** Masked *** | *** Masked *** | Germany | …
Run Code Online (Sandbox Code Playgroud)