如何使用 Scala Operator 在 Airflow 中运行 Scala 代码

Zvi*_*nts 2 scala airflow

我刚刚为 Aerospike 编写了一个恢复过程,它看起来非常适合 Airflow,我正在为 Scala 寻找一些 Airflow Operator。

目前的实施:

 // Register UDF for LUT
  aerospikeService.registerUDFs(
    """
      |function getLUT(r)
      |    return record.last_update_time(r)
      |end
      |""".stripMargin
  )

  // Pause Connectors
  k8sService.pauseConnectors()

  // Get Connectors, Current Offsets and LUTs
  val connectors = k8sService.getConnectors()
  val originalState = kafkaService.getCurrentState()
  val startTime = aerospikeService.calculateCurrentLUTs()

  // Delete Connectors
  k8sService.deleteConnectors()
  kafkaService.resetOffsets(originalState)

  // Recreate Connectors
  k8sService.createConnectors(connectors)

  // Wait until Offset Reached
  kafkaService.waitTillOriginalOffsetsReached(originalState)

  // Truncate
  aerospikeService.truncate(startTime, durableDelete)

  // Cleanup
  aerospikeService.cleanup()
Run Code Online (Sandbox Code Playgroud)

Bas*_*lak 7

Airflow 中没有“ScalaOperator”来运行 Scala 代码。Python 不是 JVM 语言,因此您需要构建一个 jar 文件,该文件可以从另一个进程执行。例如,在 Airflow 中使用 BashOperator:

scala_task = BashOperator(
    task_id="scala_task",
    dag=dag,
    bash_command="java -jar myjar.jar",
)
Run Code Online (Sandbox Code Playgroud)

另一种流行的解决方案是将代码构建到 Docker 容器中,并使用 KubernetesPodOperator 在 Kubernetes 集群上启动该容器。

请注意,BashOperator (1) 要求 JVM 存在于 Airflow 工作节点上,并且 (2) 如果使用 BashOperator 触发,该进程将在工作节点上运行,因此请确保有足够的资源来处理该问题。如果没有,请将繁重的处理“外包”到其他地方,例如 K8S 或 Spark 集群。