我尝试执行以下操作 - 为每个存储桶设置特定配置 - 从一个 S3 存储桶读取数据并写入另一个存储桶。这两个存储桶具有不同的凭据并且属于不同的帐户。但是,读取仅适用于第一个存储桶。也就是说,如果我们首先访问第一个存储桶,则第二个存储桶将永远不可用(反之亦然)。
import os
## B1 CONF
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.bucket.b1.access.key", B1_ACCESS_KEY)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.bucket.b1.secret.key", B1_SECRET_KEY)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.bucket.b1.endpoint", B1_ENDPOINT)
## B2 CONF
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.bucket.b2.access.key", B2_ACCESS_KEY)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.bucket.b2.secret.key", B2_SECRET_KEY)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.bucket.b2.endpoint", B2_ENDPOINT)
df1 = spark.read.parquet("s3a://" + B1_BUCKET_NAME + "/somepath")
df.show()
df2 = spark.read.parquet("s3a://" + B2_BUCKET_NAME + "/somepath")
od.show()
Run Code Online (Sandbox Code Playgroud)
错误是:
Py4JJavaError: An error occurred while calling o915.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage X failed N times, most recent failure: Lost task 0.15 in stage X (TID M, IP, executor …
这是我的代码:
$.ajax({
url: "some_url/",
type: "GET",
dataType: "json",
success: function(data){
console.log(data);
data.forEach(function(element){
console.log(element);
});
}
});
Run Code Online (Sandbox Code Playgroud)
我得到的错误是每个data变量都不起作用。但是,当我登录data到控制台时,
[{"model": "app.mdl", "pk": 1, "fields": {"name": "test", "rank": 1}}]
显然,这是一个数组且可迭代,所以我不明白到底是什么错误。
编辑:在Django中data通过返回JsonResponse。
这实际上是两个问题合二为一。
我AIRFLOW_HOME的结构像
airflow
+-- dags
+-- plugins
+-- __init__.py
+-- hooks
+-- __init__.py
+-- my_hook.py
+-- another_hook.py
+-- operators
+-- __init__.py
+-- my_operator.py
+-- another_operator.py
+-- sensors
+-- utils
Run Code Online (Sandbox Code Playgroud)
我一直在关注 astronomer.io 的示例https://github.com/airflow-plugins。我的自定义operators使用我的自定义hooks,所有导入都是相对于顶级文件夹的plugins。
airflow
+-- dags
+-- plugins
+-- __init__.py
+-- hooks
+-- __init__.py
+-- my_hook.py
+-- another_hook.py
+-- operators
+-- __init__.py
+-- my_operator.py
+-- another_operator.py
+-- sensors
+-- utils
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试将整个存储库移动到 plugins 文件夹时,运行后出现导入错误,airflow list_dags说plugins找不到。
我阅读了一些关于它的内容,显然 Airflow 将插件加载到其核心模块中,以便它们可以像这样导入 …
所以
?- canCall(mary, Person).
Run Code Online (Sandbox Code Playgroud)
工作并终止,
?- canFind(mary, Person).
Run Code Online (Sandbox Code Playgroud)
也可以工作并终止。但是不知何故
?- canCall(mary, Person), canFind(mary, Person).
Run Code Online (Sandbox Code Playgroud)
不终止。可能是什么原因?