给定两个Spark 数据集,A和BI可以在单列上进行连接,如下所示:
a.joinWith(b, $"a.col" === $"b.col", "left")
Run Code Online (Sandbox Code Playgroud)
我的问题是你是否可以使用多列进行连接.基本上相当于以下DataFrames api代码:
a.join(b, a("col") === b("col") && a("col2") === b("col2"), "left")
Run Code Online (Sandbox Code Playgroud) 我试图了解 NATS Jetstream 如何扩展并有几个问题。
通过主题订阅历史消息的效率如何?例如,假设有一个流foo
,其中包含 1 亿条主题为 的消息foo.bar
,然后是一条主题为 的消息foo.baz
。如果我从流的开头进行订阅,foo.baz
服务器上的某些内容必须对所有消息执行线性扫描foo
,或者它能够立即查找消息 foo.baz
。
系统的水平扩展能力如何?我问这个问题是因为我在让 Jetstream 扩展到每秒几千条消息以上时遇到问题,无论我向它投入多少台机器。测试参数如下:
2.6.3
运行在 4 核 8GB 节点上n
每个发布者每秒发布 1k 条消息瓶颈似乎出现在发布方面,因为我检索消息的速度至少与发布消息的速度一样快。我正在尝试研究 Airflow 在多租户环境中的工作方式。具体来说,要求应该是这样的:
在这一点上,我不清楚 Airflow 是否可以满足要求 3),除了给每个团队一个单独的 Airflow 实例。有什么方法可以实现吗?
谢谢,
克里斯
假设我有一个这样的数据结构,其中ts是一些时间戳
case class Record(ts: Long, id: Int, value: Int)
Run Code Online (Sandbox Code Playgroud)
鉴于大量这些记录,我希望最终得到每个id具有最高时间戳的记录.使用RDD api我认为以下代码完成了工作:
def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
records.keyBy(_.id).reduceByKey{
(x, y) => if(x.ts > y.ts) x else y
}.values
}
Run Code Online (Sandbox Code Playgroud)
同样,这是我对数据集的尝试:
def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
records.groupByKey(_.id).mapGroups{
case(id, records) => {
records.reduceLeft((x,y) => if (x.ts > y.ts) x else y)
}
}
}
Run Code Online (Sandbox Code Playgroud)
我正在尝试研究如何使用数据框来实现类似的东西,但无济于事 - 我意识到我可以使用以下方法进行分组:
records.groupBy($"id")
Run Code Online (Sandbox Code Playgroud)
但是这给了我一个RelationGroupedDataSet,我不清楚我需要编写什么聚合函数来实现我想要的东西 - 我看到的所有示例聚合似乎都只关注返回一个聚合而不是整行的列.
是否可以使用数据框来实现这一目标?
假设我有一个Future[Seq[Int]]
我要转换的内容Future[Seq[String]]
.目前我这样做:
val futureSeqString = futureSeqInt.map( x => x.map(_.toString()))
Run Code Online (Sandbox Code Playgroud)
这可行,但嵌套的地图似乎有点尴尬.等效转换Future[Option[Int]]
稍微好一点,但它仍然不觉得我这样做是最好的方式:
val futureOptionString = futureOptionInt.map {
case Some(x) => x.toString();
case _ => None;
}
Run Code Online (Sandbox Code Playgroud)
有没有更好的方法来解决这个问题?
我们有多个(50+)nifi流,它们基本上都具有相同的作用:从db中提取一些数据,将一些列转换为conquet并上载到hdfs。它们仅在细节方面有所不同,例如要运行的sql查询或它们在hdfs中的位置。
问题是如何将这些常见nifi流量排除在外,以使对通用流所做的任何更改都自动应用于所有所有派生流。例如,如果我想添加一个额外的步骤以将数据也发布到Kafka,我想进行一次设置并自动将其应用于所有50个流。
我们试图使它与nifi注册表一起使用,但是似乎不完美。本质上,问题在于nifi注册表似乎可以很好地在一个环境中(例如wat)更新流,然后在另一个环境中(例如prod)自动更新流。一个特定的示例似乎不太适合在同一环境中更新多个流,每次重新部署时,它将每个流的名称重置为模板名称,这意味着所有流最终都具有相同的名称!
没有人知道应该如何处理像我们这样的情况吗?猜测这一定很普遍。