标签: pyspark

将"SPARK_HOME"设置为什么?

安装了apache-maven-3.3.3,scala 2.11.6,然后运行:

$ git clone git://github.com/apache/spark.git -b branch-1.4
$ cd spark
$ build/mvn -DskipTests clean package
Run Code Online (Sandbox Code Playgroud)

最后:

$ git clone https://github.com/apache/incubator-zeppelin
$ cd incubator-zeppelin/
$ mvn install -DskipTests
Run Code Online (Sandbox Code Playgroud)

然后运行服务器:

$ bin/zeppelin-daemon.sh start
Run Code Online (Sandbox Code Playgroud)

从一开始运行一个简单的笔记本%pyspark,我得到一个关于py4j找不到的错误.刚做过pip install py4j(参考).

现在我收到这个错误:

pyspark is not responding Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark.py", line 22, in <module>
    from pyspark.conf import SparkConf
ImportError: No module named pyspark.conf
Run Code Online (Sandbox Code Playgroud)

我已经尝试过设置SPARK_HOME:/spark/python:/spark/python/lib.没变.

python pythonpath apache-spark pyspark apache-zeppelin

21
推荐指数
1
解决办法
3万
查看次数

在DataFrame中用空/空值替换空字符串

我有一个Spark 1.5.0 DataFrame,null在同一列中混合了空字符串.我想将所有列中的所有空字符串转换为null(None在Python中).DataFrame可能有数百列,所以我试图避免每列的硬编码操作.

请参阅下面的我的尝试,这会导致错误.

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

## Create a test DataFrame
testDF = sqlContext.createDataFrame([Row(col1='foo', col2=1), Row(col1='', col2=2), Row(col1=None, col2='')])
testDF.show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo|   1|
## |    |   2|
## |null|null|
## +----+----+

## Try to replace an empty string with None/null
testDF.replace('', None).show()
## ValueError: value should be a float, int, long, string, list, or tuple

## A string value of …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

21
推荐指数
4
解决办法
3万
查看次数

Pyspark和PCA:如何提取此PCA的特征向量?如何计算他们解释的方差?

我正在使用pyspark(使用库)减少Spark DataFrame带有PCA模型的维度,spark ml如下所示:

pca = PCA(k=3, inputCol="features", outputCol="pca_features")
model = pca.fit(data)
Run Code Online (Sandbox Code Playgroud)

在哪里data是一个Spark DataFrame实验室,其中features一个DenseVector是3维:

data.take(1)
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1')
Run Code Online (Sandbox Code Playgroud)

拟合后,我转换数据:

transformed = model.transform(data)
transformed.first()
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1', pca_features=DenseVector([-0.33256, 0.8668, 0.625]))
Run Code Online (Sandbox Code Playgroud)

我的问题是:如何提取此PCA的特征向量?如何计算他们解释的方差?

pca apache-spark apache-spark-sql pyspark apache-spark-ml

21
推荐指数
4
解决办法
1万
查看次数

在PySpark中将字典广播到rdd

我只是得到了Spark的悬念,我有需要映射到的函数rdd,但是使用了一个全局字典:

from pyspark import SparkContext

sc = SparkContext('local[*]', 'pyspark')

my_dict = {"a": 1, "b": 2, "c": 3, "d": 4} # at no point will be modified
my_list = ["a", "d", "c", "b"]

def my_func(letter):
    return my_dict[letter]

my_list_rdd = sc.parallelize(my_list)

result = my_list_rdd.map(lambda x: my_func(x)).collect()

print result
Run Code Online (Sandbox Code Playgroud)

以上给出了预期的结果; 但是,我真的不确定我对全局变量的使用my_dict.似乎每个分区都会创建一个字典副本.它只是感觉不对..

看起来广播是我正在寻找的.但是,当我尝试使用它时:

my_dict_bc = sc.broadcast(my_dict)

def my_func(letter):
    return my_dict_bc[letter] 
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

TypeError: 'Broadcast' object has no attribute '__getitem__
Run Code Online (Sandbox Code Playgroud)

这似乎意味着我不能播放字典.

我的问题:如果我有一个使用全局字典的函数,需要将其映射到rdd,那么正确的方法是什么?

我的例子很简单,但在现实中my_dict,并my_list要大得多,而且my_func …

apache-spark pyspark

21
推荐指数
1
解决办法
1万
查看次数

如何在spark sql中进行左外连接?

我试图在spark(1.6.2)中进行左外连接,但它不起作用.我的SQL查询是这样的:

sqlContext.sql("select t.type, t.uuid, p.uuid
from symptom_type t LEFT JOIN plugin p 
ON t.uuid = p.uuid 
where t.created_year = 2016 
and p.created_year = 2016").show()
Run Code Online (Sandbox Code Playgroud)

结果是这样的:

+--------------------+--------------------+--------------------+
|                type|                uuid|                uuid|
+--------------------+--------------------+--------------------+
|              tained|89759dcc-50c0-490...|89759dcc-50c0-490...|
|             swapper|740cd0d4-53ee-438...|740cd0d4-53ee-438...|
Run Code Online (Sandbox Code Playgroud)

我使用LEFT JOIN或LEFT OUTER JOIN得到了相同的结果(第二个uuid不为null).

我希望第二个uuid列只能为null.如何正确地进行左外连接?

===其他信息==

如果我使用数据帧做左外连接我得到了正确的结果.

s = sqlCtx.sql('select * from symptom_type where created_year = 2016')
p = sqlCtx.sql('select * from plugin where created_year = 2016')

s.join(p, s.uuid == p.uuid, 'left_outer')
.select(s.type, s.uuid.alias('s_uuid'), 
        p.uuid.alias('p_uuid'), s.created_date, p.created_year, p.created_month).show()
Run Code Online (Sandbox Code Playgroud)

我有这样的结果:

+-------------------+--------------------+-----------------+--------------------+------------+-------------+
|               type|              s_uuid|           p_uuid|        created_date|created_year|created_month| …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

21
推荐指数
3
解决办法
6万
查看次数

pyspark导入用户定义的模块或.py文件

我构建了一个python模块,我想在我的pyspark应用程序中导入它.

我的包目录结构是:

wesam/
|-- data.py
`-- __init__.py
Run Code Online (Sandbox Code Playgroud)

import wesam我的pyspark脚本顶部的一个简单导致ImportError: No module named wesam.我也试图压缩它与我的代码以出货--py-files推荐这个答案,没有运气.

./bin/spark-submit --py-files wesam.zip mycode.py
Run Code Online (Sandbox Code Playgroud)

我也按照这个答案的建议以编程方式添加了文件,但是我得到了同样的ImportError: No module named wesam错误.

.sc.addPyFile("wesam.zip")
Run Code Online (Sandbox Code Playgroud)

我在这里错过了什么?

python python-module python-import apache-spark pyspark

21
推荐指数
1
解决办法
2万
查看次数

将数组数据分解为spark中的行

我有以下方式的数据集:

FieldA    FieldB    ArrayField
1         A         {1,2,3}
2         B         {3,5}
Run Code Online (Sandbox Code Playgroud)

我想爆炸ArrayField上的数据,因此输出将以下列方式显示:

FieldA    FieldB    ExplodedField
1         A         1
1         A         2
1         A         3
2         B         3
2         B         5
Run Code Online (Sandbox Code Playgroud)

我的意思是我想为ArrayField中的每个项生成一个输出行,同时保持其他字段的值.

你将如何在Spark中实现它.请注意,输入数据集非常大.

apache-spark pyspark

21
推荐指数
1
解决办法
4万
查看次数

如何在pyspark中的groupBy之后计算唯一ID

我每年使用以下代码来聚集学生.目的是了解每年的学生总数.

from pyspark.sql.functions import col
import pyspark.sql.functions as fn
gr = Df2.groupby(['Year'])
df_grouped = 
gr.agg(fn.count(col('Student_ID')).alias('total_student_by_year'))
Run Code Online (Sandbox Code Playgroud)

结果是:

[学生按年份] [1]

我发现有这么多ID重复的问题所以结果是错误的和巨大的.

我希望按年份对学生进行聚集,按年计算学生总数,并将ID重复计算.

我希望这个问题很清楚.我是新成员谢谢

python pyspark spark-dataframe pyspark-sql

21
推荐指数
2
解决办法
4万
查看次数

SQLException上的sqlContext HiveDriver错误:不支持方法

我一直在尝试使用sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver")Hive表进入Spark而没有任何成功.我做过研究并阅读如下:

如何从spark连接到远程配置单元服务器

Spark 1.5.1无法使用hive jdbc 1.2.0

http://belablotski.blogspot.in/2016/01/access-hive-tables-from-spark-using.html

我使用了最新的Hortonworks Sandbox 2.6并向社区询问了同样的问题:

https://community.hortonworks.com/questions/156828/pyspark-jdbc-py4jjavaerror-calling-o95load-javasql.html?childToView=156936#answer-156936

我想做的事情非常简单pyspark:

df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="sample_07",user="maria_dev", password="maria_dev").load()
Run Code Online (Sandbox Code Playgroud)

这给了我这个错误:

17/12/30 19:55:14 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://localhost:10016/default
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark-client/python/pyspark/sql/readwriter.py", line 139, in load
    return self._df(self._jreader.load())
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/usr/hdp/current/spark-client/python/pyspark/sql/utils.py", line 45, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error …
Run Code Online (Sandbox Code Playgroud)

hive jdbc hortonworks-data-platform apache-spark pyspark

21
推荐指数
1
解决办法
1279
查看次数

使用monotonically_increasing_id()将行号分配给pyspark数据帧

我使用monotonically_increasing_id()使用以下语法将行号分配给pyspark数据帧:

df1 = df1.withColumn("idx", monotonically_increasing_id())
Run Code Online (Sandbox Code Playgroud)

现在df1有26,572,528条记录.所以我期待idx值从0-26,572,527.

但是当我选择max(idx)时,它的值非常大:335,008,054,165.

这个功能发生了什么?使用此函数与具有相似记录数的其他数据集合并是否可靠?

我有大约300个数据帧,我想将它们组合成一个数据帧.因此,一个数据帧包含ID,而其他数据帧包含与行对应的不同记录

python indexing merge pyspark

21
推荐指数
2
解决办法
3万
查看次数