标签: pyspark

Scala与Python的Spark性能

我更喜欢Python而不是Scala.但是,由于Spark本身是用Scala编写的,因此我希望我的代码在Scala中的运行速度比Python版本快,原因很明显.

有了这个假设,我想学习和编写一些非常常见的预处理代码的Scala版本,用于1 GB的数据.数据来自Kaggle的SpringLeaf比赛.只是为了概述数据(它包含1936个维度和145232行).数据由各种类型组成,例如int,float,string,boolean.我使用8个核心中的6个用于Spark处理; 这就是我使用的原因minPartitions=6,每个核心都有一些东西需要处理.

Scala代码

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = { …
Run Code Online (Sandbox Code Playgroud)

performance scala apache-spark rdd pyspark

166
推荐指数
1
解决办法
4万
查看次数

如何在pyspark中更改数据框列名?

我来自pandas背景,习惯于将CSV文件中的数据读入数据帧,然后使用简单命令将列名更改为有用的东西:

df.columns = new_column_name_list
Run Code Online (Sandbox Code Playgroud)

但是,在使用sqlContext创建的pyspark数据帧中,这同样不起作用.我可以轻松解决的唯一解决方案如下:

df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', inferschema='true', delimiter='\t').load("data.txt")
oldSchema = df.schema
for i,k in enumerate(oldSchema.fields):
  k.name = new_column_name_list[i]
df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', delimiter='\t').load("data.txt", schema=oldSchema)
Run Code Online (Sandbox Code Playgroud)

这基本上是定义变量两次并首先推断模式然后重命名列名,然后再次使用更新的模式加载数据帧.

像熊猫一样,有更好更有效的方法吗?

我的火花版是1.5.0

python apache-spark pyspark pyspark-sql

159
推荐指数
12
解决办法
22万
查看次数

如何在Spark中关闭INFO日志记录?

我使用AWS EC2指南安装了Spark,我可以使用bin/pyspark脚本启动程序以获得spark提示,也可以成功执行Quick Start quide.

但是,我不能为我的生活弄清楚如何INFO在每个命令后停止所有详细的日志记录.

我已经在我的log4j.properties文件中的几乎所有可能的场景中尝试了我的conf文件,在我从中启动应用程序的文件夹以及每个节点上,没有做任何事情.INFO执行每个语句后,我仍然会打印日志语句.

我对这应该如何工作非常困惑.

#Set everything to be logged to the console log4j.rootCategory=INFO, console                                                                        
log4j.appender.console=org.apache.log4j.ConsoleAppender 
log4j.appender.console.target=System.err     
log4j.appender.console.layout=org.apache.log4j.PatternLayout 
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
Run Code Online (Sandbox Code Playgroud)

这是我使用时的完整类路径SPARK_PRINT_LAUNCH_COMMAND:

Spark命令:/Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/bin/java -cp:/root/spark-1.0.1-bin-hadoop2/conf:/root/spark-1.0.1 -bin-hadoop2/CONF:/root/spark-1.0.1-bin-hadoop2/lib/spark-assembly-1.0.1-hadoop2.2.0.jar:/root/spark-1.0.1-bin-hadoop2/lib /datanucleus-api-jdo-3.2.1.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/root/spark-1.0.1-bin-hadoop2 /lib/datanucleus-rdbms-3.2.1.jar -XX:MaxPermSize = 128m -Djava.library.path = -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark. repl.Main

内容spark-env.sh:

#!/usr/bin/env bash

# This file is sourced when …
Run Code Online (Sandbox Code Playgroud)

python hadoop scala apache-spark pyspark

128
推荐指数
12
解决办法
9万
查看次数

如何在Spark DataFrame中添加常量列?

我想在a中添加一个DataFrame具有任意值的列(对于每一行都是相同的).我使用时出现错误withColumn如下:

dt.withColumn('new_column', 10).head(5)
Run Code Online (Sandbox Code Playgroud)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
      1 dt = (messages
      2     .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)

/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1166         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
   1167         """
-> 1168         return self.select('*', col.alias(colName))
   1169 
   1170     @ignore_unicode_prefix

AttributeError: 'int' object has no attribute 'alias'
Run Code Online (Sandbox Code Playgroud)

似乎我可以通过添加和减去其中一个列(因此它们添加到零)然后添加我想要的数字(在这种情况下为10)来欺骗函数按照我想要的方式工作:

dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)
Run Code Online (Sandbox Code Playgroud)
[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=93506471, messagetype=1, dt=4809600.0, …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

116
推荐指数
2
解决办法
12万
查看次数

如何向Spark DataFrame添加新列(使用PySpark)?

我有一个Spark DataFrame(使用PySpark 1.5.1)并想添加一个新列.

我试过以下但没有成功:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])
Run Code Online (Sandbox Code Playgroud)

使用这个也有错误:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))
Run Code Online (Sandbox Code Playgroud)

那么如何使用PySpark将新列(基于Python向量)添加到现有的DataFrame中?

python dataframe apache-spark apache-spark-sql pyspark

110
推荐指数
4
解决办法
21万
查看次数

在python shell中导入pyspark

这是另一个从未回答过的论坛上的别人问题的副本,所以我想我会在这里重新提问,因为我有同样的问题.(见http://geekple.com/blogs/feeds/Xgzu7/posts/351703064084736)

我在我的机器上正确安装了Spark,并且当使用./bin/pyspark作为我的python解释器时,能够使用pyspark模块运行python程序而不会出错.

但是,当我尝试运行常规Python shell时,当我尝试导入pyspark模块时,我收到此错误:

from pyspark import SparkContext
Run Code Online (Sandbox Code Playgroud)

它说

"No module named pyspark".
Run Code Online (Sandbox Code Playgroud)

我怎样才能解决这个问题?是否需要设置环境变量以将Python指向pyspark headers/libraries/etc. 如果我的火花安装是/ spark /,我需要包含哪些pyspark路径?或者pyspark程序只能从pyspark解释器运行?

python apache-spark pyspark

102
推荐指数
11
解决办法
14万
查看次数

使用Spark加载CSV文件

我是Spark的新手,我正在尝试使用Spark从文件中读取CSV数据.这就是我在做的事情:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()
Run Code Online (Sandbox Code Playgroud)

我希望这个调用能给我一个我文件的两个第一列的列表,但是我收到了这个错误:

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
Run Code Online (Sandbox Code Playgroud)

虽然我的CSV文件不止一列.

python csv apache-spark pyspark

95
推荐指数
10
解决办法
24万
查看次数

Spark Kill运行应用程序

我有一个正在运行的Spark应用程序,它占用了我的其他应用程序不会分配任何资源的所有核心.

我做了一些快速的研究,人们建议使用YARN kill或/ bin/spark-class来杀死命令.但是,我使用CDH版本和/ bin/spark-class甚至根本不存在,YARN kill应用程序也不起作用.

在此输入图像描述

任何人都可以和我一起吗?

hadoop-yarn apache-spark pyspark

86
推荐指数
3
解决办法
12万
查看次数

使用无值过滤Pyspark数据框列

我正在尝试过滤具有None行值的PySpark数据帧:

df.select('dt_mvmt').distinct().collect()

[Row(dt_mvmt=u'2016-03-27'),
 Row(dt_mvmt=u'2016-03-28'),
 Row(dt_mvmt=u'2016-03-29'),
 Row(dt_mvmt=None),
 Row(dt_mvmt=u'2016-03-30'),
 Row(dt_mvmt=u'2016-03-31')]
Run Code Online (Sandbox Code Playgroud)

我可以使用字符串值正确过滤:

df[df.dt_mvmt == '2016-03-31']
# some results here
Run Code Online (Sandbox Code Playgroud)

但这失败了:

df[df.dt_mvmt == None].count()
0
df[df.dt_mvmt != None].count()
0
Run Code Online (Sandbox Code Playgroud)

但每个类别肯定都有价值观.这是怎么回事?

python dataframe apache-spark apache-spark-sql pyspark

76
推荐指数
5
解决办法
16万
查看次数

将spark DataFrame列转换为python列表

我处理一个包含两列mvv和count的数据帧.

+---+-----+
|mvv|count|
+---+-----+
| 1 |  5  |
| 2 |  9  |
| 3 |  3  |
| 4 |  1  |
Run Code Online (Sandbox Code Playgroud)

我想获得两个包含mvv值和计数值的列表.就像是

mvv = [1,2,3,4]
count = [5,9,3,1]
Run Code Online (Sandbox Code Playgroud)

所以,我尝试了以下代码:第一行应该返回一个python列表行.我想看到第一个值:

mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)
Run Code Online (Sandbox Code Playgroud)

但是我收到第二行的错误消息:

AttributeError:getInt

python apache-spark pyspark spark-dataframe

76
推荐指数
11
解决办法
12万
查看次数