小编abs*_*ted的帖子

如何在 pyspark 中从一个 S3 存储桶读取数据并将其写入另一个 S3 存储桶?

我尝试执行以下操作 - 为每个存储桶设置特定配置 - 从一个 S3 存储桶读取数据并写入另一个存储桶。这两个存储桶具有不同的凭据并且属于不同的帐户。但是,读取仅适用于第一个存储桶。也就是说,如果我们首先访问第一个存储桶,则第二个存储桶将永远不可用(反之亦然)。

import os

## B1 CONF
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.bucket.b1.access.key", B1_ACCESS_KEY)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.bucket.b1.secret.key", B1_SECRET_KEY)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.bucket.b1.endpoint", B1_ENDPOINT)

## B2 CONF

spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.bucket.b2.access.key", B2_ACCESS_KEY)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.bucket.b2.secret.key", B2_SECRET_KEY)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.bucket.b2.endpoint", B2_ENDPOINT)

df1 = spark.read.parquet("s3a://" + B1_BUCKET_NAME + "/somepath")
df.show()

df2 = spark.read.parquet("s3a://" + B2_BUCKET_NAME + "/somepath")
od.show()
Run Code Online (Sandbox Code Playgroud)

错误是: Py4JJavaError: An error occurred while calling o915.parquet. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage X failed N times, most recent failure: Lost task 0.15 in stage X (TID M, IP, executor …

amazon-s3 amazon-web-services pyspark

5
推荐指数
0
解决办法
721
查看次数

TypeError:data.forEach不是一个函数

这是我的代码:

$.ajax({
    url: "some_url/",
    type: "GET",
    dataType: "json",
    success: function(data){
        console.log(data);
        data.forEach(function(element){
            console.log(element);
        });
    }
});
Run Code Online (Sandbox Code Playgroud)

我得到的错误是每个data变量都不起作用。但是,当我登录data到控制台时,

[{"model": "app.mdl", "pk": 1, "fields": {"name": "test", "rank": 1}}]

显然,这是一个数组且可迭代,所以我不明白到底是什么错误。

编辑:在Django中data通过返回JsonResponse

html javascript django jquery

4
推荐指数
1
解决办法
1万
查看次数

在 Airflow 中编写和导入自定义插件

这实际上是两个问题合二为一。

AIRFLOW_HOME的结构像

airflow
+-- dags
+-- plugins
    +-- __init__.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py
        +-- another_hook.py
    +-- operators
        +-- __init__.py
        +-- my_operator.py
        +-- another_operator.py
    +-- sensors
    +-- utils
Run Code Online (Sandbox Code Playgroud)

我一直在关注 astronomer.io 的示例https://github.com/airflow-plugins。我的自定义operators使用我的自定义hooks,所有导入都是相对于顶级文件夹的plugins

airflow
+-- dags
+-- plugins
    +-- __init__.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py
        +-- another_hook.py
    +-- operators
        +-- __init__.py
        +-- my_operator.py
        +-- another_operator.py
    +-- sensors
    +-- utils
Run Code Online (Sandbox Code Playgroud)

但是,当我尝试将整个存储库移动到 plugins 文件夹时,运行后出现导入错误,airflow list_dagsplugins找不到。

我阅读了一些关于它的内容,显然 Airflow 将插件加载到其核心模块中,以便它们可以像这样导入 …

airflow

4
推荐指数
1
解决办法
5638
查看次数

Prolog子句单独终止,但不一起终止

所以

?- canCall(mary, Person).
Run Code Online (Sandbox Code Playgroud)

工作并终止,

?- canFind(mary, Person).
Run Code Online (Sandbox Code Playgroud)

也可以工作并终止。但是不知何故

?- canCall(mary, Person), canFind(mary, Person). 
Run Code Online (Sandbox Code Playgroud)

不终止。可能是什么原因?

prolog terminate non-termination failure-slice

2
推荐指数
1
解决办法
99
查看次数