小编fjj*_*s88的帖子

从 athena aws 中单独字段中的整数创建日期

我对 athena 非常陌生,所以请耐心等待。我将数据以整数形式存储在年、月和日的三个单独的列中,如下所示:

year   month   day
2020   7       10
2020   7       11
2020   7       12
Run Code Online (Sandbox Code Playgroud)

我想将这三个字段变成一个日期。我怎么做?

提前致谢!

sql amazon-web-services amazon-athena

5
推荐指数
1
解决办法
5323
查看次数

iTerm2 向前斜杠跳转

我需要设置什么绑定才能跳转到 iTerm2 路径中的每个正斜杠?我正在使用带有 BigSur 的 Mac。

目前,我已经设置了绑定,因此它将按字跳过。在下面的示例中,如果光标位于路径末尾(“h”之后),并且我点击 option + 向左箭头,它将跳转到“s3”之后的冒号。我希望它跳转到“文件”之后和“路径”之前的“/”。

s3://my-bucket/my/file/path
Run Code Online (Sandbox Code Playgroud)

terminal binding keyboard-shortcuts iterm2

5
推荐指数
1
解决办法
732
查看次数

任务组中的 Aiflow 2 Xcom

我的任务组中有两个任务需要拉取 xcom 值来提供 job_flow_id 和 step_id。这是代码:

  with TaskGroup('execute_my_steps') as execute_my_steps: 

    config = {some dictionary}
    dependencies = {another dictionary}

    task_id = 'execute_spark_job_step'
    task_name = 'spark_job'

    add_step = EmrAddStepsOperator(
        task_id=task_id,
        job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
        steps=create_emr_step(args=config, d=dependencies),
        aws_conn_id='aws_default',
        retries=3,
        dag=dag
    )

    wait_for_step = EmrStepSensor(
        task_id='wait_for_' + task_name + '_step',
        job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='" + task_id + "', key='return_value') }}",
        retries=3,
        dag=dag,
        mode='reschedule'
    )

    add_step >> wait_for_step
Run Code Online (Sandbox Code Playgroud)

问题是 step_id 无法正确呈现。wait_for_stepUI 呈现模板中的值显示为,'None'但是 xcom return_value 就execute_spark_job_step …

python amazon-emr airflow

4
推荐指数
1
解决办法
3978
查看次数

Airflow Branch 操作员和任务组任务 ID 无效

我有一个简单的 dag,它使用分支运算符来检查 y 是否为 False。如果是,则该 dag 应该转移到 say_goodbye 任务组。如果为 True,则跳过并转到 finish_dag_step。这是达格:

def which_step() -> str:
  y = False
  if not y:
      return 'say_goodbye'
  else:
      return 'finish_dag_step'

with DAG(
  'my_test_dag',
  start_date = datetime(2022, 5, 14),
  schedule_interval = '0 0 * * *',
  catchup = True) as dag:

say_hello = BashOperator(
    task_id = 'say_hello',
    retries = 3,
    bash_command = 'echo "hello world"'
)

run_which_step = BranchPythonOperator(
    task_id = 'run_which_step',
    python_callable = which_step,
    retries = 3,
    retry_exponential_backoff = True,
    retry_delay = timedelta(seconds = …
Run Code Online (Sandbox Code Playgroud)

python airflow

4
推荐指数
1
解决办法
2096
查看次数