如果我启动pyspark然后运行此命令:
import my_script; spark = my_script.Sparker(sc); spark.collapse('./data/')
Run Code Online (Sandbox Code Playgroud)
一切都很好.但是,如果我尝试通过命令行和spark-submit执行相同的操作,则会收到错误消息:
Command: /usr/local/spark/bin/spark-submit my_script.py collapse ./data/
File "/usr/local/spark/python/pyspark/rdd.py", line 352, in func
return f(iterator)
File "/usr/local/spark/python/pyspark/rdd.py", line 1576, in combineLocally
merger.mergeValues(iterator)
File "/usr/local/spark/python/pyspark/shuffle.py", line 245, in mergeValues
for k, v in iterator:
File "/.../my_script.py", line 173, in _json_args_to_arr
js = cls._json(line)
RuntimeError: uninitialized staticmethod object
Run Code Online (Sandbox Code Playgroud)
my_script:
...
if __name__ == "__main__":
args = sys.argv[1:]
if args[0] == 'collapse':
directory = args[1]
from pyspark import SparkContext
sc = SparkContext(appName="Collapse")
spark = Sparker(sc)
spark.collapse(directory)
sc.stop() …
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用python在Amazon s3存储桶中列出对象boto3
.
它似乎boto3
有2个函数用于列出存储桶中的对象:list_objects()
和list_objects_v2()
.
2和使用one之间的好处有什么区别?
我正在尝试将数据从AWS S3复制到Aurora Postgres.目前我的流程如下:
COPY FROM STDIN ...
"命令将数据从输入流加载到Aurora postgres.我想知道是否有命令直接从S3复制到Aurora postgres.
我有一个python脚本,它执行以下操作:i.它接受数据的输入文件(通常是嵌套的JSON格式)ii.将数据逐行传递给另一个函数,该函数将数据处理成所需的格式iii.最后它将输出写入文件.
这是我目前的简单python线,这样做......
def manipulate(line):
# a pure python function which transforms the data
# ...
return manipulated_json
for line in f:
components.append(manipulate(ujson.loads(line)))
write_to_csv(components)`
Run Code Online (Sandbox Code Playgroud)
这有效,但是python GIL将它限制在服务器上的一个核心,它的速度非常慢,特别是对于大量数据.
我通常处理的数据量约为4 gig gzip压缩,但偶尔我必须处理数百gig gzip压缩的数据.它不是必需的大数据,但仍无法在内存中进行处理,并且Python的GIL处理速度非常慢.
在寻找优化数据处理的解决方案时,我遇到了dask.虽然PySpark在当时似乎是我的明显解决方案,但是dask的承诺和它的简单性让我受益匪浅,我决定尝试一下.
经过对dask的大量研究以及如何使用它,我整理了一个非常小的脚本来复制我当前的过程.该脚本如下所示:
import dask.bag as bag
import json
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`
Run Code Online (Sandbox Code Playgroud)
这工作并产生与原始非dask脚本相同的结果,但它仍然只在服务器上使用一个CPU.所以,它根本没有帮助.事实上,它的速度较慢.
我究竟做错了什么?我错过了什么吗?我仍然相当新闻,所以如果我忽略了某些事情或者我应该做一些完全不同的事情,请告诉我.
另外,是否有任何替代方法可以使用服务器的全部容量(即所有CPU)来完成我需要做的事情?
谢谢,
Ť
是否可以在Amazon S3存储桶中循环访问文件/密钥,使用Python读取内容并计算行数?
例如:
1. My bucket: "my-bucket-name"
2. File/Key : "test.txt"
Run Code Online (Sandbox Code Playgroud)
我需要遍历文件“ test.txt”并计算原始文件中的行数。
样例代码:
for bucket in conn.get_all_buckets():
if bucket.name == "my-bucket-name":
for file in bucket.list():
#need to count the number lines in each file and print to a log.
Run Code Online (Sandbox Code Playgroud) 我makefile
在其中尝试设置一些已存在于我编写的 bash 脚本中的变量,该脚本称为 :(set-vars.sh
它仅包含变量名称和值)。
我在其他几个bash
脚本中使用这些变量。我还需要能够在我的makefile
.
经过大量研究,并试图弄清楚什么是从设置变量的最佳方式的bash
脚本makefile
,我发现它的工作方式; 如下:
set-vars.sh
:
# set vars
foo=FOO1
bar=BAR1
baz=BAZ1
# if a command line arg is given (e.g. foo)
# echo value command line arg (e.g. echo $foo)
if ! [ -z ${1+x} ]; then echo ${!1}; fi
Run Code Online (Sandbox Code Playgroud)
这里的魔法是最后一行,它echo
是一个变量的值,其名称在命令行参数中提供,如果有命令行参数。我在 中调用此脚本makefile
并提供变量名称并使用echo
ed 值来设置makefile
变量。
Makefile
:
.PHONY: target1 target2 target3
export foo = $(shell sh set-vars.sh foo) …
Run Code Online (Sandbox Code Playgroud)