当我们将 pandas 数据框保存为分区镶木地板时,文件名会自动生成。
是否可以指定每个分区的输出文件名?
使用示例
df = pd.DataFrame(data={'year': [2020, 2020, 2021],
'month': [1,12,2],
'day': [1,31,28],
'value': [1000,2000,3000]})
df.to_parquet('./output', partition_cols=['year', 'month'])
output/year=2020/month=1/6f0258e6c48a48dbb56cae0494adf659.parquet
output/year=2020/month=12/cf8a45116d8441668c3a397b816cd5f3.parquet
output/year=2021/month=2/7f9ba3f37cb9417a8689290d3f5f9e6e.parquet
Run Code Online (Sandbox Code Playgroud)
是否可以得到
output/year=2020/month=1/2020_01.parquet
output/year=2020/month=12/2020_12.parquet
output/year=2021/month=2/2021_02.parquet
Run Code Online (Sandbox Code Playgroud)
谢谢你的时间
我是气流方面的新手,所以我在这里有疑问。
如果满足第一个任务的条件,我想运行 DAG。如果条件不满足,我想在第一个任务之后停止该任务。
例子:
# first task
def get_number_func(**kwargs):
number = randint(0, 10)
print(number)
if (number >= 5):
print('A')
return 'continue_task'
else:
#STOP DAG
# second task if number is higher or equal 5
def continue_func(**kwargs):
print("The number is " + str(number))
# first task declaration
start_op = BranchPythonOperator(
task_id='get_number',
provide_context=True,
python_callable=get_number_func,
op_kwargs={},
dag=DAG,
)
# second task declaration
continue_op = PythonOperator(
task_id='continue_task',
provide_context=True,
python_callable=continue_func,
op_kwargs={},
dag=DAG,
)
start_op >> continue_op
Run Code Online (Sandbox Code Playgroud)
如果满足数量条件,我只会运行第二个任务。如果条件未得到验证,DAG 不应运行第二个任务。
我怎样才能做到这一点?我不想使用 xcom、全局变量或虚拟任务。
提前致谢!
我想创建动态数量的任务,同时考虑到在上一个任务中读取 CSV 时获得的块数量。除此之外,我想同时运行所有动态任务,并在完成后运行最终任务。所有这些都在同一个 DAG 内。
为了使“图像”更清晰,我创建了一个图表:
和代码:
#python imports
import awswrangler as wr
import boto3
from datetime import datetime
#airflow imports
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
# First PythonOperator
def read_csv(s3_dir,client_s3,**kwargs):
df_list = wr.s3.read_csv(s3_dir, sep='|', chunksize = 100,boto3_session = client_s3)
intermediate_dir = Variable.get("s3_intermediate_dir")
for idx, df in enumerate(df_list):
wr.s3.to_csv(df,intermediate_dir + "intermediate_{}".format(idx))
ti.xcom_push(key='number_of_chunks', value=len(df_list))
# * PythonOperator
def transform_dataframes(s3_dir,client_s3,**kwargs):
#DO SOMETHING
# Final PythonOperator
def agg_df_one(s3_dir,client_s3,**kwargs):
#list all files
all_files = wr.s3.list_objects(s3_dir, boto3_session = …Run Code Online (Sandbox Code Playgroud) 我有表格示例:
| ID | 行A | 行B | 行C |
|---|---|---|---|
| 1 | VA1 | 空值 | 空值 |
| 2 | VB1 | 空值 | 空值 |
| 1 | 空值 | VA2 | 空值 |
| 2 | 空值 | VB2 | 空值 |
| 1 | 空值 | 空值 | VA3 |
| 2 | 空值 | 空值 | VB3 |
我想得到这个结果:
| ID | 行A | 行B | 行C |
|---|---|---|---|
| 1 | VA1 | VA2 | VA3 |
| 2 | VB1 | VB2 | VB3 |
我试过
SELECT DISTINCT
a.id,
b.RowA,
c.RowB,
d.RowC,
FROM
(SELECT DISTINCT ID
FROM Example) a
LEFT JOIN
(SELECT DISTINCT
id,
RowA
FROM Example
WHERE RowA IS NOT NULL) b ON a.id = b.id
LEFT JOIN
(SELECT DISTINCT
id,
RowB, …Run Code Online (Sandbox Code Playgroud)