小编big*_*add的帖子

另存为分区镶木地板时是否可以更改输出文件名?

当我们将 pandas 数据框保存为分区镶木地板时,文件名会自动生成。

是否可以指定每个分区的输出文件名?

使用示例

df = pd.DataFrame(data={'year':  [2020, 2020, 2021],
                        'month': [1,12,2], 
                        'day':   [1,31,28], 
                        'value': [1000,2000,3000]})

df.to_parquet('./output', partition_cols=['year', 'month'])


output/year=2020/month=1/6f0258e6c48a48dbb56cae0494adf659.parquet
output/year=2020/month=12/cf8a45116d8441668c3a397b816cd5f3.parquet
output/year=2021/month=2/7f9ba3f37cb9417a8689290d3f5f9e6e.parquet
Run Code Online (Sandbox Code Playgroud)

是否可以得到

output/year=2020/month=1/2020_01.parquet
output/year=2020/month=12/2020_12.parquet
output/year=2021/month=2/2021_02.parquet
Run Code Online (Sandbox Code Playgroud)

谢谢你的时间

python dataframe pandas parquet

5
推荐指数
1
解决办法
696
查看次数

Airflow - 根据条件停止 DAG(跳过分支后的剩余任务)

我是气流方面的新手,所以我在这里有疑问。

如果满足第一个任务的条件,我想运行 DAG。如果条件不满足,我想在第一个任务之后停止该任务。

例子:

# first task
def get_number_func(**kwargs):

    number = randint(0, 10)
    print(number)
    
    if (number >= 5):
        print('A')
        return 'continue_task'
    else:
        #STOP DAG
        
# second task if number is higher or equal 5
def continue_func(**kwargs):
    print("The number is " + str(number))
    
# first task declaration
start_op = BranchPythonOperator(
    task_id='get_number',
    provide_context=True,
    python_callable=get_number_func,
    op_kwargs={},
    dag=DAG,
)

# second task declaration
continue_op = PythonOperator(
    task_id='continue_task',
    provide_context=True,
    python_callable=continue_func,
    op_kwargs={},
    dag=DAG,
)

start_op  >> continue_op 

Run Code Online (Sandbox Code Playgroud)

如果满足数量条件,我只会运行第二个任务。如果条件未得到验证,DAG 不应运行第二个任务。

我怎样才能做到这一点?我不想使用 xcom、全局变量或虚拟任务。

提前致谢!

python bigdata airflow

4
推荐指数
1
解决办法
7760
查看次数

如何考虑同一 DAG 中先前任务的结果来创建动态数量的任务?

我想创建动态数量的任务,同时考虑到在上一个任务中读取 CSV 时获得的块数量。除此之外,我想同时运行所有动态任务,并在完成后运行最终任务。所有这些都在同一个 DAG 内。

为了使“图像”更清晰,我创建了一个图表:

和代码:

#python imports
import awswrangler as wr
import boto3
from datetime import datetime
#airflow imports
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

# First PythonOperator
def read_csv(s3_dir,client_s3,**kwargs):
    
    df_list = wr.s3.read_csv(s3_dir, sep='|', chunksize = 100,boto3_session = client_s3)
    
    intermediate_dir = Variable.get("s3_intermediate_dir")
    
    for idx, df in enumerate(df_list):
        wr.s3.to_csv(df,intermediate_dir + "intermediate_{}".format(idx))
    
    ti.xcom_push(key='number_of_chunks', value=len(df_list))

# * PythonOperator
def transform_dataframes(s3_dir,client_s3,**kwargs):
    #DO SOMETHING
    
# Final PythonOperator
def agg_df_one(s3_dir,client_s3,**kwargs):
    #list all files
    all_files = wr.s3.list_objects(s3_dir, boto3_session = …
Run Code Online (Sandbox Code Playgroud)

python dataframe airflow

2
推荐指数
1
解决办法
803
查看次数

SQL - 带有 ID 的多行,但我想将所有信息加入一列

我有表格示例:

ID 行A 行B 行C
1 VA1 空值 空值
2 VB1 空值 空值
1 空值 VA2 空值
2 空值 VB2 空值
1 空值 空值 VA3
2 空值 空值 VB3

我想得到这个结果:

ID 行A 行B 行C
1 VA1 VA2 VA3
2 VB1 VB2 VB3

我试过

SELECT DISTINCT
    a.id,
    b.RowA,
    c.RowB,
    d.RowC,
FROM
    (SELECT DISTINCT ID
     FROM Example) a
LEFT JOIN 
    (SELECT DISTINCT 
         id,
         RowA
     FROM Example
     WHERE RowA IS NOT NULL) b ON a.id = b.id
LEFT JOIN 
    (SELECT DISTINCT
         id,
         RowB, …
Run Code Online (Sandbox Code Playgroud)

sql

-1
推荐指数
1
解决办法
24
查看次数

标签 统计

python ×3

airflow ×2

dataframe ×2

bigdata ×1

pandas ×1

parquet ×1

sql ×1