看似简单的问题,却找不到答案.
问题:我创建了一个函数,我将传递给map(),它接受一个字段并从中创建三个字段.我希望map()的输出给我一个新的RDD,包括输入RDD和新/输出RDD的字段.我该怎么做呢?
我是否需要将我的数据键添加到函数的输出中,以便我可以将更多输出RDD加入到我原来的RDD中?这是正确的/最佳做法吗?
def extract_fund_code_from_iv_id(holding):
# Must include key of data for later joining
iv_id = Row(iv_id_fund_code=holding.iv_id[:2], iv_id_last_code=holding.iv_id[-2:])
return iv_id
Run Code Online (Sandbox Code Playgroud)
更基本的,我似乎无法结合两个Row.
row1 = Row(name="joe", age="35")
row2 = Row(state="MA")
print row1, row2
Run Code Online (Sandbox Code Playgroud)
这不会像我想要的那样返回一个新的Row().
谢谢
假设我有两个RDD
第一
1
2
3
4
5
Run Code Online (Sandbox Code Playgroud)
第二
6
7
8
9
10
Run Code Online (Sandbox Code Playgroud)
新的RDD将是
1 6
2 7
3 8
4 9
5 10
Run Code Online (Sandbox Code Playgroud)
所以,这基本上是元素明智的合并...我们假设两个RDD具有相同的大小.
我是PySpark的新手,我想了解如何在PySpark中编写多个嵌套for循环,下面是粗略的高级示例.任何帮助将不胜感激.
for ( i=0;i<10;i++)
for ( j=0;j<10;j++)
for ( k=0;k<10;k++)
{
print "i"."j"."k"
}
Run Code Online (Sandbox Code Playgroud) 我有一个textFile和RDD一样:sc.textFile(<file_name>).
我尝试重新分区RDD以加快处理速度:
sc.repartition(<n>).
无论我投入什么<n>,它似乎都没有改变,如下所示:
RDD.getNumPartitions()总是打印相同的数字,(3)无论如何.
如何更改分区数以提高性能?
所以,我知道Spark是一个懒惰的执行者.例如,如果我打电话
post = pre.filter(lambda x: some_condition(x)).map(lambda x: do_something(x))
我知道它不会立即执行.
但是当我打电话时,上面的代码会发生什么post.count()?我想象中的滤波将被迫执行,因为pre和post可能不会有相同的行数,因为有一个filter条件存在.但是,map是1对1的关系,因此计数不会受其影响.这个map命令会在这里执行count()吗?
跟进:当我想强制执行map语句时(假设count()不起作用),我可以调用哪些强制执行?我宁愿不必使用saveAsTextFile().
我在我的pyspark map函数中添加了一个额外的操作.
原来的功能是:
results = input.map(lambda row:process_myData(row))
Run Code Online (Sandbox Code Playgroud)
哪个工作正常.然后我尝试添加如下的附加操作:
results = input.map{lambda row:
row1 = row.replace("abc","def")
process_myData(row1)}
Run Code Online (Sandbox Code Playgroud)
然后我得到了下面的语法错误:
results = input.map{lambda row:
^
SyntaxError: invalid syntax
Run Code Online (Sandbox Code Playgroud)
我在这做错了什么?谢谢!
我是pyspark的新手,想在我的Ubuntu 12.04机器上使用Ipython笔记本使用pyspark.以下是pyspark和Ipython笔记本的配置.
sparkuser@Ideapad:~$ echo $JAVA_HOME
/usr/lib/jvm/java-8-oracle
# Path for Spark
sparkuser@Ideapad:~$ ls /home/sparkuser/spark/
bin CHANGES.txt data examples LICENSE NOTICE R RELEASE scala-2.11.6.deb
build conf ec2 lib licenses python README.md sbin spark-1.5.2-bin-hadoop2.6.tgz
Run Code Online (Sandbox Code Playgroud)
我安装了Anaconda2 4.0.0和anaconda的路径:
sparkuser@Ideapad:~$ ls anaconda2/
bin conda-meta envs etc Examples imports include lib LICENSE.txt mkspecs pkgs plugins share ssl tests
Run Code Online (Sandbox Code Playgroud)
为IPython创建PySpark配置文件.
ipython profile create pyspark
sparkuser@Ideapad:~$ cat .bashrc
export SPARK_HOME="$HOME/spark"
export PYSPARK_SUBMIT_ARGS="--master local[2]"
# added by Anaconda2 4.0.0 installer
export PATH="/home/sparkuser/anaconda2/bin:$PATH"
Run Code Online (Sandbox Code Playgroud)
创建一个名为〜/ .ipython/profile_pyspark/startup/00-pyspark-setup.py的文件:
sparkuser@Ideapad:~$ cat .ipython/profile_pyspark/startup/00-pyspark-setup.py
import os
import …Run Code Online (Sandbox Code Playgroud) 我的工作因以下日志而失败,但是,我并不完全理解.这似乎是由
" YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 24.7 GB of 24 GB physical".
但是我怎样才能增加谷歌数据流中的内存.
日志:
16/05/05 01:12:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 332.0 in stage 0.0 (TID 332, cluster-4-w-40.c.ll-1167.internal): ExecutorLostFailure (executor 114 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 25.2 GB of 24 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
16/05/05 01:12:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 335.0 in stage 0.0 (TID 335, cluster-4-w-40.c.ll-1167.internal): ExecutorLostFailure (executor 114 …Run Code Online (Sandbox Code Playgroud) 对于下面的示例数据,想知道如何找出列中最常出现的值colour。的数据类型colour是WrappedArray。数组中可能有 n 个元素。在此示例中,颜色应为黄色,然后是出现两次的蓝色。非常感谢您的帮助。
Name Colour
A ('blue','yellow')
B ('pink', 'yellow')
C ('green', 'black')
D ('yellow','orange','blue')
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用PySpark处理包含带有动态键的结构列的 json 文件。
结构列的架构如下所示:
{
"UUID_KEY": {
"time": STRING
"amount": INTEGER
}
}
Run Code Online (Sandbox Code Playgroud)
数据如下:
| ID | json_列 |
|---|---|
| 1 | “{1:{金额:1,时间:2},2:{金额:10,时间:5}}” |
| 2 | “{3:{金额:1,时间:2},4:{金额:10,时间:5}” |
目前,我将结构列作为字符串,因为通过指定/推断模式加载 JSON 不起作用,因为第一层的键是随机生成的,并且数据太多。第二层始终相同,它包含amount和time。
有没有办法在不知道第一层的键的情况下将此 JSON 字符串平铺到amount和列中?time