安装了apache-maven-3.3.3,scala 2.11.6,然后运行:
$ git clone git://github.com/apache/spark.git -b branch-1.4
$ cd spark
$ build/mvn -DskipTests clean package
Run Code Online (Sandbox Code Playgroud)
最后:
$ git clone https://github.com/apache/incubator-zeppelin
$ cd incubator-zeppelin/
$ mvn install -DskipTests
Run Code Online (Sandbox Code Playgroud)
然后运行服务器:
$ bin/zeppelin-daemon.sh start
Run Code Online (Sandbox Code Playgroud)
从一开始运行一个简单的笔记本%pyspark,我得到一个关于py4j找不到的错误.刚做过pip install py4j(参考).
现在我收到这个错误:
pyspark is not responding Traceback (most recent call last):
File "/tmp/zeppelin_pyspark.py", line 22, in <module>
from pyspark.conf import SparkConf
ImportError: No module named pyspark.conf
Run Code Online (Sandbox Code Playgroud)
我已经尝试过设置SPARK_HOME:/spark/python:/spark/python/lib.没变.
我有一个Spark 1.5.0 DataFrame,null在同一列中混合了空字符串.我想将所有列中的所有空字符串转换为null(None在Python中).DataFrame可能有数百列,所以我试图避免每列的硬编码操作.
请参阅下面的我的尝试,这会导致错误.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
## Create a test DataFrame
testDF = sqlContext.createDataFrame([Row(col1='foo', col2=1), Row(col1='', col2=2), Row(col1=None, col2='')])
testDF.show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo| 1|
## | | 2|
## |null|null|
## +----+----+
## Try to replace an empty string with None/null
testDF.replace('', None).show()
## ValueError: value should be a float, int, long, string, list, or tuple
## A string value of …Run Code Online (Sandbox Code Playgroud) 我正在使用pyspark(使用库)减少Spark DataFrame带有PCA模型的维度,spark ml如下所示:
pca = PCA(k=3, inputCol="features", outputCol="pca_features")
model = pca.fit(data)
Run Code Online (Sandbox Code Playgroud)
在哪里data是一个Spark DataFrame实验室,其中features一个DenseVector是3维:
data.take(1)
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1')
Run Code Online (Sandbox Code Playgroud)
拟合后,我转换数据:
transformed = model.transform(data)
transformed.first()
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1', pca_features=DenseVector([-0.33256, 0.8668, 0.625]))
Run Code Online (Sandbox Code Playgroud)
我的问题是:如何提取此PCA的特征向量?如何计算他们解释的方差?
我只是得到了Spark的悬念,我有需要映射到的函数rdd,但是使用了一个全局字典:
from pyspark import SparkContext
sc = SparkContext('local[*]', 'pyspark')
my_dict = {"a": 1, "b": 2, "c": 3, "d": 4} # at no point will be modified
my_list = ["a", "d", "c", "b"]
def my_func(letter):
return my_dict[letter]
my_list_rdd = sc.parallelize(my_list)
result = my_list_rdd.map(lambda x: my_func(x)).collect()
print result
Run Code Online (Sandbox Code Playgroud)
以上给出了预期的结果; 但是,我真的不确定我对全局变量的使用my_dict.似乎每个分区都会创建一个字典副本.它只是感觉不对..
看起来广播是我正在寻找的.但是,当我尝试使用它时:
my_dict_bc = sc.broadcast(my_dict)
def my_func(letter):
return my_dict_bc[letter]
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
TypeError: 'Broadcast' object has no attribute '__getitem__
Run Code Online (Sandbox Code Playgroud)
这似乎意味着我不能播放字典.
我的问题:如果我有一个使用全局字典的函数,需要将其映射到rdd,那么正确的方法是什么?
我的例子很简单,但在现实中my_dict,并my_list要大得多,而且my_func …
我试图在spark(1.6.2)中进行左外连接,但它不起作用.我的SQL查询是这样的:
sqlContext.sql("select t.type, t.uuid, p.uuid
from symptom_type t LEFT JOIN plugin p
ON t.uuid = p.uuid
where t.created_year = 2016
and p.created_year = 2016").show()
Run Code Online (Sandbox Code Playgroud)
结果是这样的:
+--------------------+--------------------+--------------------+
| type| uuid| uuid|
+--------------------+--------------------+--------------------+
| tained|89759dcc-50c0-490...|89759dcc-50c0-490...|
| swapper|740cd0d4-53ee-438...|740cd0d4-53ee-438...|
Run Code Online (Sandbox Code Playgroud)
我使用LEFT JOIN或LEFT OUTER JOIN得到了相同的结果(第二个uuid不为null).
我希望第二个uuid列只能为null.如何正确地进行左外连接?
===其他信息==
如果我使用数据帧做左外连接我得到了正确的结果.
s = sqlCtx.sql('select * from symptom_type where created_year = 2016')
p = sqlCtx.sql('select * from plugin where created_year = 2016')
s.join(p, s.uuid == p.uuid, 'left_outer')
.select(s.type, s.uuid.alias('s_uuid'),
p.uuid.alias('p_uuid'), s.created_date, p.created_year, p.created_month).show()
Run Code Online (Sandbox Code Playgroud)
我有这样的结果:
+-------------------+--------------------+-----------------+--------------------+------------+-------------+
| type| s_uuid| p_uuid| created_date|created_year|created_month| …Run Code Online (Sandbox Code Playgroud) 我构建了一个python模块,我想在我的pyspark应用程序中导入它.
我的包目录结构是:
wesam/
|-- data.py
`-- __init__.py
Run Code Online (Sandbox Code Playgroud)
import wesam我的pyspark脚本顶部的一个简单导致ImportError: No module named wesam.我也试图压缩它与我的代码以出货--py-files为推荐这个答案,没有运气.
./bin/spark-submit --py-files wesam.zip mycode.py
Run Code Online (Sandbox Code Playgroud)
我也按照这个答案的建议以编程方式添加了文件,但是我得到了同样的ImportError: No module named wesam错误.
.sc.addPyFile("wesam.zip")
Run Code Online (Sandbox Code Playgroud)
我在这里错过了什么?
我有以下方式的数据集:
FieldA FieldB ArrayField
1 A {1,2,3}
2 B {3,5}
Run Code Online (Sandbox Code Playgroud)
我想爆炸ArrayField上的数据,因此输出将以下列方式显示:
FieldA FieldB ExplodedField
1 A 1
1 A 2
1 A 3
2 B 3
2 B 5
Run Code Online (Sandbox Code Playgroud)
我的意思是我想为ArrayField中的每个项生成一个输出行,同时保持其他字段的值.
你将如何在Spark中实现它.请注意,输入数据集非常大.
我每年使用以下代码来聚集学生.目的是了解每年的学生总数.
from pyspark.sql.functions import col
import pyspark.sql.functions as fn
gr = Df2.groupby(['Year'])
df_grouped =
gr.agg(fn.count(col('Student_ID')).alias('total_student_by_year'))
Run Code Online (Sandbox Code Playgroud)
结果是:
[学生按年份] [1]
我发现有这么多ID重复的问题所以结果是错误的和巨大的.
我希望按年份对学生进行聚集,按年计算学生总数,并将ID重复计算.
我希望这个问题很清楚.我是新成员谢谢
我一直在尝试使用sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver")Hive表进入Spark而没有任何成功.我做过研究并阅读如下:
Spark 1.5.1无法使用hive jdbc 1.2.0
http://belablotski.blogspot.in/2016/01/access-hive-tables-from-spark-using.html
我使用了最新的Hortonworks Sandbox 2.6并向社区询问了同样的问题:
我想做的事情非常简单pyspark:
df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="sample_07",user="maria_dev", password="maria_dev").load()
Run Code Online (Sandbox Code Playgroud)
这给了我这个错误:
17/12/30 19:55:14 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://localhost:10016/default
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark-client/python/pyspark/sql/readwriter.py", line 139, in load
return self._df(self._jreader.load())
File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/hdp/current/spark-client/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error …Run Code Online (Sandbox Code Playgroud) 我使用monotonically_increasing_id()使用以下语法将行号分配给pyspark数据帧:
df1 = df1.withColumn("idx", monotonically_increasing_id())
Run Code Online (Sandbox Code Playgroud)
现在df1有26,572,528条记录.所以我期待idx值从0-26,572,527.
但是当我选择max(idx)时,它的值非常大:335,008,054,165.
这个功能发生了什么?使用此函数与具有相似记录数的其他数据集合并是否可靠?
我有大约300个数据帧,我想将它们组合成一个数据帧.因此,一个数据帧包含ID,而其他数据帧包含与行对应的不同记录
pyspark ×10
apache-spark ×8
python ×5
dataframe ×1
hive ×1
indexing ×1
jdbc ×1
merge ×1
pca ×1
pyspark-sql ×1
pythonpath ×1