小编tam*_*jd1的帖子

spark-submit和pyspark有什么区别?

如果我启动pyspark然后运行此命令:

import my_script; spark = my_script.Sparker(sc); spark.collapse('./data/')
Run Code Online (Sandbox Code Playgroud)

一切都很好.但是,如果我尝试通过命令行和spark-submit执行相同的操作,则会收到错误消息:

Command: /usr/local/spark/bin/spark-submit my_script.py collapse ./data/
  File "/usr/local/spark/python/pyspark/rdd.py", line 352, in func
    return f(iterator)
  File "/usr/local/spark/python/pyspark/rdd.py", line 1576, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/local/spark/python/pyspark/shuffle.py", line 245, in mergeValues
    for k, v in iterator:
  File "/.../my_script.py", line 173, in _json_args_to_arr
    js = cls._json(line)
RuntimeError: uninitialized staticmethod object
Run Code Online (Sandbox Code Playgroud)

my_script:

...
if __name__ == "__main__":
    args = sys.argv[1:]
    if args[0] == 'collapse':
        directory = args[1]
        from pyspark import SparkContext
        sc = SparkContext(appName="Collapse")
        spark = Sparker(sc)
        spark.collapse(directory)
        sc.stop() …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

17
推荐指数
1
解决办法
4万
查看次数

boto3 list_objects和list_objects_v2有什么区别?

我正在尝试使用python在Amazon s3存储桶中列出对象boto3.

它似乎boto3有2个函数用于列出存储桶中的对象:list_objects()list_objects_v2().

2和使用one之间的好处有什么区别?

python amazon-s3 boto3

17
推荐指数
1
解决办法
1万
查看次数

将数据从AWS S3复制到Aurora Postgres

我正在尝试将数据从AWS S3复制到Aurora Postgres.目前我的流程如下:

  1. 从本地S3下载文件(在EC2实例上)
  2. 运行" COPY FROM STDIN ..."命令将数据从输入流加载到Aurora postgres.

我想知道是否有命令直接从S3复制到Aurora postgres.

postgresql amazon-s3 amazon-aurora

8
推荐指数
1
解决办法
3729
查看次数

dask包没有使用所有核心?备择方案?

我有一个python脚本,它执行以下操作:i.它接受数据的输入文件(通常是嵌套的JSON格式)ii.将数据逐行传递给另一个函数,该函数将数据处理成所需的格式iii.最后它将输出写入文件.

这是我目前的简单python线,这样做......

def manipulate(line):
    # a pure python function which transforms the data
    # ...
    return manipulated_json

for line in f:
    components.append(manipulate(ujson.loads(line)))
    write_to_csv(components)`
Run Code Online (Sandbox Code Playgroud)

这有效,但是python GIL将它限制在服务器上的一个核心,它的速度非常慢,特别是对于大量数据.

我通常处理的数据量约为4 gig gzip压缩,但偶尔我必须处理数百gig gzip压缩的数据.它不是必需的大数据,但仍无法在内存中进行处理,并且Python的GIL处理速度非常慢.

在寻找优化数据处理的解决方案时,我遇到了dask.虽然PySpark在当时似乎是我的明显解决方案,但是dask的承诺和它的简单性让我受益匪浅,我决定尝试一下.

经过对dask的大量研究以及如何使用它,我整理了一个非常小的脚本来复制我当前的过程.该脚本如下所示:

import dask.bag as bag
import json
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`
Run Code Online (Sandbox Code Playgroud)

这工作并产生与原始非dask脚本相同的结果,但它仍然只在服务器上使用一个CPU.所以,它根本没有帮助.事实上,它的速度较慢.

我究竟做错了什么?我错过了什么吗?我仍然相当新闻,所以如果我忽略了某些事情或者我应该做一些完全不同的事情,请告诉我.

另外,是否有任何替代方法可以使用服务器的全部容量(即所有CPU)来完成我需要做的事情?

谢谢,

Ť

python parallel-processing json export-to-csv dask

6
推荐指数
1
解决办法
902
查看次数

是否可以使用Python遍历Amazon S3存储桶并计算其文件/密钥中的行数?

是否可以在Amazon S3存储桶中循环访问文件/密钥,使用Python读取内容并计算行数?

例如:

  1. My bucket: "my-bucket-name"
  2. File/Key : "test.txt" 
Run Code Online (Sandbox Code Playgroud)

我需要遍历文件“ test.txt”并计算原始文件中的行数。

样例代码:

for bucket in conn.get_all_buckets():
    if bucket.name == "my-bucket-name":
        for file in bucket.list():
            #need to count the number lines in each file and print to a log.
Run Code Online (Sandbox Code Playgroud)

python amazon-s3 boto amazon-web-services

3
推荐指数
1
解决办法
8541
查看次数

从bash脚本读取makefile变量

makefile在其中尝试设置一些已存在于我编写的 bash 脚本中的变量,该脚本称为 :(set-vars.sh它仅包含变量名称和值)。

我在其他几个bash脚本中使用这些变量。我还需要能够在我的makefile.

经过大量研究,并试图弄清楚什么是从设置变量的最佳方式的bash脚本makefile,我发现它的工作方式; 如下:

set-vars.sh

# set vars
foo=FOO1
bar=BAR1
baz=BAZ1

# if a command line arg is given (e.g. foo) 
# echo value command line arg (e.g. echo $foo)

if ! [ -z ${1+x} ]; then echo ${!1}; fi
Run Code Online (Sandbox Code Playgroud)

这里的魔法是最后一行,它echo是一个变量的值,其名称在命令行参数中提供,如果有命令行参数。我在 中调用此脚本makefile并提供变量名称并使用echoed 值来设置makefile变量。

Makefile

.PHONY:  target1 target2 target3

export foo = $(shell sh set-vars.sh foo) …
Run Code Online (Sandbox Code Playgroud)

linux variables bash shell makefile

2
推荐指数
1
解决办法
1993
查看次数