小编104*_*ica的帖子

如何在调试模式下调用pyspark?

我使用Apache Spark 1.4设置了IntelliJ IDEA.

我希望能够将调试点添加到我的Spark Python脚本中,以便我可以轻松地调试它们.

我目前正在运行这一点Python来初始化spark过程

proc = subprocess.Popen([SPARK_SUBMIT_PATH, scriptFile, inputFile], shell=SHELL_OUTPUT, stdout=subprocess.PIPE)

if VERBOSE:
    print proc.stdout.read()
    print proc.stderr.read()
Run Code Online (Sandbox Code Playgroud)

spark-submit最终调用myFirstSparkScript.py,调试模式不从事并正常执行.遗憾的是,编辑Apache Spark源代码并运行自定义副本是不可接受的解决方案.

有谁知道是否有可能在调试模式下使用spark-submit调用Apache Spark脚本?如果是这样,怎么样?

python hadoop intellij-idea python-2.7 apache-spark

19
推荐指数
1
解决办法
1万
查看次数

为什么我无法从浏览器访问 Gitlab.com 的登录页面?

我去了about.gitlab.com。从那里我点击登录。浏览器不断显示:“在访问 gitlab.com 之前检查您的浏览器。”。这种情况已经持续了10个小时。我已经尝试清除我的cookie并重新启动我的电脑。这没有产生任何结果。我的防火墙已关闭,所以这不是防火墙问题。

cloudflare gitlab

16
推荐指数
2
解决办法
7747
查看次数

避免Spark窗口函数中单个分区模式的性能影响

我的问题是由计算spark数据帧中连续行之间差异的用例触发的.

例如,我有:

>>> df.show()
+-----+----------+
|index|      col1|
+-----+----------+
|  0.0|0.58734024|
|  1.0|0.67304325|
|  2.0|0.85154736|
|  3.0| 0.5449719|
+-----+----------+
Run Code Online (Sandbox Code Playgroud)

如果我选择使用"Window"函数计算它们,那么我可以这样做:

>>> winSpec = Window.partitionBy(df.index >= 0).orderBy(df.index.asc())
>>> import pyspark.sql.functions as f
>>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).show()
+-----+----------+-----------+
|index|      col1| diffs_col1|
+-----+----------+-----------+
|  0.0|0.58734024|0.085703015|
|  1.0|0.67304325| 0.17850411|
|  2.0|0.85154736|-0.30657548|
|  3.0| 0.5449719|       null|
+-----+----------+-----------+
Run Code Online (Sandbox Code Playgroud)

问题:我在一个分区中明确地划分了数据帧.这会对性能产生什么影响,如果存在,为什么会这样,我怎么能避免它呢?因为当我没有指定分区时,我收到以下警告:

16/12/24 13:52:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
Run Code Online (Sandbox Code Playgroud)

partitioning window-functions apache-spark apache-spark-sql pyspark

15
推荐指数
1
解决办法
9714
查看次数

在大型记录上,Spark StringIndexer.fit非常慢

我有格式化为以下示例的大数据记录:

// +---+------+------+
// |cid|itemId|bought|
// +---+------+------+
// |abc|   123|  true|
// |abc|   345|  true|
// |abc|   567|  true|
// |def|   123|  true|
// |def|   345|  true|
// |def|   567|  true|
// |def|   789| false|
// +---+------+------+
Run Code Online (Sandbox Code Playgroud)

cid并且itemId是字符串。

有965,964,223条记录。

我正在尝试cid使用StringIndexer以下方法将其转换为整数:

dataset.repartition(50)
val cidIndexer = new StringIndexer().setInputCol("cid").setOutputCol("cidIndex")
val cidIndexedMatrix = cidIndexer.fit(dataset).transform(dataset)
Run Code Online (Sandbox Code Playgroud)

但是这些代码行非常慢(大约需要30分钟)。问题在于它是如此之大,以至于我之后再也无能为力了。

我正在使用具有2个节点(61 GB内存)的R4 2XLarge集群的Amazon EMR集群。

我可以进一步改善性能吗?任何帮助都感激不尽。

apache-spark apache-spark-ml apache-spark-dataset

9
推荐指数
1
解决办法
412
查看次数

如何提高广播加入速度与Spark之间的条件

我有两个数据帧A和B.A很大(100 G),B相对较小(100 M).A的分区号是8,B的分区号是1.

A.join(broadcast(B), $"cur" >= $"low" &&  $"cur" <= $"high", "left_outer")
Run Code Online (Sandbox Code Playgroud)

速度很慢(> 10小时).

但是,如果我将连接条件更改为:

A.join(broadcast(B), $"cur" === $"low" , "left_outer")
Run Code Online (Sandbox Code Playgroud)

它变得非常快(<30分钟).但条件不能改变.

那么有什么方法可以进一步提高我原来的连接条件下的连接速度?

apache-spark apache-spark-sql

7
推荐指数
1
解决办法
1699
查看次数

如何在PySpark中覆盖Spark ML模型?

from pyspark.ml.regression import RandomForestRegressionModel

rf = RandomForestRegressor(labelCol="label",featuresCol="features", numTrees=5, maxDepth=10, seed=42)
rf_model = rf.fit(train_df)
rf_model_path = "./hdfsData/" + "rfr_model"
rf_model.save(rf_model_path)
Run Code Online (Sandbox Code Playgroud)

当我第一次尝试保存模型时,这些线条起作用了.但是当我想再次将模型保存到路径中时,它会出现以下错误:

Py4JJavaError:调用o1695.save时发生错误.:java.io.IOException:Path ./hdfsData/rfr_model已存在.请使用write.overwrite().save(path)来覆盖它.

然后我尝试了:

rf_model.write.overwrite().save(rf_model_path)
Run Code Online (Sandbox Code Playgroud)

它给了:

AttributeError:'function'对象没有属性'overwrite'

看来该pyspark.mllib模块提供了覆盖功能,但没有提供pyspark.ml模块.如果我想用新模型覆盖旧模型,任何人都知道如何解决这个问题?谢谢.

machine-learning apache-spark pyspark apache-spark-ml apache-spark-mllib

6
推荐指数
1
解决办法
2897
查看次数

如何在sparklyr中训练ML模型并预测另一个数据帧的新值?

请考虑以下示例

dtrain <- data_frame(text = c("Chinese Beijing Chinese",
                              "Chinese Chinese Shanghai",
                              "Chinese Macao",
                              "Tokyo Japan Chinese"),
                     doc_id = 1:4,
                     class = c(1, 1, 1, 0))

dtrain_spark <- copy_to(sc, dtrain, overwrite = TRUE)

> dtrain_spark
# Source:   table<dtrain> [?? x 3]
# Database: spark_connection
  text                     doc_id class
  <chr>                     <int> <dbl>
1 Chinese Beijing Chinese       1     1
2 Chinese Chinese Shanghai      2     1
3 Chinese Macao                 3     1
4 Tokyo Japan Chinese           4     0
Run Code Online (Sandbox Code Playgroud)

在这里,我有经典的Naive Bayes示例,class用于识别属于该China类别的文档.

我可以sparklyr …

r apache-spark apache-spark-ml sparklyr

6
推荐指数
1
解决办法
827
查看次数

使用Spark的partitioningBy方法对S3中的大型偏斜数据集进行分区

我正在尝试使用Spark将较大的分区数据集写到磁盘上,并且该partitionBy算法在我尝试过的两种方法中都遇到了麻烦。

分区严重偏斜-有些分区很大,有些很小。

问题1

当我之前使用repartition时repartitionBy,Spark将所有分区写为单个文件,即使是大文件也是如此

val df = spark.read.parquet("some_data_lake")
df
  .repartition('some_col).write.partitionBy("some_col")
  .parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)

这需要永远执行,因为Spark不会并行编写大型分区。如果其中一个分区具有1TB的数据,Spark将尝试将整个1TB的数据作为单个文件写入。

问题2

当我不使用时repartition,Spark会写出太多文件。

此代码将写出疯狂的文件。

df.write.partitionBy("some_col").parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)

我在一个很小的8 GB数据子集上运行了此操作,Spark写入了85,000+个文件!

当我尝试在生产数据集上运行此文件时,一个包含1.3 GB数据的分区被写为3,100个文件。

我想要什么

我希望每个分区都写成1 GB文件。因此,具有7 GB数据的分区将作为7个文件被写出,而具有0.3 GB数据的分区将作为单个文件被写出。

我最好的前进道路是什么?

partitioning apache-spark apache-spark-sql

6
推荐指数
3
解决办法
631
查看次数

如何将 Airflow 与 Github 集成以运行脚本

如果我们在 github 存储库帐户中维护我们的代码/脚本,有没有办法从 Github 存储库复制这些脚本并在其他集群(可以是 Hadoop 或 Spark)上执行。

气流是否提供任何操作员来连接到 Github 以获取此类文件?

在 Github 中维护脚本将提供更大的灵活性,因为代码中的每一个更改都将直接从那里反映和使用。

关于这种情况的任何想法都会真正有帮助。

python github airflow

6
推荐指数
1
解决办法
2881
查看次数

在YARN模式下启用Spark上的动态分配

这个问题与类似,但是没有答案。

我正在尝试在YARN模式下的Spark中启用动态分配。我有11个节点群集,其中包含1个主节点和10个工作节点。我在下面的链接中获取指示:

要在YARN中进行设置:http : //spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service

需要在spark-defaults.conf中设置配置变量:https : //spark.apache.org/docs/latest/configuration.html#dynamic-allocation https://spark.apache.org/docs/latest/configuration。 html#shuffle-行为

我还从下面的链接和其他一些资源中获取了参考:https : //jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-dynamic-allocation.html#spark.dynamicAllocation.testing

这是我正在执行的步骤:

  1. 在spark-defaults.conf中设置配置变量。我与动态分配和随机播放服务有关的spark-defaults.conf如下:

    spark.dynamicAllocation.enabled=true
    spark.shuffle.service.enabled=true
    spark.shuffle.service.port=7337
    
    Run Code Online (Sandbox Code Playgroud)
  2. 在yarn-site.xml中进行更改

    spark.dynamicAllocation.enabled=true
    spark.shuffle.service.enabled=true
    spark.shuffle.service.port=7337
    
    Run Code Online (Sandbox Code Playgroud)

    所有这些步骤都在所有工作程序节点中复制,即spark-defaults.conf具有上述值,而yarn-site.xml具有这些属性。我已确保所有工作节点中都存在/home/hadoop/spark/common/network-yarn/target/scala-2.11/spark-2.2.2-SNAPSHOT-yarn-shuffle.jar。

  3. 然后,我在工作节点和主节点中运行$ SPARK_HOME / sbin / start-shuffle-service.sh。在主节点中,我先使用stop-yarn.sh然后再使用start-yarn.sh重新启动YARN。

  4. 然后我正在做YARN node -list -all来查看工作程序节点,但是我看不到任何节点

  5. 当我禁用财产时

    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>spark_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.auxservices.spark_shuffle.class</name>
        <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>
    <property>
        <name>yarn.nodemanager.recovery.enabled</name>
        <value>true</value>
    </property>
    <property>
        <name>yarn.application.classpath</name>
        <value> $HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*,$HADOOP_MAPRED_HOME/share/hadoop/common/*,$HADOOP_MAPRED_HOME/share/hadoop/common/lib/*,$HADOOP_MAPRED_HOME/share/hadoop/hdfs/*,$HADOOP_MAPRED_HOME/share/hadoop/hdfs/lib/*,$HADOOP_MAPRED_HOME/share/hadoop/yarn/*,$HADOOP_MAPRED_HOME/share/hadoop/yarn/lib/*,$HADOOP_MAPRED_HOME/share/hadoop/tools/*,$HADOOP_MAPRED_HOME/share/hadoop/tools/lib/*,$HADOOP_MAPRED_HOME/share/hadoop/client/*,$HADOOP_MAPRED_HOME/share/hadoop/client/lib/*,/home/hadoop/spark/common/network-yarn/target/scala-2.11/spark-2.2.2-SNAPSHOT-yarn-shuffle.jar </value>
    </property>
    
    Run Code Online (Sandbox Code Playgroud)

    我可以正常看到所有工作节点,因此似乎洗牌服务未正确配置。

dynamic-memory-allocation hadoop-yarn apache-spark

6
推荐指数
0
解决办法
291
查看次数