我更喜欢Python而不是Scala.但是,由于Spark本身是用Scala编写的,因此我希望我的代码在Scala中的运行速度比Python版本快,原因很明显.
有了这个假设,我想学习和编写一些非常常见的预处理代码的Scala版本,用于1 GB的数据.数据来自Kaggle的SpringLeaf比赛.只是为了概述数据(它包含1936个维度和145232行).数据由各种类型组成,例如int,float,string,boolean.我使用8个核心中的6个用于Spark处理; 这就是我使用的原因minPartitions=6
,每个核心都有一些东西需要处理.
Scala代码
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = { …
Run Code Online (Sandbox Code Playgroud) 我来自pandas背景,习惯于将CSV文件中的数据读入数据帧,然后使用简单命令将列名更改为有用的东西:
df.columns = new_column_name_list
Run Code Online (Sandbox Code Playgroud)
但是,在使用sqlContext创建的pyspark数据帧中,这同样不起作用.我可以轻松解决的唯一解决方案如下:
df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', inferschema='true', delimiter='\t').load("data.txt")
oldSchema = df.schema
for i,k in enumerate(oldSchema.fields):
k.name = new_column_name_list[i]
df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', delimiter='\t').load("data.txt", schema=oldSchema)
Run Code Online (Sandbox Code Playgroud)
这基本上是定义变量两次并首先推断模式然后重命名列名,然后再次使用更新的模式加载数据帧.
像熊猫一样,有更好更有效的方法吗?
我的火花版是1.5.0
我使用AWS EC2指南安装了Spark,我可以使用bin/pyspark
脚本启动程序以获得spark提示,也可以成功执行Quick Start quide.
但是,我不能为我的生活弄清楚如何INFO
在每个命令后停止所有详细的日志记录.
我已经在我的log4j.properties
文件中的几乎所有可能的场景中尝试了我的conf
文件,在我从中启动应用程序的文件夹以及每个节点上,没有做任何事情.INFO
执行每个语句后,我仍然会打印日志语句.
我对这应该如何工作非常困惑.
#Set everything to be logged to the console log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
Run Code Online (Sandbox Code Playgroud)
这是我使用时的完整类路径SPARK_PRINT_LAUNCH_COMMAND
:
Spark命令:/Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/bin/java -cp:/root/spark-1.0.1-bin-hadoop2/conf:/root/spark-1.0.1 -bin-hadoop2/CONF:/root/spark-1.0.1-bin-hadoop2/lib/spark-assembly-1.0.1-hadoop2.2.0.jar:/root/spark-1.0.1-bin-hadoop2/lib /datanucleus-api-jdo-3.2.1.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/root/spark-1.0.1-bin-hadoop2 /lib/datanucleus-rdbms-3.2.1.jar -XX:MaxPermSize = 128m -Djava.library.path = -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark. repl.Main
内容spark-env.sh
:
#!/usr/bin/env bash
# This file is sourced when …
Run Code Online (Sandbox Code Playgroud) 我想在a中添加一个DataFrame
具有任意值的列(对于每一行都是相同的).我使用时出现错误withColumn
如下:
dt.withColumn('new_column', 10).head(5)
Run Code Online (Sandbox Code Playgroud)
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
1 dt = (messages
2 .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)
/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
1166 [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
1167 """
-> 1168 return self.select('*', col.alias(colName))
1169
1170 @ignore_unicode_prefix
AttributeError: 'int' object has no attribute 'alias'
Run Code Online (Sandbox Code Playgroud)
似乎我可以通过添加和减去其中一个列(因此它们添加到零)然后添加我想要的数字(在这种情况下为10)来欺骗函数按照我想要的方式工作:
dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)
Run Code Online (Sandbox Code Playgroud)
[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=93506471, messagetype=1, dt=4809600.0, …
Run Code Online (Sandbox Code Playgroud) 我有一个Spark DataFrame(使用PySpark 1.5.1)并想添加一个新列.
我试过以下但没有成功:
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
Run Code Online (Sandbox Code Playgroud)
使用这个也有错误:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
Run Code Online (Sandbox Code Playgroud)
那么如何使用PySpark将新列(基于Python向量)添加到现有的DataFrame中?
这是另一个从未回答过的论坛上的别人问题的副本,所以我想我会在这里重新提问,因为我有同样的问题.(见http://geekple.com/blogs/feeds/Xgzu7/posts/351703064084736)
我在我的机器上正确安装了Spark,并且当使用./bin/pyspark作为我的python解释器时,能够使用pyspark模块运行python程序而不会出错.
但是,当我尝试运行常规Python shell时,当我尝试导入pyspark模块时,我收到此错误:
from pyspark import SparkContext
Run Code Online (Sandbox Code Playgroud)
它说
"No module named pyspark".
Run Code Online (Sandbox Code Playgroud)
我怎样才能解决这个问题?是否需要设置环境变量以将Python指向pyspark headers/libraries/etc. 如果我的火花安装是/ spark /,我需要包含哪些pyspark路径?或者pyspark程序只能从pyspark解释器运行?
我是Spark的新手,我正在尝试使用Spark从文件中读取CSV数据.这就是我在做的事情:
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
Run Code Online (Sandbox Code Playgroud)
我希望这个调用能给我一个我文件的两个第一列的列表,但是我收到了这个错误:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
Run Code Online (Sandbox Code Playgroud)
虽然我的CSV文件不止一列.
我有一个正在运行的Spark应用程序,它占用了我的其他应用程序不会分配任何资源的所有核心.
我做了一些快速的研究,人们建议使用YARN kill或/ bin/spark-class来杀死命令.但是,我使用CDH版本和/ bin/spark-class甚至根本不存在,YARN kill应用程序也不起作用.
任何人都可以和我一起吗?
我正在尝试过滤具有None
行值的PySpark数据帧:
df.select('dt_mvmt').distinct().collect()
[Row(dt_mvmt=u'2016-03-27'),
Row(dt_mvmt=u'2016-03-28'),
Row(dt_mvmt=u'2016-03-29'),
Row(dt_mvmt=None),
Row(dt_mvmt=u'2016-03-30'),
Row(dt_mvmt=u'2016-03-31')]
Run Code Online (Sandbox Code Playgroud)
我可以使用字符串值正确过滤:
df[df.dt_mvmt == '2016-03-31']
# some results here
Run Code Online (Sandbox Code Playgroud)
但这失败了:
df[df.dt_mvmt == None].count()
0
df[df.dt_mvmt != None].count()
0
Run Code Online (Sandbox Code Playgroud)
但每个类别肯定都有价值观.这是怎么回事?
我处理一个包含两列mvv和count的数据帧.
+---+-----+
|mvv|count|
+---+-----+
| 1 | 5 |
| 2 | 9 |
| 3 | 3 |
| 4 | 1 |
Run Code Online (Sandbox Code Playgroud)
我想获得两个包含mvv值和计数值的列表.就像是
mvv = [1,2,3,4]
count = [5,9,3,1]
Run Code Online (Sandbox Code Playgroud)
所以,我尝试了以下代码:第一行应该返回一个python列表行.我想看到第一个值:
mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)
Run Code Online (Sandbox Code Playgroud)
但是我收到第二行的错误消息:
AttributeError:getInt
apache-spark ×10
pyspark ×10
python ×8
dataframe ×3
scala ×2
csv ×1
hadoop ×1
hadoop-yarn ×1
performance ×1
pyspark-sql ×1
rdd ×1