我使用Apache Spark 1.4设置了IntelliJ IDEA.
我希望能够将调试点添加到我的Spark Python脚本中,以便我可以轻松地调试它们.
我目前正在运行这一点Python来初始化spark过程
proc = subprocess.Popen([SPARK_SUBMIT_PATH, scriptFile, inputFile], shell=SHELL_OUTPUT, stdout=subprocess.PIPE)
if VERBOSE:
print proc.stdout.read()
print proc.stderr.read()
Run Code Online (Sandbox Code Playgroud)
当spark-submit
最终调用myFirstSparkScript.py
,调试模式不从事并正常执行.遗憾的是,编辑Apache Spark源代码并运行自定义副本是不可接受的解决方案.
有谁知道是否有可能在调试模式下使用spark-submit调用Apache Spark脚本?如果是这样,怎么样?
我去了about.gitlab.com。从那里我点击登录。浏览器不断显示:“在访问 gitlab.com 之前检查您的浏览器。”。这种情况已经持续了10个小时。我已经尝试清除我的cookie并重新启动我的电脑。这没有产生任何结果。我的防火墙已关闭,所以这不是防火墙问题。
我的问题是由计算spark数据帧中连续行之间差异的用例触发的.
例如,我有:
>>> df.show()
+-----+----------+
|index| col1|
+-----+----------+
| 0.0|0.58734024|
| 1.0|0.67304325|
| 2.0|0.85154736|
| 3.0| 0.5449719|
+-----+----------+
Run Code Online (Sandbox Code Playgroud)
如果我选择使用"Window"函数计算它们,那么我可以这样做:
>>> winSpec = Window.partitionBy(df.index >= 0).orderBy(df.index.asc())
>>> import pyspark.sql.functions as f
>>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).show()
+-----+----------+-----------+
|index| col1| diffs_col1|
+-----+----------+-----------+
| 0.0|0.58734024|0.085703015|
| 1.0|0.67304325| 0.17850411|
| 2.0|0.85154736|-0.30657548|
| 3.0| 0.5449719| null|
+-----+----------+-----------+
Run Code Online (Sandbox Code Playgroud)
问题:我在一个分区中明确地划分了数据帧.这会对性能产生什么影响,如果存在,为什么会这样,我怎么能避免它呢?因为当我没有指定分区时,我收到以下警告:
16/12/24 13:52:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
Run Code Online (Sandbox Code Playgroud) partitioning window-functions apache-spark apache-spark-sql pyspark
我有格式化为以下示例的大数据记录:
// +---+------+------+
// |cid|itemId|bought|
// +---+------+------+
// |abc| 123| true|
// |abc| 345| true|
// |abc| 567| true|
// |def| 123| true|
// |def| 345| true|
// |def| 567| true|
// |def| 789| false|
// +---+------+------+
Run Code Online (Sandbox Code Playgroud)
cid
并且itemId
是字符串。
有965,964,223条记录。
我正在尝试cid
使用StringIndexer
以下方法将其转换为整数:
dataset.repartition(50)
val cidIndexer = new StringIndexer().setInputCol("cid").setOutputCol("cidIndex")
val cidIndexedMatrix = cidIndexer.fit(dataset).transform(dataset)
Run Code Online (Sandbox Code Playgroud)
但是这些代码行非常慢(大约需要30分钟)。问题在于它是如此之大,以至于我之后再也无能为力了。
我正在使用具有2个节点(61 GB内存)的R4 2XLarge集群的Amazon EMR集群。
我可以进一步改善性能吗?任何帮助都感激不尽。
我有两个数据帧A和B.A很大(100 G),B相对较小(100 M).A的分区号是8,B的分区号是1.
A.join(broadcast(B), $"cur" >= $"low" && $"cur" <= $"high", "left_outer")
Run Code Online (Sandbox Code Playgroud)
速度很慢(> 10小时).
但是,如果我将连接条件更改为:
A.join(broadcast(B), $"cur" === $"low" , "left_outer")
Run Code Online (Sandbox Code Playgroud)
它变得非常快(<30分钟).但条件不能改变.
那么有什么方法可以进一步提高我原来的连接条件下的连接速度?
from pyspark.ml.regression import RandomForestRegressionModel
rf = RandomForestRegressor(labelCol="label",featuresCol="features", numTrees=5, maxDepth=10, seed=42)
rf_model = rf.fit(train_df)
rf_model_path = "./hdfsData/" + "rfr_model"
rf_model.save(rf_model_path)
Run Code Online (Sandbox Code Playgroud)
当我第一次尝试保存模型时,这些线条起作用了.但是当我想再次将模型保存到路径中时,它会出现以下错误:
Py4JJavaError:调用o1695.save时发生错误.:java.io.IOException:Path ./hdfsData/rfr_model已存在.请使用write.overwrite().save(path)来覆盖它.
然后我尝试了:
rf_model.write.overwrite().save(rf_model_path)
Run Code Online (Sandbox Code Playgroud)
它给了:
AttributeError:'function'对象没有属性'overwrite'
看来该pyspark.mllib
模块提供了覆盖功能,但没有提供pyspark.ml
模块.如果我想用新模型覆盖旧模型,任何人都知道如何解决这个问题?谢谢.
machine-learning apache-spark pyspark apache-spark-ml apache-spark-mllib
请考虑以下示例
dtrain <- data_frame(text = c("Chinese Beijing Chinese",
"Chinese Chinese Shanghai",
"Chinese Macao",
"Tokyo Japan Chinese"),
doc_id = 1:4,
class = c(1, 1, 1, 0))
dtrain_spark <- copy_to(sc, dtrain, overwrite = TRUE)
> dtrain_spark
# Source: table<dtrain> [?? x 3]
# Database: spark_connection
text doc_id class
<chr> <int> <dbl>
1 Chinese Beijing Chinese 1 1
2 Chinese Chinese Shanghai 2 1
3 Chinese Macao 3 1
4 Tokyo Japan Chinese 4 0
Run Code Online (Sandbox Code Playgroud)
在这里,我有经典的Naive Bayes示例,class
用于识别属于该China
类别的文档.
我可以sparklyr …
我正在尝试使用Spark将较大的分区数据集写到磁盘上,并且该partitionBy
算法在我尝试过的两种方法中都遇到了麻烦。
分区严重偏斜-有些分区很大,有些很小。
问题1:
当我之前使用repartition时repartitionBy
,Spark将所有分区写为单个文件,即使是大文件也是如此
val df = spark.read.parquet("some_data_lake")
df
.repartition('some_col).write.partitionBy("some_col")
.parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)
这需要永远执行,因为Spark不会并行编写大型分区。如果其中一个分区具有1TB的数据,Spark将尝试将整个1TB的数据作为单个文件写入。
问题2:
当我不使用时repartition
,Spark会写出太多文件。
此代码将写出疯狂的文件。
df.write.partitionBy("some_col").parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)
我在一个很小的8 GB数据子集上运行了此操作,Spark写入了85,000+个文件!
当我尝试在生产数据集上运行此文件时,一个包含1.3 GB数据的分区被写为3,100个文件。
我想要什么
我希望每个分区都写成1 GB文件。因此,具有7 GB数据的分区将作为7个文件被写出,而具有0.3 GB数据的分区将作为单个文件被写出。
我最好的前进道路是什么?
如果我们在 github 存储库帐户中维护我们的代码/脚本,有没有办法从 Github 存储库复制这些脚本并在其他集群(可以是 Hadoop 或 Spark)上执行。
气流是否提供任何操作员来连接到 Github 以获取此类文件?
在 Github 中维护脚本将提供更大的灵活性,因为代码中的每一个更改都将直接从那里反映和使用。
关于这种情况的任何想法都会真正有帮助。
这个问题与此类似,但是没有答案。
我正在尝试在YARN模式下的Spark中启用动态分配。我有11个节点群集,其中包含1个主节点和10个工作节点。我在下面的链接中获取指示:
要在YARN中进行设置:http : //spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service
需要在spark-defaults.conf中设置配置变量:https : //spark.apache.org/docs/latest/configuration.html#dynamic-allocation https://spark.apache.org/docs/latest/configuration。 html#shuffle-行为
我还从下面的链接和其他一些资源中获取了参考:https : //jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-dynamic-allocation.html#spark.dynamicAllocation.testing
这是我正在执行的步骤:
在spark-defaults.conf中设置配置变量。我与动态分配和随机播放服务有关的spark-defaults.conf如下:
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
Run Code Online (Sandbox Code Playgroud)在yarn-site.xml中进行更改
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
Run Code Online (Sandbox Code Playgroud)
所有这些步骤都在所有工作程序节点中复制,即spark-defaults.conf具有上述值,而yarn-site.xml具有这些属性。我已确保所有工作节点中都存在/home/hadoop/spark/common/network-yarn/target/scala-2.11/spark-2.2.2-SNAPSHOT-yarn-shuffle.jar。
然后,我在工作节点和主节点中运行$ SPARK_HOME / sbin / start-shuffle-service.sh。在主节点中,我先使用stop-yarn.sh然后再使用start-yarn.sh重新启动YARN。
然后我正在做YARN node -list -all来查看工作程序节点,但是我看不到任何节点
当我禁用财产时
<property>
<name>yarn.nodemanager.aux-services</name>
<value>spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.auxservices.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
<name>yarn.nodemanager.recovery.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.application.classpath</name>
<value> $HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*,$HADOOP_MAPRED_HOME/share/hadoop/common/*,$HADOOP_MAPRED_HOME/share/hadoop/common/lib/*,$HADOOP_MAPRED_HOME/share/hadoop/hdfs/*,$HADOOP_MAPRED_HOME/share/hadoop/hdfs/lib/*,$HADOOP_MAPRED_HOME/share/hadoop/yarn/*,$HADOOP_MAPRED_HOME/share/hadoop/yarn/lib/*,$HADOOP_MAPRED_HOME/share/hadoop/tools/*,$HADOOP_MAPRED_HOME/share/hadoop/tools/lib/*,$HADOOP_MAPRED_HOME/share/hadoop/client/*,$HADOOP_MAPRED_HOME/share/hadoop/client/lib/*,/home/hadoop/spark/common/network-yarn/target/scala-2.11/spark-2.2.2-SNAPSHOT-yarn-shuffle.jar </value>
</property>
Run Code Online (Sandbox Code Playgroud)
我可以正常看到所有工作节点,因此似乎洗牌服务未正确配置。
apache-spark ×8
partitioning ×2
pyspark ×2
python ×2
airflow ×1
cloudflare ×1
github ×1
gitlab ×1
hadoop ×1
hadoop-yarn ×1
python-2.7 ×1
r ×1
sparklyr ×1