我当前正在使用spark处理数据,并且foreach分区打开与mysql的连接,并将其以1000的批数插入到数据库中。如SparkDocumentation所述,默认值为spark.sql.shuffle.partitions
200,但我想保持其动态。因此,我该如何计算。因此,既不选择导致性能降低的非常高的值,也不选择导致性能降低的非常小的值OOM
。
是否有任何配置属性我们可以将其设置为在spark 1.6中通过spark-shell禁用/启用Hive支持.我试图获取所有sqlContext配置属性,
sqlContext.getAllConfs.foreach(println)
Run Code Online (Sandbox Code Playgroud)
但是,我不确定实际上需要哪个属性来禁用/启用配置单元支持.或者还有其他办法吗?
我正在使用spark version 1.6.3
并yarn version 2.7.1.2.3
附带HDP-2.3.0.0-2557
.因为我使用的HDP版本中的火花版本太旧了,我更喜欢远程使用另一个火花作为纱线模式.
这是我如何运行火花壳;
./spark-shell --master yarn-client
Run Code Online (Sandbox Code Playgroud)
一切似乎都很好,sparkContext
初始化,sqlContext
初始化.我甚至可以访问我的蜂巢表.但在某些情况下,当它尝试连接到块管理器时会遇到麻烦.
我不是专家,但我认为,当我在纱线模式下运行它时,块管理器正在我的纱线集群上运行.这对我来说似乎是一个网络问题,并且不想在这里问它.但是,在某些我无法弄清楚的情况下会发生这种情况.所以它让我觉得这可能不是网络问题.
这是代码;
def df = sqlContext.sql("select * from city_table")
Run Code Online (Sandbox Code Playgroud)
以下代码工作正常;
df.limit(10).count()
Run Code Online (Sandbox Code Playgroud)
但是大小超过10,我不知道,每次运行都会发生变化;
df.count()
Run Code Online (Sandbox Code Playgroud)
这引起了例外;
6/12/30 07:31:04 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 157 bytes
16/12/30 07:31:19 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 8, 172.27.247.204): FetchFailed(BlockManagerId(2, 172.27.247.204, 56093), shuffleId=2, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /172.27.247.204:56093
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51) …
Run Code Online (Sandbox Code Playgroud) 我有一个 PySpark df:
+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|
+---+---+---+---+---+---+---+---+
| 0| 1| 23| 4| 8| 9| 5| b1|
| 1| 2| 43| 8| 10| 20| 43| e1|
| 2| 3| 15| 0| 1| 23| 7| b1|
| 3| 4| 2| 6| 11| 5| 8| d1|
| 4| 5| 6| 7| 2| 8| 1| f1|
+---+---+---+---+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)
我最终想创建另一列“out”,其值基于“ref”列。例如,在第一行 ref 列中有 b1 作为值。在“out”列中,我希望看到“b1”列的值,即 23。这是预期的输出:
+---+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|out|
+---+---+---+---+---+---+---+---+---+
| 0| 1| 23| 4| …
Run Code Online (Sandbox Code Playgroud) apache-spark apache-spark-sql pyspark pyspark-sql apache-spark-1.6
如何从组中获取第一个非空值?我尝试首先使用coalesce F.first(F.coalesce("code"))
但我没有得到所需的行为(我似乎得到了第一行).
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
sc = SparkContext("local")
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([
("a", None, None),
("a", "code1", None),
("a", "code2", "name2"),
], ["id", "code", "name"])
Run Code Online (Sandbox Code Playgroud)
我试过了:
(df
.groupby("id")
.agg(F.first(F.coalesce("code")),
F.first(F.coalesce("name")))
.collect())
Run Code Online (Sandbox Code Playgroud)
期望的输出
[Row(id='a', code='code1', name='name2')]
Run Code Online (Sandbox Code Playgroud) 从Spark 下载页面,如果我下载v2.0.1的tar 文件,我会看到它包含一些我认为包含在我的应用程序中有用的 jar。
如果我下载v1.6.2的tar 文件,我在那里找不到 jars 文件夹。是否有我应该从该站点使用的替代包装类型?我目前正在选择默认值(为 Hadoop 2.6 预构建)。或者,我在哪里可以找到那些 Spark 罐子 - 我应该从http://spark-packages.org单独获取每个罐子吗?
这是我想使用的一组指示性罐子:
我有一个 CSV 文件,我正在尝试使用Spark CSV 包加载它,但它没有正确加载数据,因为其中很少有字段\n
,例如以下两行
"XYZ", "Test Data", "TestNew\nline", "OtherData"
"XYZ", "Test Data", "blablablabla
\nblablablablablalbal", "OtherData"
Run Code Online (Sandbox Code Playgroud)
我使用下面的代码这是我使用直截了当parserLib
的univocity
在网上阅读它解决了多个问题,换行,但它似乎并不适合我的情况。
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.option("parserLib","univocity")
.load("data.csv");
Run Code Online (Sandbox Code Playgroud)
如何在以引号开头的字段中替换换行符。有没有更简单的方法?
scala apache-spark apache-spark-sql spark-csv apache-spark-1.6
我使用的是 IntelliJ 2016.3 版本。
import sbt.Keys._
import sbt._
object ApplicationBuild extends Build {
object Versions {
val spark = "1.6.3"
}
val projectName = "example-spark"
val common = Seq(
version := "1.0",
scalaVersion := "2.11.7"
)
val customLibraryDependencies = Seq(
"org.apache.spark" %% "spark-core" % Versions.spark % "provided",
"org.apache.spark" %% "spark-sql" % Versions.spark % "provided",
"org.apache.spark" %% "spark-hive" % Versions.spark % "provided",
"org.apache.spark" %% "spark-streaming" % Versions.spark % "provided",
"org.apache.spark" %% "spark-streaming-kafka" % Versions.spark
exclude("log4j", "log4j")
exclude("org.spark-project.spark", "unused"),
"com.typesafe.scala-logging" %% "scala-logging" …
Run Code Online (Sandbox Code Playgroud) noclassdeffounderror apache-spark apache-spark-sql apache-spark-1.6
我有一个数据帧
|--id:string (nullable = true)
|--ddd:struct (nullable = true)
|-- aaa: string (nullable = true)
|-- bbb: long(nullable = true)
|-- ccc: string (nullable = true)
|-- eee: long(nullable = true)
Run Code Online (Sandbox Code Playgroud)
我有这样的输出
id | ddd
--------------------------
1 | [hi,1,this,2]
2 | [hello,6,good,3]
1 | [hru,2,where,7]
3 | [in,4,you,1]
2 | [how,4,to,3]
Run Code Online (Sandbox Code Playgroud)
我希望预期的o/p为:
id | ddd
--------------------
1 | [hi,1,this,2],[hru,2,where,7]
2 | [hello,6,good,3],[how,4,to,3]
3 | [in,4,you,1]
Run Code Online (Sandbox Code Playgroud)
请帮忙
我在PySpark中有一个包含空格,Null和Nan的数据帧.我想删除任何有这些行的行.我尝试了以下命令,但似乎没有任何工作.
myDF.na.drop().show()
myDF.na.drop(how='any').show()
Run Code Online (Sandbox Code Playgroud)
以下是数据帧:
+---+----------+----------+-----+-----+
|age| category| date|empId| name|
+---+----------+----------+-----+-----+
| 25|electronic|17-01-2018| 101| abc|
| 24| sports|16-01-2018| 102| def|
| 23|electronic|17-01-2018| 103| hhh|
| 23|electronic|16-01-2018| 104| yyy|
| 29| men|12-01-2018| 105| ajay|
| 31| kids|17-01-2018| 106|vijay|
| | Men| nan| 107|Sumit|
+---+----------+----------+-----+-----+
Run Code Online (Sandbox Code Playgroud)
我错过了什么?解决NULL,Nan或空格的最佳方法是什么,以便在实际计算中没有问题?
我想根据数据框中现有的列子集创建一个新列(v5)。
示例数据框:
+---+---+---+---+
| v1| v2| v3| v4|
+---+---+---+---+
| 2| 4|7.0|4.0|
| 99| 0|2.0|0.0|
|189| 0|2.4|0.0|
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)
提供示例数据框的另一个视图:
+---+---+---+---+
| v1| v3| v2| v4|
+---+---+---+---+
| 2|7.0| 4|4.0|
| 99|2.0| 0|0.0|
|189|2.4| 0|0.0|
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)
它的创建者:
+---+---+---+---+
| v1| v2| v3| v4|
+---+---+---+---+
| 2| 4|7.0|4.0|
| 99| 0|2.0|0.0|
|189| 0|2.4|0.0|
+---+---+---+---+
Run Code Online (Sandbox Code Playgroud)
最终,我想做的是创建另一个列 v5,它是与 v1 和 v2 的最小值相对应的值,忽略任一列中存在的零和空值。假设 v1 为键,v3 为值对。同样,v2 为键,v4 为值。例如,在第一行中:在 v1 和 v2 中,最小值属于 v1,即 2,因此 v5 列中的输出应为 7.0 同样,在第二行中:忽略 v1 和 v2 的零值和空值,输出应为成为2.0
原始数据帧有五列作为键,相应的五列作为值所需的输出:
+---+---+---+---+---+
| …
Run Code Online (Sandbox Code Playgroud) apache-spark ×11
apache-spark-1.6 ×11
pyspark ×4
scala ×2
hadoop-yarn ×1
hive ×1
jar ×1
pyspark-sql ×1
spark-csv ×1