小编Exi*_*xie的帖子

Spark Structured Streaming ForeachWriter和数据库性能

我已经实现了像这样的结构化流...

myDataSet
  .map(r =>  StatementWrapper.Transform(r))
  .writeStream
  .foreach(MyWrapper.myWriter)
  .start()
  .awaitTermination()
Run Code Online (Sandbox Code Playgroud)

这一切似乎都有效,但看看MyWrapper.myWriter的吞吐量是可怕的.它有效地尝试成为JDBC接收器,它看起来像:

val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {

  var connection: Connection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    Try (connection = getRemoteConnection).isSuccess
  }

  override def process(row: Seq[String]) {
    val statement = connection.createStatement()
    try {
      row.foreach( s => statement.execute(s) )
    } catch {
      case e: SQLSyntaxErrorException => println(e)
      case e: SQLException => println(e)
    } finally {
      statement.closeOnCompletion()
    }
  }

  override def close(errorOrNull: Throwable) {
    connection.close()
  }
}
Run Code Online (Sandbox Code Playgroud)

所以我的问题是 …

database scala jdbc apache-spark spark-structured-streaming

12
推荐指数
1
解决办法
3333
查看次数

定义为气流停止

有什么办法可以使服务器保持默认状态。

dags_are_paused_at_creation =真

...但是对于一个特定的dag,默认情况下将其定义为未暂停?

dag = DAG(
    dag_id=MISC_DAG_ID,
    default_args=default_args,
    params=params,
    schedule_interval=None,
    concurrency=1,
    max_active_runs=1,
    is_paused=False
)
Run Code Online (Sandbox Code Playgroud)

airflow apache-airflow airflow-scheduler

5
推荐指数
1
解决办法
388
查看次数

来自ordereddict的子集字段?

所以,我有一个有序的字典,它有一堆键/值对。我可以使用 items() 方法提取所有它们。但是如果我只想选择其中的一些呢?

>>> import collections
>>> d = collections.OrderedDict({'banana': 3, 'apple': 4, 'pear': 1,'orange': 2})
>>> d.items()
[('apple', 4), ('banana', 3), ('orange', 2), ('pear', 1)]
Run Code Online (Sandbox Code Playgroud)

如果我只想要苹果和香蕉怎么办?

有没有办法可以指定我想要的键?

>>> d['apple','banana'] <-- does not work
Run Code Online (Sandbox Code Playgroud)

我正在考虑在最后使用列表理解来过滤结果,但看起来很混乱,我希望有更好的方法。

python ordereddictionary

3
推荐指数
1
解决办法
1740
查看次数