小编d80*_*tb7的帖子

如何在多列上连接数据集?

给定两个Spark 数据集,A和BI可以在单列上进行连接,如下所示:

a.joinWith(b, $"a.col" === $"b.col", "left")
Run Code Online (Sandbox Code Playgroud)

我的问题是你是否可以使用多列进行连接.基本上相当于以下DataFrames api代码:

a.join(b, a("col") === b("col") && a("col2") === b("col2"), "left")
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

13
推荐指数
2
解决办法
2万
查看次数

NATS Jetstream 的性能

我试图了解 NATS Jetstream 如何扩展并有几个问题。

  1. 通过主题订阅历史消息的效率如何?例如,假设有一个流foo,其中包含 1 亿条主题为 的消息foo.bar,然后是一条主题为 的消息foo.baz。如果我从流的开头进行订阅,foo.baz服务器上的某些内容必须对所有消息执行线性扫描foo,或者它能够立即查找消息 foo.baz

  2. 系统的水平扩展能力如何?我问这个问题是因为我在让 Jetstream 扩展到每秒几千条消息以上时遇到问题,无论我向它投入多少台机器。测试参数如下:

    • Nats Server2.6.3运行在 4 核 8GB 节点上
    • 单个流复制 3 次(磁盘或内存中似乎没有区别)
    • 500 字节消息有效负载
    • n每个发布者每秒发布 1k 条消息瓶颈似乎出现在发布方面,因为我检索消息的速度至少与发布消息的速度一样快。

go nats.io nats-jetstream

12
推荐指数
1
解决办法
5463
查看次数

Apache Airflow 多租户

我正在尝试研究 Airflow 在多租户环境中的工作方式。具体来说,要求应该是这样的:

  1. TeamA 和 TeamB 两个团队正在使用单个 Airflow 实例。
  2. 团队的 A 和 B 都有自己的服务用户帐户:serviceUserA 和 ServiceUserB,他们应该在它们下运行他们的作业。
  3. 出于安全原因,团队 A 不应创建在 ServiceUserB 下运行的作业,反之亦然。

在这一点上,我不清楚 Airflow 是否可以满足要求 3),除了给每个团队一个单独的 Airflow 实例。有什么方法可以实现吗?

谢谢,

克里斯

airflow apache-airflow airflow-scheduler

6
推荐指数
1
解决办法
2713
查看次数

Spark Dataframes-按键减少

假设我有一个这样的数据结构,其中ts是一些时间戳

case class Record(ts: Long, id: Int, value: Int)
Run Code Online (Sandbox Code Playgroud)

鉴于大量这些记录,我希望最终得到每个id具有最高时间戳的记录.使用RDD api我认为以下代码完成了工作:

def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
  records.keyBy(_.id).reduceByKey{
    (x, y) => if(x.ts > y.ts) x else y
  }.values
}
Run Code Online (Sandbox Code Playgroud)

同样,这是我对数据集的尝试:

def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
  records.groupByKey(_.id).mapGroups{
    case(id, records) => {
      records.reduceLeft((x,y) => if (x.ts > y.ts) x else y)
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

我正在尝试研究如何使用数据框来实现类似的东西,但无济于事 - 我意识到我可以使用以下方法进行分组:

records.groupBy($"id")
Run Code Online (Sandbox Code Playgroud)

但是这给了我一个RelationGroupedDataSet,我不清楚我需要编写什么聚合函数来实现我想要的东西 - 我看到的所有示例聚合似乎都只关注返回一个聚合而不是整行的列.

是否可以使用数据框来实现这一目标?

scala apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
1万
查看次数

映射Scala期货

假设我有一个Future[Seq[Int]]我要转换的内容Future[Seq[String]].目前我这样做:

 val futureSeqString = futureSeqInt.map( x => x.map(_.toString()))
Run Code Online (Sandbox Code Playgroud)

这可行,但嵌套的地图似乎有点尴尬.等效转换Future[Option[Int]]稍微好一点,但它仍然不觉得我这样做是最好的方式:

val futureOptionString = futureOptionInt.map {       
  case Some(x) => x.toString(); 
  case _ => None;
}
Run Code Online (Sandbox Code Playgroud)

有没有更好的方法来解决这个问题?

scala

2
推荐指数
2
解决办法
558
查看次数

Nifi的多重流

我们有多个(50+)nifi流,它们基本上都具有相同的作用:从db中提取一些数据,将一些列转换为conquet并上载到hdfs。它们仅在细节方面有所不同,例如要运行的sql查询或它们在hdfs中的位置。

问题是如何将这些常见nifi流量排除在外,以使对通用流所做的任何更改都自动应用于所有所有派生流。例如,如果我想添加一个额外的步骤以将数据也发布到Kafka,我想进行一次设置并自动将其应用于所有50个流。

我们试图使它与nifi注册表一起使用,但是似乎不完美。本质上,问题在于nifi注册表似乎可以很好地在一个环境中(例如wat)更新流,然后在另一个环境中(例如prod)自动更新流。一个特定的示例似乎不太适合在同一环境中更新多个流,每次重新部署时,它将每个流的名称重置为模板名称,这意味着所有流最终都具有相同的名称!

没有人知道应该如何处理像我们这样的情况吗?猜测这一定很普遍。

apache-nifi

1
推荐指数
1
解决办法
937
查看次数