我已经实现了像这样的结构化流...
myDataSet
.map(r => StatementWrapper.Transform(r))
.writeStream
.foreach(MyWrapper.myWriter)
.start()
.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
这一切似乎都有效,但看看MyWrapper.myWriter的吞吐量是可怕的.它有效地尝试成为JDBC接收器,它看起来像:
val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {
var connection: Connection = _
override def open(partitionId: Long, version: Long): Boolean = {
Try (connection = getRemoteConnection).isSuccess
}
override def process(row: Seq[String]) {
val statement = connection.createStatement()
try {
row.foreach( s => statement.execute(s) )
} catch {
case e: SQLSyntaxErrorException => println(e)
case e: SQLException => println(e)
} finally {
statement.closeOnCompletion()
}
}
override def close(errorOrNull: Throwable) {
connection.close()
}
}
Run Code Online (Sandbox Code Playgroud)
所以我的问题是 …
有什么办法可以使服务器保持默认状态。
dags_are_paused_at_creation =真
...但是对于一个特定的dag,默认情况下将其定义为未暂停?
dag = DAG(
dag_id=MISC_DAG_ID,
default_args=default_args,
params=params,
schedule_interval=None,
concurrency=1,
max_active_runs=1,
is_paused=False
)
Run Code Online (Sandbox Code Playgroud) 所以,我有一个有序的字典,它有一堆键/值对。我可以使用 items() 方法提取所有它们。但是如果我只想选择其中的一些呢?
>>> import collections
>>> d = collections.OrderedDict({'banana': 3, 'apple': 4, 'pear': 1,'orange': 2})
>>> d.items()
[('apple', 4), ('banana', 3), ('orange', 2), ('pear', 1)]
Run Code Online (Sandbox Code Playgroud)
如果我只想要苹果和香蕉怎么办?
有没有办法可以指定我想要的键?
>>> d['apple','banana'] <-- does not work
Run Code Online (Sandbox Code Playgroud)
我正在考虑在最后使用列表理解来过滤结果,但看起来很混乱,我希望有更好的方法。