小编Rag*_*hav的帖子

apache spark agg()函数

对于示例数据框scholor

scala> scholor.show

| id|  name|age|sal|base|
Run Code Online (Sandbox Code Playgroud)

对于上面,下面两个都给出相同的输出。那么agg()的用途是什么。它只是为了名字。

scala> scholor.groupBy("age").sum("base").show      /*with out agg */

scala> scholor.groupBy("age").agg(sum("base")).show        /* with agg */
Run Code Online (Sandbox Code Playgroud)
scala> scholor.show

| id|  name|age|sal|base|
Run Code Online (Sandbox Code Playgroud)

是否agg()需要任何varargs作为参数?需要什么agg()

提前致谢。

scala apache-spark-sql

2
推荐指数
1
解决办法
5068
查看次数

Kafka Scala消费者代码-打印消耗的记录

当我通过使用url创建以下简单的kafka消费者时:https : //gist.github.com/akhil/6dfda8a04e33eff91a20

在该链接中,要打印消耗的记录,请使用未识别的单词“ asScala”。好的,告诉我如何迭代返回类型:ConsumerRecord [String,String],它是poll()方法的返回类型。

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

 
object KafkaConsumerEx extends App {

  val topic_name = "newtopic55"
  val consumer_group = "KafkaConsumerBatch"

  val prot = new Properties()
  prot.put("bootstrap.servers","localhost:9092")
  prot.put("group.id",consumer_group)
  prot.put("key.deserializer",  "org.apache.kafka.common.serialization.StringDeserializer")
  prot.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")

  val kfk_consumer = new KafkaConsumer[String,String](prot)
  kfk_consumer.subscribe(util.Collections.singleton(topic_name))
  println("here")

   while(true){
    val consumer_record : ConsumerRecords[String, String]  = kfk_consumer.poll(100)
    println("records count : " + consumer_record.count())
    println("records partitions: " + consumer_record.partitions())
    consumer_record.iterator().


  }

}
Run Code Online (Sandbox Code Playgroud)

感谢在广告中。

scala kafka-consumer-api

1
推荐指数
1
解决办法
2517
查看次数