我有DataFrame3列,即Id, First Name, Last Name
我想申请GroupBy的基础上,Id并希望收集First Name, Last Name列作为列表.
示例: - 我有这样的DF
+---+-------+--------+
|id |fName |lName |
+---+-------+--------+
|1 |Akash |Sethi |
|2 |Kunal |Kapoor |
|3 |Rishabh|Verma |
|2 |Sonu |Mehrotra|
+---+-------+--------+
Run Code Online (Sandbox Code Playgroud)
我希望我的输出像这样
+---+-------+--------+--------------------+
|id |fname |lName |
+---+-------+--------+--------------------+
|1 |[Akash] |[Sethi] |
|2 |[Kunal, Sonu] |[Kapoor, Mehrotra] |
|3 |[Rishabh] |[Verma] |
+---+-------+--------+--------------------+
Run Code Online (Sandbox Code Playgroud)
提前致谢
我为Spark设置了这些配置,但每次我正在阅读或写入Cassandra表我都会得到 ioException
.setMaster(sparkIp)
.set("spark.cassandra.connection.host", cassandraIp)
.set("spark.sql.crossJoin.enabled", "true")
.set("spark.executor.memory", sparkExecutorMemory) //**26 GB**
.set("spark.executor.cores", sparkExecutorCore) // **from 4 to 8**
.set("spark.executor.instances", sparkExecutorInstances) // 1
.set("spark.cassandra.output.batch.size.bytes", "2048")
.set("spark.sql.broadcastTimeout", "2000")
.set("spark.sql.shuffle.partitions", "1000")
.set("spark.network.timeout", "80s")
.set("spark.executor.extraJavaOptions", "-verbose:gc -XX:+UseG1GC")
Run Code Online (Sandbox Code Playgroud)
sc.cassandraTableMyCaseClass //阅读代码
dataRDD..saveToCassandra("myDatabase","mytable")//编写代码
表中的数据量很大,操作也很复杂.
我正在使用带有28gb内存和8个内核的火花大师和10个具有相同配置的火花工作者,其中我使用的是26 gb内存和4到8个内核.有时我也会得到ExecutorLostException.
在Cassandra表中写入数据时的最新StackTrace
org.apache.spark.SparkException: Job aborted due to stage failure: Task 145 in stage 6.0 failed 4 times, most recent failure: Lost task 145.6 in stage 6.0 (TID 3268, 10.178.149.48): ExecutorLostFailure (executor 157 exited caused by one of the running tasks) …Run Code Online (Sandbox Code Playgroud) cassandra cassandra-2.0 apache-spark spark-cassandra-connector
我在群集上部署了一些火花.我使用spark-submit命令跟随我的项目jar 提交spark作业.
Spak Conf我的项目中有很多.Conf将根据我正在运行的课程来决定,但每次我运行spark工作时都会收到此警告
7/01/09 07:32:51 WARN SparkContext:使用现有的SparkContext,某些配置可能不会生效.
查询是否意味着SparkContext已经存在,我的工作就是选择它. 查询为什么没有进行配置
码
private val conf = new SparkConf()
.setAppName("ELSSIE_Ingest_Cassandra")
.setMaster(sparkIp)
.set("spark.sql.shuffle.partitions", "8")
.set("spark.cassandra.connection.host", cassandraIp)
.set("spark.sql.crossJoin.enabled", "true")
object SparkJob extends Enumeration {
val Program1, Program2, Program3, Program4, Program5 = Value
}
object ElssieCoreContext {
def getSparkSession(sparkJob: SparkJob.Value = SparkJob.RnfIngest): SparkSession = {
val sparkSession = sparkJob match {
case SparkJob.Program1 => {
val updatedConf = conf.set("spark.cassandra.output.batch.size.bytes", "2048").set("spark.sql.broadcastTimeout", "2000")
SparkSession.builder().config(updatedConf).getOrCreate()
}
case SparkJob.Program2 => {
val updatedConf = …Run Code Online (Sandbox Code Playgroud) 我正在尝试升级我的项目以使用scala 2.12但是,我使用了一些库,它们没有2.12版本(mongoquery-casbah,salat).
如果我强制使用这些库的2.11版本("com.github.salat" % "salat_2.11" % "1.10.0", "com.github.limansky" % "mongoquery-casbah_2.11" % "0.5"),我会收到错误:
[error] Modules were resolved with conflicting cross-version suffixes in {file:/C:/work/GeneASS/}dao:
[error] org.scala-lang.modules:scala-parser-combinators _2.11, _2.12
Run Code Online (Sandbox Code Playgroud)
我该如何解决这个问题?
我正在设置新的 Kafka 集群,出于测试目的,我创建了具有 1 个分区和 3 个副本的主题。
现在,我通过生产者并行地触发消息,比如每秒 50K 条消息。我在一个组内创建了一个消费者,它每秒只能获取 30K 条消息。
我可以更改主题级别、分区级别、消费者级别的配置。
我正在通过 grafana + prometheus 监控一切。
知道哪种配置或其他东西可以帮助我消费更多数据吗?
提前致谢
我有一个只有私有 IP 的 EC2 实例,并且我在 say 上运行的同一个实例上安装了 Apache Kafka 10.0.4.44:9092。现在,我创建了 AWS Lambda 函数,它从给定的 Bucket 读取文档并将文档正文发送到在 EC2 实例上运行的 Apache Kafka。
现在,AWS Lambda 无法访问 EC2 实例服务。
如何授予 Lambda 访问权限以访问 EC2 上的 Apache kafka 服务?
我Json在 MySql 中有一个类型列,我正在使用带有 Slick 的 Scala。我如何Json通过 Slick为Column提供支持。
class SampleTable(tag: Tag) extends Table[(String, ??)](tag, "test") {
override def * : ProvenShape[NodeReference] = (name, data)
def name: Rep[String] = column[String]("name", O.PrimaryKey)
def Data: Rep[??] = column[??]("data", O.PrimaryKey)
}
Run Code Online (Sandbox Code Playgroud)
任何帮助将不胜感激。提前致谢
I was looking at the Scala's Hierarchy and then This click in my mind.
What if I declare the None like Option[Unit] and Nil like List[Unit]
What Exactly is the difference between Unit and Nothing?
What will happen If I start using Nothing in Place of Unit?
Thanks
scala ×4
apache-spark ×3
amazon-ec2 ×1
amazon-s3 ×1
apache-kafka ×1
aws-lambda ×1
cassandra ×1
mysql ×1
play-slick ×1
sbt ×1
slick ×1
slick-3.0 ×1