我刚刚为 Aerospike 编写了一个恢复过程,它看起来非常适合 Airflow,我正在为 Scala 寻找一些 Airflow Operator。
目前的实施:
// Register UDF for LUT
aerospikeService.registerUDFs(
"""
|function getLUT(r)
| return record.last_update_time(r)
|end
|""".stripMargin
)
// Pause Connectors
k8sService.pauseConnectors()
// Get Connectors, Current Offsets and LUTs
val connectors = k8sService.getConnectors()
val originalState = kafkaService.getCurrentState()
val startTime = aerospikeService.calculateCurrentLUTs()
// Delete Connectors
k8sService.deleteConnectors()
kafkaService.resetOffsets(originalState)
// Recreate Connectors
k8sService.createConnectors(connectors)
// Wait until Offset Reached
kafkaService.waitTillOriginalOffsetsReached(originalState)
// Truncate
aerospikeService.truncate(startTime, durableDelete)
// Cleanup
aerospikeService.cleanup()
Run Code Online (Sandbox Code Playgroud)
Airflow 中没有“ScalaOperator”来运行 Scala 代码。Python 不是 JVM 语言,因此您需要构建一个 jar 文件,该文件可以从另一个进程执行。例如,在 Airflow 中使用 BashOperator:
scala_task = BashOperator(
task_id="scala_task",
dag=dag,
bash_command="java -jar myjar.jar",
)
Run Code Online (Sandbox Code Playgroud)
另一种流行的解决方案是将代码构建到 Docker 容器中,并使用 KubernetesPodOperator 在 Kubernetes 集群上启动该容器。
请注意,BashOperator (1) 要求 JVM 存在于 Airflow 工作节点上,并且 (2) 如果使用 BashOperator 触发,该进程将在工作节点上运行,因此请确保有足够的资源来处理该问题。如果没有,请将繁重的处理“外包”到其他地方,例如 K8S 或 Spark 集群。
| 归档时间: |
|
| 查看次数: |
3576 次 |
| 最近记录: |