标签: pyspark

在Spark中组合Row()

看似简单的问题,却找不到答案.

问题:我创建了一个函数,我将传递给map(),它接受一个字段并从中创建三个字段.我希望map()的输出给我一个新的RDD,包括输入RDD和新/输出RDD的字段.我该怎么做呢?

我是否需要将我的数据键添加到函数的输出中,以便我可以将更多输出RDD加入到我原来的RDD中?这是正确的/最佳做法吗?

def extract_fund_code_from_iv_id(holding):
    # Must include key of data for later joining
    iv_id = Row(iv_id_fund_code=holding.iv_id[:2], iv_id_last_code=holding.iv_id[-2:])
    return iv_id
Run Code Online (Sandbox Code Playgroud)

更基本的,我似乎无法结合两个Row.

row1 = Row(name="joe", age="35")
row2 = Row(state="MA")
print row1, row2
Run Code Online (Sandbox Code Playgroud)

这不会像我想要的那样返回一个新的Row().

谢谢

apache-spark-sql pyspark

0
推荐指数
1
解决办法
2694
查看次数

如何合并元素两个RDD

假设我有两个RDD

第一

1
2
3
4
5
Run Code Online (Sandbox Code Playgroud)

第二

6
7
8
9
10
Run Code Online (Sandbox Code Playgroud)

新的RDD将是

1 6
2 7
3 8
4 9
5 10
Run Code Online (Sandbox Code Playgroud)

所以,这基本上是元素明智的合并...我们假设两个RDD具有相同的大小.

python apache-spark rdd pyspark

0
推荐指数
1
解决办法
978
查看次数

嵌套循环的pyspark程序

我是PySpark的新手,我想了解如何在PySpark中编写多个嵌套for循环,下面是粗略的高级示例.任何帮助将不胜感激.

for ( i=0;i<10;i++)
   for ( j=0;j<10;j++)
       for ( k=0;k<10;k++)
          { 
           print "i"."j"."k"
}
Run Code Online (Sandbox Code Playgroud)

python for-loop apache-spark rdd pyspark

0
推荐指数
1
解决办法
5343
查看次数

为什么`getNumPartitions()`没有给我"repartition"指定的正确数量的分区?

我有一个textFile和RDD一样:sc.textFile(<file_name>).

我尝试重新分区RDD以加快处理速度:

sc.repartition(<n>).

无论我投入什么<n>,它似乎都没有改变,如下所示:

RDD.getNumPartitions()总是打印相同的数字,(3)无论如何.

如何更改分区数以提高性能?

hadoop-partitioning partition apache-spark pyspark

0
推荐指数
1
解决办法
3433
查看次数

count()会导致map()代码在Spark中执行吗?

所以,我知道Spark是一个懒惰的执行者.例如,如果我打电话

post = pre.filter(lambda x: some_condition(x)).map(lambda x: do_something(x))

我知道它不会立即执行.

但是当我打电话时,上面的代码会发生什么post.count()?我想象中的滤波将被迫执行,因为prepost可能不会有相同的行数,因为有一个filter条件存在.但是,map是1对1的关系,因此计数不会受其影响.这个map命令会在这里执行count()吗?

跟进:当我想强制执行map语句时(假设count()不起作用),我可以调用哪些强制执行?我宁愿不必使用saveAsTextFile().

apache-spark pyspark

0
推荐指数
1
解决办法
363
查看次数

pyspark:在一个地图函数中具有多个操作的语法错误

我在我的pyspark map函数中添加了一个额外的操作.

原来的功能是:

results = input.map(lambda row:process_myData(row)) 
Run Code Online (Sandbox Code Playgroud)

哪个工作正常.然后我尝试添加如下的附加操作:

results = input.map{lambda row:
            row1 = row.replace("abc","def")
            process_myData(row1)}
Run Code Online (Sandbox Code Playgroud)

然后我得到了下面的语法错误:

    results = input.map{lambda row:
                       ^
SyntaxError: invalid syntax
Run Code Online (Sandbox Code Playgroud)

我在这做错了什么?谢谢!

python lambda apache-spark pyspark

0
推荐指数
1
解决办法
537
查看次数

PySpark SparkContext名称错误'sc'在jupyter中

我是pyspark的新手,想在我的Ubuntu 12.04机器上使用Ipython笔记本使用pyspark.以下是pyspark和Ipython笔记本的配置.

sparkuser@Ideapad:~$ echo $JAVA_HOME
/usr/lib/jvm/java-8-oracle

# Path for Spark
sparkuser@Ideapad:~$ ls /home/sparkuser/spark/
bin    CHANGES.txt  data  examples  LICENSE   NOTICE  R          RELEASE  scala-2.11.6.deb
build  conf         ec2   lib       licenses  python  README.md  sbin     spark-1.5.2-bin-hadoop2.6.tgz
Run Code Online (Sandbox Code Playgroud)

我安装了Anaconda2 4.0.0和anaconda的路径:

sparkuser@Ideapad:~$ ls anaconda2/
bin  conda-meta  envs  etc  Examples  imports  include  lib  LICENSE.txt  mkspecs  pkgs  plugins  share  ssl  tests
Run Code Online (Sandbox Code Playgroud)

为IPython创建PySpark配置文件.

ipython profile create pyspark

sparkuser@Ideapad:~$ cat .bashrc

export SPARK_HOME="$HOME/spark"
export PYSPARK_SUBMIT_ARGS="--master local[2]"
# added by Anaconda2 4.0.0 installer
export PATH="/home/sparkuser/anaconda2/bin:$PATH"
Run Code Online (Sandbox Code Playgroud)

创建一个名为〜/ .ipython/profile_pyspark/startup/00-pyspark-setup.py的文件:

sparkuser@Ideapad:~$ cat .ipython/profile_pyspark/startup/00-pyspark-setup.py 
import os
import …
Run Code Online (Sandbox Code Playgroud)

ipython anaconda apache-spark pyspark jupyter-notebook

0
推荐指数
1
解决办法
7484
查看次数

pyspark在谷歌数据中心失败了

我的工作因以下日志而失败,但是,我并不完全理解.这似乎是由

" YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 24.7 GB of 24 GB physical".

但是我怎样才能增加谷歌数据流中的内存.

日志:

16/05/05 01:12:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 332.0 in stage 0.0 (TID 332, cluster-4-w-40.c.ll-1167.internal): ExecutorLostFailure (executor 114 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 25.2 GB of 24 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
16/05/05 01:12:42 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 335.0 in stage 0.0 (TID 335, cluster-4-w-40.c.ll-1167.internal): ExecutorLostFailure (executor 114 …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark google-cloud-dataproc

0
推荐指数
1
解决办法
3058
查看次数

PySpark-如何找出数组列中最常出现的前 n 个值?

对于下面的示例数据,想知道如何找出列中最常出现的值colour。的数据类型colour是WrappedArray。数组中可能有 n 个元素。在此示例中,颜色应为黄色,然后是出现两次的蓝色。非常感谢您的帮助。

Name   Colour 
 A      ('blue','yellow')
 B      ('pink', 'yellow')
 C      ('green', 'black')
 D      ('yellow','orange','blue')
Run Code Online (Sandbox Code Playgroud)

python arrays apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
190
查看次数

在 PySpark 中使用动态键展平嵌套 JSON 结构

我正在尝试使用PySpark处理包含带有动态键的结构列的 json 文件。

结构列的架构如下所示:

{
  "UUID_KEY": {
     "time": STRING
     "amount": INTEGER
  }
}
Run Code Online (Sandbox Code Playgroud)

数据如下:

ID json_列
1 “{1:{金额:1,时间:2},2:{金额:10,时间:5}}”
2 “{3:{金额:1,时间:2},4:{金额:10,时间:5}”

目前,我将结构列作为字符串,因为通过指定/推断模式加载 JSON 不起作用因为第一层的键是随机生成的,并且数据太多。第二层始终相同,它包含amounttime

有没有办法在不知道第一层的键的情况下将此 JSON 字符串平铺到amount和列中?time

json apache-spark apache-spark-sql pyspark databricks

0
推荐指数
1
解决办法
718
查看次数