小编vij*_*jay的帖子

在Scala中获取两个数字之间的随机数

如何获得两个数字之间的随机数,比如20到30?

我试过了:

val r = new scala.util.Random
r.nextInt(30)
Run Code Online (Sandbox Code Playgroud)

这只允许上限值,但值始终从0开始.有没有办法设置下限值(在示例中为20)?

谢谢!

random scala

21
推荐指数
5
解决办法
3万
查看次数

DataFrame partitionBy嵌套列

我试图在嵌套字段上调用partitionBy,如下所示:

val rawJson = sqlContext.read.json(filename)
rawJson.write.partitionBy("data.dataDetails.name").parquet(filenameParquet)
Run Code Online (Sandbox Code Playgroud)

我运行时遇到以下错误.我确实看到"名称"列为以下架构中的字段.是否有不同的格式来指定嵌套的列名?

java.lang.RuntimeException:在模式StructType中找不到分区列data.dataDetails.name(StructField(name,StringType,true),StructField(time,StringType,true),StructField(data,StructType(StructType)(dataDetails,StructType(StructField) (name,StringType,true),StructField(id,StringType,true),true)),true))

这是我的json文件:

{  
  "name": "AssetName",
  "time": "2016-06-20T11:57:19.4941368-04:00",
  "data": {
    "type": "EventData",
    "dataDetails": {
      "name": "EventName"
      "id": "1234"

    }
  }
} 
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

6
推荐指数
1
解决办法
2286
查看次数

什么是按列分区但保持固定分区计数的有效方法?

按字段将数据划分为预定义分区计数的最佳方法是什么?

我目前通过指定partionCount = 600来分区数据.发现计数600为我的数据集/集群设置提供了最佳查询性能.

val rawJson = sqlContext.read.json(filename).coalesce(600)
rawJson.write.parquet(filenameParquet)
Run Code Online (Sandbox Code Playgroud)

现在我想通过列'eventName'对这些数据进行分区,但仍然保持计数600.数据当前有大约2000个唯一的eventNames,加上每个eventName中的行数不统一.大约10个eventNames有超过50%的数据导致数据偏斜.因此,如果我像下面那样进行分区,它的性能不是很好.写入比没有写入时间长5倍.

val rawJson = sqlContext.read.json(filename)
rawJson.write.partitionBy("eventName").parquet(filenameParquet)
Run Code Online (Sandbox Code Playgroud)

为这些场景划分数据的好方法是什么?有没有办法按eventName进行分区,但是将其扩展到600个分区?

我的架构如下所示:

{  
  "eventName": "name1",
  "time": "2016-06-20T11:57:19.4941368-04:00",
  "data": {
    "type": "EventData",
    "dataDetails": {
      "name": "detailed1",
      "id": "1234",
...
...
    }
  }
} 
Run Code Online (Sandbox Code Playgroud)

谢谢!

apache-spark apache-spark-sql

6
推荐指数
1
解决办法
4628
查看次数

如何在Spark Streaming应用程序中异步写入行以加快批处理执行速度?

我有一个火花工作,我需要在每个微批处理中编写SQL查询的输出。写入是一项非常昂贵的操作,并且会导致批处理执行时间超过批处理间隔。

我正在寻找提高写入性能的方法。

  1. 像下面显示的那样,在单独的线程中异步执行写操作是一个好选择吗?

  2. 因为Spark本身以分布式方式执行,这会引起任何副作用吗?

  3. 还有其他/更好的方法来加快写入速度吗?

    // Create a fixed thread pool to execute asynchronous tasks
    val executorService = Executors.newFixedThreadPool(2)
    dstream.foreachRDD { rdd =>
      import org.apache.spark.sql._
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
      import spark.implicits._
      import spark.sql
    
      val records = rdd.toDF("record")
      records.createOrReplaceTempView("records")
      val result = spark.sql("select * from records")
    
      // Submit a asynchronous task to write
      executorService.submit {
        new Runnable {
          override def run(): Unit = {
            result.write.parquet(output)
          }
        }
      }
    }
    
    Run Code Online (Sandbox Code Playgroud)

performance apache-spark spark-streaming apache-spark-sql

4
推荐指数
1
解决办法
1546
查看次数

如何枚举HDFS目录中的文件

如何枚举 HDFS 目录中的文件?这是用于使用 Scala 枚举 Apache Spark 集群中的文件。我看到有 sc.textfile() 选项,但它也会读取内容。我只想读取文件名。

我实际上尝试了 listStatus。但是没有用。得到以下错误。我正在使用 Azure HDInsight Spark,并且 blob 存储文件夹“testContainer@testhdi.blob.core.windows.net/example/”包含 .json 文件。

val fs = FileSystem.get(new Configuration())
val status = fs.listStatus(new Path("wasb://testContainer@testhdi.blob.core.windows.net/example/"))
status.foreach(x=> println(x.getPath)

=========
Error:
========
java.io.FileNotFoundException: Filewasb://testContainer@testhdi.blob.core.windows.net/example does not exist.
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.listStatus(NativeAzureFileSystem.java:2076)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:23)
    at $iwC$$iwC$$iwC.<init>(<console>:28)
    at $iwC$$iwC.<init>(<console>:30)
    at $iwC.<init>(<console>:32)
    at <init>(<console>:34)
    at .<init>(<console>:38)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) …
Run Code Online (Sandbox Code Playgroud)

hadoop scala hdfs apache-spark

2
推荐指数
1
解决办法
2791
查看次数