默认情况下,Python 3对源代码文件使用UTF-8编码.我还应该在每个源文件的开头使用编码声明吗?喜欢# -*- coding: utf-8 -*-
我有两个版本的Python.当我使用spark-submit启动spark应用程序时,应用程序使用默认版本的Python.但是,我想使用另一个.如何指定要使用的spark-submit的Python版本?
我试图在 DAG 内的父任务 5 分钟后执行任务。
DAG:任务 1 ----> 等待 5 分钟 ----> 任务 2
如何在 Apache Airflow 中实现这一点?提前致谢。
我想使用 PyArrow 将以下 Pandas 数据框存储在镶木地板文件中:
import pandas as pd
df = pd.DataFrame({'field': [[{}, {}]]})
Run Code Online (Sandbox Code Playgroud)
field
列的类型是字典列表:
field
0 [{}, {}]
Run Code Online (Sandbox Code Playgroud)
我首先定义相应的 PyArrow 架构:
import pyarrow as pa
schema = pa.schema([pa.field('field', pa.list_(pa.struct([])))])
Run Code Online (Sandbox Code Playgroud)
然后我使用from_pandas()
:
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
Run Code Online (Sandbox Code Playgroud)
这将引发以下异常:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "table.pxi", line 930, in pyarrow.lib.Table.from_pandas
File "/anaconda3/lib/python3.6/site-packages/pyarrow/pandas_compat.py", line 371, in dataframe_to_arrays
convert_types)]
File "/anaconda3/lib/python3.6/site-packages/pyarrow/pandas_compat.py", line 370, in <listcomp>
for c, t in zip(columns_to_convert,
File "/anaconda3/lib/python3.6/site-packages/pyarrow/pandas_compat.py", line 366, in …
Run Code Online (Sandbox Code Playgroud) 重试任务可能毫无意义。例如,如果任务是传感器,并且由于凭据无效而失败,那么以后的任何重试都将不可避免地失败。如何定义可以决定重试是否合理的操作员?
在 Airflow 1.10.6 中,决定是否应重试任务的逻辑位于 中airflow.models.taskinstance.TaskInstance.handle_failure
,因此无法在操作员中定义行为,因为这是任务的责任,而不是操作员的责任。
理想的情况是该handle_failure
方法是在 Operator 端定义的,这样我们就可以根据需要重新定义它。
我发现的唯一解决方法是使用PythonBranchingOperator
“测试”任务是否可以运行。例如,在上述传感器的情况下,检查登录凭据是否有效,然后才将 DAG 流传送到传感器。否则,失败(或分支到另一个任务)。
我的分析正确吗handle_failure
?有更好的解决方法吗?
我遵循 Argo 工作流程的入门文档。一切都很顺利,直到我按照4. 运行示例工作流程中所述运行第一个示例工作流程。工作流程陷入挂起状态:
vagrant@master:~$ argo submit --watch https://raw.githubusercontent.com/argoproj/argo/master/examples/hello-world.yaml
Name: hello-world-z4lbs
Namespace: default
ServiceAccount: default
Status: Pending
Created: Thu May 14 12:36:45 +0000 (now)
vagrant@master:~$ argo list
NAME STATUS AGE DURATION PRIORITY
hello-world-z4lbs Pending 27m 0s 0
Run Code Online (Sandbox Code Playgroud)
这里提到聚集节点上的污点可能是问题所在,所以我清除了主节点:
vagrant@master:~$ kubectl taint nodes --all node-role.kubernetes.io/master-
node/master untainted
taint "node-role.kubernetes.io/master" not found
taint "node-role.kubernetes.io/master" not found
Run Code Online (Sandbox Code Playgroud)
然后我删除了挂起的工作流程并重新提交,但它再次陷入挂起状态。
新提交的工作流的详细信息也被卡住了:
vagrant@master:~$ kubectl describe workflow hello-world-8kvmb
Name: hello-world-8kvmb
Namespace: default
Labels: <none>
Annotations: <none>
API Version: argoproj.io/v1alpha1
Kind: Workflow
Metadata: …
Run Code Online (Sandbox Code Playgroud) I am looking to write docker compose file to locally execute airflow in production similar environent.
For older airflow v1.10.14, docker compose is working fine. But same docker compose is not working for latest stable version, airflow scheduler & webservice is failing continuously. error message looks like unable to create audit tables.
docker-compose.yaml
:
version: "2.1"
services:
postgres:
image: postgres:12
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
ports:
- "5433:5432"
scheduler:
image: apache/airflow:1.10.14
restart: always
depends_on:
- postgres
- webserver …
Run Code Online (Sandbox Code Playgroud) 对于一个项目,我需要开发一个 ETL 流程(提取转换加载),该流程从(传统)工具读取数据,该工具在 REST API 上公开其数据。该数据需要存储在亚马逊S3中。
我真的很喜欢用 apache nifi 尝试这个,但老实说我还不知道如何连接 REST API,以及在哪里/如何实现一些业务逻辑来与源系统“谈论正确的协议”。例如,我喜欢跟踪到目前为止已写入的数据,以便它可以从原来的位置恢复加载。
到目前为止,我一直在阅读 nifi 文档,并且我更好地了解了该工具提供/需要什么。然而,目前尚不清楚我如何在 nifi 架构中实现该任务。
希望有人能给我一些指导吗?
谢谢,保罗
要获取整数位置的标量0
和列标签'A'
的数据帧df
,我做链接的索引:df.iloc[0]['A']
。这是有效的,但 Pandas 文档说应该避免链接索引。
我能想到的另一种方法是df.iat[0, df.columns.get_loc('A')]
,与链式索引相比,它的类型太多了。有没有更短的方法来做到这一点?
注意:不推荐使用 .ix 索引器。
例子:
df=pd.DataFrame({'A':[10,20,30,40]}, index=[3,2,1,0])
一种 ------ 3 10 2 20 1 30 0 40
0
列中整数位置的标量A
是10
而不是40
:
df.iat[0, df.columns.get_loc('A')]
Otuput:10
我在 Jupyter 笔记本中编写了一个狗分类器,每次在图像中检测到狗时,它都应该显示该图像并打印一些描述该图像的文本。不知何故,无论我按什么顺序放置plt.imshow()
和,图像总是在打印所有文本后显示print()
。有谁知道为什么会这样?
谢谢你!
这是我的代码片段:
for i in range (0, 1,1):
all_counter+=1
if dog_detector(dog_files_short[i]):
img = image.load_img(dog_files_short[i], target_size=(224, 224))
plt.show()
plt.imshow(img)
time.sleep(5)
print("That's a dog!!!!")
dog_counter+=1
print("______________")
else:
print("______________")
img = image.load_img(dog_files_short[i], target_size=(224, 224))
plt.show()
plt.imshow(img)
print("No Doggo up here :(")
print(ResNet50_predict_labels(dog_files_short[i]))
print("______________")
print((dog_counter/all_counter)*100, "% of the dog pictures are classified as dogs")
Run Code Online (Sandbox Code Playgroud)
输出是这样的:
我试图使用 Spark 数据源 API 从 Oracle 数据库加载数据。
由于我需要通过查询加载数据,因此我使用了下面的查询,该查询是我从一些在线示例中整理出来的:
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("user", MYSQL_USERNAME);
options.put("password", MYSQL_PWD);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "(select emp_no, emp_id from employees) as employees_data");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
Run Code Online (Sandbox Code Playgroud)
这会出现异常:
线程“main”中出现异常 java.sql.SQLSyntaxErrorException: ORA-00933: SQL 命令未正确结束
我怀疑我们不能为 Oracle 查询提供“asEmployees_data”,那么我做错了什么?
python ×7
airflow ×3
apache-spark ×2
pandas ×2
python-3.x ×2
airflow-2.x ×1
apache ×1
apache-nifi ×1
argoproj ×1
bigdata ×1
encoding ×1
hadoop ×1
imshow ×1
matplotlib ×1
oracle ×1
parquet ×1
pyarrow ×1
rest ×1
scala ×1
sequence ×1
utf-8 ×1