我试图转换DataFrameSpark-Scala中的所有标题/列名称.截至目前,我想出了以下代码,它只替换了一个列名.
for( i <- 0 to origCols.length - 1) {
df.withColumnRenamed(
df.columns(i),
df.columns(i).toLowerCase
);
}
Run Code Online (Sandbox Code Playgroud) 有人对SparkR vs sparklyr的优缺点有所概述吗?谷歌没有产生任何令人满意的结果,两者看起来非常相似.尝试两种方式,SparkR看起来更麻烦,而sparklyr非常简单(既可以安装也可以使用,特别是使用dplyr输入).sparklyr只能用于并行运行dplyr函数或"普通"R代码吗?
最好
如何命名,我怎么知道CentOS中安装了哪个版本的火花?
当前系统已安装cdh5.1.0.
问题几乎在标题中.我找不到有关差异的详细文档.
我注意到了一个区别,因为在交换cube和groupBy函数调用时,我会得到不同的结果.我注意到对于使用'cube'的结果,我在经常分组的表达式上得到了很多空值.
SparkContext的Scala版本具有该属性
sc.hadoopConfiguration
Run Code Online (Sandbox Code Playgroud)
我已成功使用它来设置Hadoop属性(在Scala中)
例如
sc.hadoopConfiguration.set("my.mapreduce.setting","someVal")
Run Code Online (Sandbox Code Playgroud)
然而,SparkContext的python版本缺少该访问器.有没有办法将Hadoop配置值设置为PySpark上下文使用的Hadoop配置?
我想通过Web应用程序向用户公开我的Spark应用程序.
基本上,用户可以决定他想要运行哪个动作并输入一些需要传递给spark应用程序的变量.例如:用户输入几个字段,然后点击一个按钮,用于执行以下操作的"运行sparkApp1与放慢参数MIN_X,MAX_X,MIN_Y,MAX_Y".
应该使用用户给出的参数启动spark应用程序.完成后,可能需要Web应用程序来检索结果(来自hdfs或mongodb)并将其显示给用户.处理时,Web应用程序应显示Spark应用程序的状态.
我的问题:
我正在运行带有YARN/Mesos(尚不确定)和MongoDB的Spark 1.6.1集群.
我需要为包含许多列的数据表生成row_numbers的完整列表.
在SQL中,这将如下所示:
select
key_value,
col1,
col2,
col3,
row_number() over (partition by key_value order by col1, col2 desc, col3)
from
temp
;
Run Code Online (Sandbox Code Playgroud)
现在,让我们说在Spark中我有一个形式为(K,V)的RDD,其中V =(col1,col2,col3),所以我的条目就像
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
Run Code Online (Sandbox Code Playgroud)
我想使用sortBy(),sortWith(),sortByKey(),zipWithIndex等命令对它们进行排序,并使用正确的row_number创建一个新的RDD.
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
Run Code Online (Sandbox Code Playgroud)
(我不关心括号,所以表格也可以是(K,(col1,col2,col3,rownum))而不是)
我该怎么做呢?
这是我的第一次尝试:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3) …Run Code Online (Sandbox Code Playgroud) 我有一个Python类,我用它来加载和处理Spark中的一些数据.在我需要做的各种事情中,我正在生成一个从Spark数据帧中的各个列派生的虚拟变量列表.我的问题是我不确定如何正确定义用户定义函数来完成我需要的东西.
我做目前有,当映射了潜在的数据帧RDD,解决了问题的一半(记住,这是在一个更大的方法等data_processor类):
def build_feature_arr(self,table):
# this dict has keys for all the columns for which I need dummy coding
categories = {'gender':['1','2'], ..}
# there are actually two differnt dataframes that I need to do this for, this just specifies which I'm looking at, and grabs the relevant features from a config file
if table == 'users':
iter_over = self.config.dyadic_features_to_include
elif table == 'activty':
iter_over = self.config.user_features_to_include
def _build_feature_arr(row):
result = []
row = row.asDict()
for …Run Code Online (Sandbox Code Playgroud) python apache-spark apache-spark-sql apache-spark-ml apache-spark-mllib
我一直在尝试使用sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver")Hive表进入Spark而没有任何成功.我做过研究并阅读如下:
Spark 1.5.1无法使用hive jdbc 1.2.0
http://belablotski.blogspot.in/2016/01/access-hive-tables-from-spark-using.html
我使用了最新的Hortonworks Sandbox 2.6并向社区询问了同样的问题:
我想做的事情非常简单pyspark:
df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="sample_07",user="maria_dev", password="maria_dev").load()
Run Code Online (Sandbox Code Playgroud)
这给了我这个错误:
17/12/30 19:55:14 INFO HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://localhost:10016/default
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark-client/python/pyspark/sql/readwriter.py", line 139, in load
return self._df(self._jreader.load())
File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/hdp/current/spark-client/python/pyspark/sql/utils.py", line 45, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark-client/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error …Run Code Online (Sandbox Code Playgroud) 我试图运行2个函数,使用PySpark在一个RDD上进行完全独立的转换.有什么方法可以做同样的事情?
def doXTransforms(sampleRDD):
(X transforms)
def doYTransforms(sampleRDD):
(Y Transforms)
if __name__ == "__main__":
sc = SparkContext(appName="parallelTransforms")
sqlContext = SQLContext(sc)
hive_context = HiveContext(sc)
rows_rdd = hive_context.sql("select * from tables.X_table")
p1 = Process(target=doXTransforms , args=(rows_rdd,))
p1.start()
p2 = Process(target=doYTransforms, args=(rows_rdd,))
p2.start()
p1.join()
p2.join()
sc.stop()
Run Code Online (Sandbox Code Playgroud)
这不起作用,我现在明白这不起作用.但有没有其他方法可以使这项工作?特别是有任何python-spark特定解决方案吗?
python-2.7 apache-spark apache-spark-sql python-multiprocessing pyspark
apache-spark ×10
pyspark ×3
scala ×2
sql ×2
cloudera-cdh ×1
cube ×1
dataframe ×1
hive ×1
jdbc ×1
python ×1
python-2.7 ×1
r ×1
rdd ×1
rollup ×1
row-number ×1
sparklyr ×1
sparkr ×1