所以我希望我的Spark应用程序能够从亚马逊的S3中读取一些文本.我写了以下简单的脚本:
import boto3
s3_client = boto3.client('s3')
text_keys = ["key1.txt", "key2.txt"]
data = sc.parallelize(text_keys).flatMap(lambda key: s3_client.get_object(Bucket="my_bucket", Key=key)['Body'].read().decode('utf-8'))
Run Code Online (Sandbox Code Playgroud)
当我这样做时,data.collect我收到以下错误:
TypeError: can't pickle thread.lock objects
Run Code Online (Sandbox Code Playgroud)
我似乎没有在网上找到任何帮助.也许有人设法解决了上述问题?
我有一个非常简单的DAG,其中包含两个任务,如下所示:
default_args = {
'owner': 'me',
'start_date': dt.datetime.today(),
'retries': 0,
'retry_delay': dt.timedelta(minutes=1)
}
dag = DAG(
'test DAG',
default_args=default_args,
schedule_interval=None
)
t0 = PythonOperator(
task_id="task 1",
python_callable=run_task_1,
op_args=[arg_1, args_2, args_3],
dag=dag,
execution_timeout=dt.timedelta(minutes=60)
)
t1 = PythonOperator(
task_id="task 2",
python_callable=run_task_2,
dag=dag,
execution_timeout=dt.timedelta(minutes=60)
)
t1.set_upstream(t0)
Run Code Online (Sandbox Code Playgroud)
但是,当我运行它时,我在日志中看到以下内容:
[2017-10-17 16:18:35,519] {jobs.py:2083}信息-任务退出,返回码-9
没有任何其他有用的错误日志。有人看过吗?我是否错误地定义了DAG?任何帮助表示赞赏!
所以我在 pyspark shell 上运行以下命令:
>>> data = spark.read.csv("annotations_000", header=False, mode="DROPMALFORMED", schema=schema)
>>> data.show(3)
+----------+--------------------+--------------------+---------+---------+--------+-----------------+
| item_id| review_id| text| aspect|sentiment|comments| annotation_round|
+----------+--------------------+--------------------+---------+---------+--------+-----------------+
|9999900031|9999900031/custom...|Just came back to...|breakfast| 3| null|ASE_OpeNER_round2|
|9999900031|9999900031/custom...|Just came back to...| staff| 3| null|ASE_OpeNER_round2|
|9999900031|9999900031/custom...|The hotel was loc...| noise| 2| null|ASE_OpeNER_round2|
+----------+--------------------+--------------------+---------+---------+--------+-----------------+
>>> data.registerTempTable("temp")
>>> df = sqlContext.sql("select first(item_id), review_id, first(text), concat_ws(';', collect_list(aspect)) as aspect from temp group by review_id")
>>> df.show(3)
+---------------------+--------------------+--------------------+--------------------+
|first(item_id, false)| review_id| first(text, false)| aspect|
+---------------------+--------------------+--------------------+--------------------+
| 100012|100012/tripadviso...|We stayed here la...| staff;room|
| 100013|100013/tripadviso...|We stayed for …Run Code Online (Sandbox Code Playgroud) 因此,我有一个 AWS Kinesis 流,我可以在其中为多个使用者发布事件。对于他们中的大多数人来说,接收热数据很重要——这意味着他们中的许多人可能会同时轮询和读取最新数据。根据 AWS 文档,增加分片数量将提高并行度,而每秒读取次数最多可达每个分片每秒 5 次。我的问题是,添加更多分片是否(以及如何?)有助于解决所有消费者都是最新的并尝试从同一分片读取新传入数据的情况?似乎每秒读取的限制会自动引入对您可以拥有的消费者数量的限制(至少当它们需要随时更新时),或者我错过了什么?