我已经阅读了Akka流物化概念,并了解流物化是:
获取流描述(图表)并分配运行所需的所有必要资源的过程.
我按照一个示例使用mapMaterializedValue构建我的akka流以将消息发送到队列.代码的目的是在流蓝图构建并且代码正常工作之后将消息推送到队列但我真的不明白mapMaterrializaedValue在代码中做了什么:
Promise<SourceQueueWithComplete<String>> promise = new Promise.DefaultPromise<>();
Source<String, SourceQueueWithComplete<String>> s = Source
.queue(100, OverflowStrategy.fail())
.mapMaterializaedValue(queue -> {
promise.trySuccess(queue);
});
source.toMat(Sink.foreach(x -> System.out.println(x)), Keep.left()).run(materIalizer);
promise.<SourceQueueWithComplete<String>>future().map(mapMapperFunction(), actorSystem.dispatcher());
Run Code Online (Sandbox Code Playgroud) 我有两个 SELECT 语句想要求和。两个查询都工作正常,但我无法对总计的输出进行求和。我尝试遵循这个问题,但无法通过将查询包装在中来求和select id, sum(amount) from ( )
SELECT "patient_profiles"."id", count(distinct recommendations.id) AS total
FROM "patient_profiles"
LEFT OUTER JOIN
"recommendations" ON "recommendations"."patient_profile_id" = "patient_profiles"."id"
GROUP BY "patient_profiles"."id"
UNION
SELECT "patient_profiles"."id", count(distinct patient_profile_potential_doctors.id) AS total
FROM "patient_profiles"
LEFT OUTER JOIN "patient_profile_potential_doctors" ON "patient_profile_potential_doctors"."patient_profile_id" = "patient_profiles"."id"
GROUP BY "patient_profiles"."id"
Run Code Online (Sandbox Code Playgroud) 我正在使用带有elasticsearch的apache spark 1.5数据帧,我尝试从包含id列表(数组)的列中过滤id.
例如,elasticsearch列的映射如下所示:
{
"people":{
"properties":{
"artist":{
"properties":{
"id":{
"index":"not_analyzed",
"type":"string"
},
"name":{
"type":"string",
"index":"not_analyzed",
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
示例数据格式如下所示
{
"people": {
"artist": {
[
{
"id": "153",
"name": "Tom"
},
{
"id": "15389",
"name": "Cok"
}
]
}
}
},
{
"people": {
"artist": {
[
{
"id": "369",
"name": "Carl"
},
{
"id": "15389",
"name": "Cok"
},
{
"id": "698",
"name": "Sol"
}
]
}
}
}
Run Code Online (Sandbox Code Playgroud)
在火花中我试试这个:
val peopleId = 152
val dataFrame = …Run Code Online (Sandbox Code Playgroud) 问题比听起来更简单(我认为).我有一个名为的数据库表table,我正在尝试在名为的列中插入数据first.这是代码的一部分:
my $stmt = $dbh->prepare($sql);
@array=("first","second","third");
$sql = "INSERT INTO table($array[0]) VALUES(?)";
$stmt->execute($some_value);
Run Code Online (Sandbox Code Playgroud)
该程序在没有警告的情况下运行,但它只是用0填充表,这是不同的$some_value.
解决“ Scala中的功能编程”中发现的问题时:
实现isSorted,它检查Array [A]是否根据给定的比较函数进行排序:
def isSorted[A](as: Array[A], ordered: (A,A) => Boolean): Boolean
Run Code Online (Sandbox Code Playgroud)
在将我的答案与作者提供的以下解决方案进行比较时:
// Exercise 2: Implement a polymorphic function to check whether
// an `Array[A]` is sorted
def isSorted[A](as: Array[A], gt: (A,A) => Boolean): Boolean = {
@annotation.tailrec
def go(n: Int): Boolean =
if (n >= as.length-1) true
else if (gt(as(n), as(n+1))) false
else go(n+1)
go(0)
}
Run Code Online (Sandbox Code Playgroud)
我对以下代码行感到困惑: else if (gt(as(n), as(n+1))) false
我以为在Scala中使用函数作为参数时,实际上需要在当前函数范围之外定义一个单独的函数(即,定义了另一个函数gt需要执行的操作)?我gt在其他任何地方都看不到定义,那么它如何提供可用于的Boolean值isSorted?
我的假设是正确的,还是使用函数作为参数,我在这里完全缺少什么吗?非常感谢详细的解释。
scala ×2
sql ×2
akka-stream ×1
apache-spark ×1
arrays ×1
dataframe ×1
dbd ×1
dbi ×1
java ×1
join ×1
left-join ×1
perl ×1
postgresql ×1
promise ×1