ton*_*ian 10 mysql slick reactive-streams akka-stream slick-3.0
在Slick的文档中,使用Reactive Streams的示例仅用于读取数据作为DatabasePublisher的一种方式.但是,如果您希望根据插入率将数据库用作接收器和后端,会发生什么?
我找了等效的DatabaseSubscriber,但它不存在.所以问题是,如果我有一个来源,说:
val source = Source(0 to 100)
如何用Slick创建一个Sink,将这些值写入带有模式的表中:
create table NumberTable (value INT)
串行插入
最简单的方法是在a中插入Sink.foreach.
假设您已使用模式代码生成并进一步假设您的表名为"NumberTable"
//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow}
val numberTableDB = Database forConfig "NumberTableConfig"
Run Code Online (Sandbox Code Playgroud)
我们可以编写一个插入函数
def insertIntoDb(num : Int) =
numberTableDB run (Numbertable += NumbertableRow(num))
Run Code Online (Sandbox Code Playgroud)
并且该功能可以放在接收器中
val insertSink = Sink[Int] foreach insertIntoDb
Source(0 to 100) runWith insertSink
Run Code Online (Sandbox Code Playgroud)
批量插入
您可以通过一次批量处理N个插入来进一步扩展Sink方法:
def batchInsertIntoDb(nums : Seq[Int]) =
numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))
val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb
Run Code Online (Sandbox Code Playgroud)
这个批量接收器可以通过Flow批量分组进行馈送:
val batchSize = 10
Source(0 to 100).via(Flow[Int].grouped(batchSize))
.runWith(batchInsertSink)
Run Code Online (Sandbox Code Playgroud)
尽管您可以使用a Sink.foreach实现此目标(如Ramon所述),但使用a 会更安全且可能更快(通过并行运行插入)mapAsync Flow。使用时将面临的问题Sink.foreach是它没有返回值。通过slicks db.run方法插入数据库将返回a Future,然后它将退出返回的流Future[Done],并在Sink.foreach完成后立即完成。
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") {
def value = column[Int]("value")
def * = value
}
val numbers = TableQuery[Numbers]
val db = Database.forConfig("postgres")
Await.result(db.run(numbers.schema.create), Duration.Inf)
val streamFuture: Future[Done] = Source(0 to 100)
.runWith(Sink.foreach[Int] { (i: Int) =>
db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done"))
})
Await.result(streamFuture, Duration.Inf)
println("stream 1 done")
//// sample 1 output: ////
// stream 1 insert 1 done
// ...
// stream 1 insert 99 done
// stream 1 done <-- stream Future[Done] returned before inserts finished
// stream 1 insert 100 done
Run Code Online (Sandbox Code Playgroud)
另一方面,它def mapAsync[T](parallelism: Int)(f: Out ? Future[T]) Flow允许您通过并行参数并行运行插入,并接受从上游输出值到某个类型的将来的函数。这符合我们的i => db.run(numbers += i)功能。这样做的好处Flow是,它随后将这些结果反馈给Futures下游。
val streamFuture2: Future[Done] = Source(0 to 100)
.mapAsync(1) { (i: Int) =>
db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r }
}
.runWith(Sink.ignore)
Await.result(streamFuture2, Duration.Inf)
println("stream 2 done")
//// sample 2 output: ////
// stream 2 insert 1 done
// ...
// stream 2 insert 100 done
// stream 1 done <-- stream Future[Done] returned after inserts finished
Run Code Online (Sandbox Code Playgroud)
为了证明这一点,您甚至可以从流中返回真实结果,而不是返回Future[Done](以Done表示Unit)。此流还将添加更高的并行度值和批处理,以提高性能。*
val streamFuture3: Future[Int] = Source(0 to 100)
.via(Flow[Int].grouped(10)) // Batch in size 10
.mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count
.runWith(Sink.fold(0)(_+_)) // count all inserts and return total
val rowsInserted = Await.result(streamFuture3, Duration.Inf)
println(s"stream 3 done, inserted $rowsInserted rows")
// sample 3 output:
// stream 3 done, inserted 101 rows
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1712 次 |
| 最近记录: |