小编Aka*_*thi的帖子

Apache Spark Dataframe Groupby agg()用于多列

我有DataFrame3列,即Id, First Name, Last Name

我想申请GroupBy的基础上,Id并希望收集First Name, Last Name列作为列表.

示例: - 我有这样的DF

+---+-------+--------+
|id |fName  |lName   |
+---+-------+--------+
|1  |Akash  |Sethi   |
|2  |Kunal  |Kapoor  |
|3  |Rishabh|Verma   |
|2  |Sonu   |Mehrotra|
+---+-------+--------+
Run Code Online (Sandbox Code Playgroud)

我希望我的输出像这样

+---+-------+--------+--------------------+
|id |fname           |lName               |
+---+-------+--------+--------------------+
|1  |[Akash]         |[Sethi]             |
|2  |[Kunal, Sonu]   |[Kapoor, Mehrotra]  |
|3  |[Rishabh]       |[Verma]             |
+---+-------+--------+--------------------+
Run Code Online (Sandbox Code Playgroud)

提前致谢

scala apache-spark spark-dataframe

12
推荐指数
1
解决办法
7555
查看次数

通过Spark从Cassandra表读取和写入时获取异常

我为Spark设置了这些配置,但每次我正在阅读或写入Cassandra表我都会得到 ioException

          .setMaster(sparkIp)
          .set("spark.cassandra.connection.host", cassandraIp)
          .set("spark.sql.crossJoin.enabled", "true")
          .set("spark.executor.memory", sparkExecutorMemory) //**26 GB**
          .set("spark.executor.cores", sparkExecutorCore) // **from 4 to 8**
          .set("spark.executor.instances", sparkExecutorInstances) // 1
          .set("spark.cassandra.output.batch.size.bytes", "2048")
          .set("spark.sql.broadcastTimeout", "2000")
          .set("spark.sql.shuffle.partitions", "1000")
          .set("spark.network.timeout", "80s")
          .set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+UseG1GC")
Run Code Online (Sandbox Code Playgroud)

sc.cassandraTableMyCaseClass //阅读代码

dataRDD..saveToCassandra("myDatabase","mytable")//编写代码

表中的数据量很大,操作也很复杂.

我正在使用带有28gb内存和8个内核的火花大师和10个具有相同配置的火花工作者,其中我使用的是26 gb内存和4到8个内核.有时我也会得到ExecutorLostException.

在Cassandra表中写入数据时的最新StackTrace

org.apache.spark.SparkException: Job aborted due to stage failure: Task 145 in stage 6.0 failed 4 times, most recent failure: Lost task 145.6 in stage 6.0 (TID 3268, 10.178.149.48): ExecutorLostFailure (executor 157 exited caused by one of the running tasks) …
Run Code Online (Sandbox Code Playgroud)

cassandra cassandra-2.0 apache-spark spark-cassandra-connector

6
推荐指数
0
解决办法
138
查看次数

每次我使用spark-submit运行作业时,Spark总是使用现有的SparkContext

我在群集上部署了一些火花.我使用spark-submit命令跟随我的项目jar 提交spark作业.

Spak Conf我的项目中有很多.Conf将根据我正在运行的课程来决定,但每次我运行spark工作时都会收到此警告

7/01/09 07:32:51 WARN SparkContext:使用现有的SparkContext,某些配置可能不会生效.

查询是否意味着SparkContext已经存在,我的工作就是选择它. 查询为什么没有进行配置

private val conf = new SparkConf()
    .setAppName("ELSSIE_Ingest_Cassandra")
    .setMaster(sparkIp)
    .set("spark.sql.shuffle.partitions", "8")
    .set("spark.cassandra.connection.host", cassandraIp)
    .set("spark.sql.crossJoin.enabled", "true")


object SparkJob extends Enumeration {

  val Program1, Program2, Program3, Program4, Program5 = Value
}

object ElssieCoreContext {

def getSparkSession(sparkJob: SparkJob.Value = SparkJob.RnfIngest): SparkSession = {
     val sparkSession = sparkJob match {
          case SparkJob.Program1 => {
            val updatedConf = conf.set("spark.cassandra.output.batch.size.bytes", "2048").set("spark.sql.broadcastTimeout", "2000")
            SparkSession.builder().config(updatedConf).getOrCreate()
          }
          case SparkJob.Program2 => {
            val updatedConf = …
Run Code Online (Sandbox Code Playgroud)

apache-spark

5
推荐指数
0
解决办法
2241
查看次数

SBT:如何在scala 2.12项目中使用scala 2.11库

我正在尝试升级我的项目以使用scala 2.12但是,我使用了一些库,它们没有2.12版本(mongoquery-casbah,salat).

如果我强制使用这些库的2.11版本("com.github.salat" % "salat_2.11" % "1.10.0", "com.github.limansky" % "mongoquery-casbah_2.11" % "0.5"),我会收到错误:

[error] Modules were resolved with conflicting cross-version suffixes in {file:/C:/work/GeneASS/}dao:
[error]    org.scala-lang.modules:scala-parser-combinators _2.11, _2.12
Run Code Online (Sandbox Code Playgroud)

我该如何解决这个问题?

scala sbt

4
推荐指数
1
解决办法
1905
查看次数

Kafka 减少消费者的延迟

我正在设置新的 Kafka 集群,出于测试目的,我创建了具有 1 个分区和 3 个副本的主题。

现在,我通过生产者并行地触发消息,比如每秒 50K 条消息。我在一个组内创建了一个消费者,它每秒只能获取 30K 条消息。

我可以更改主题级别、分区级别、消费者级别的配置。

我正在通过 grafana + prometheus 监控一切。

知道哪种配置或其他东西可以帮助我消费更多数据吗?

提前致谢

apache-kafka kafka-consumer-api

3
推荐指数
1
解决办法
2119
查看次数

通过 AWS Lambda 函数访问 EC2 实例服务

我有一个只有私有 IP 的 EC2 实例,并且我在 say 上运行的同一个实例上安装了 Apache Kafka 10.0.4.44:9092。现在,我创建了 AWS Lambda 函数,它从给定的 Bucket 读取文档并将文档正文发送到在 EC2 实例上运行的 Apache Kafka。

现在,AWS Lambda 无法访问 EC2 实例服务。

如何授予 Lambda 访问权限以访问 EC2 上的 Apache kafka 服务?

amazon-s3 amazon-ec2 amazon-web-services aws-lambda

2
推荐指数
1
解决办法
2536
查看次数

MySql 对 Json 列的支持

Json在 MySql 中有一个类型列,我正在使用带有 Slick 的 Scala。我如何Json通过 Slick为Column提供支持。

class SampleTable(tag: Tag) extends Table[(String, ??)](tag, "test")  {

  override def * : ProvenShape[NodeReference] = (name, data)

  def name: Rep[String] = column[String]("name", O.PrimaryKey)
  def Data: Rep[??] = column[??]("data", O.PrimaryKey)

}
Run Code Online (Sandbox Code Playgroud)

任何帮助将不胜感激。提前致谢

mysql scala slick play-slick slick-3.0

2
推荐指数
1
解决办法
860
查看次数

Why I Shouldn't use Nothing in Place of Unit

I was looking at the Scala's Hierarchy and then This click in my mind.

What if I declare the None like Option[Unit] and Nil like List[Unit]

What Exactly is the difference between Unit and Nothing?

What will happen If I start using Nothing in Place of Unit?

Thanks

scala

2
推荐指数
1
解决办法
59
查看次数