标签: apache-spark-dataset

Apache Spark 数据集创建中的 Scala 泛型类型用法

以下代码在 Spark 中创建一个空数据集。

    scala> val strings = spark.emptyDataset[String]
    strings: org.apache.spark.sql.Dataset[String] = [value: string]
Run Code Online (Sandbox Code Playgroud)

emptyDataset 的签名是..

     @Experimental  
     @InterfaceStability.Evolving
     def emptyDataset[T: Encoder]: Dataset[T] = {
         val encoder = implicitly[Encoder[T]]
         new Dataset(self, LocalRelation(encoder.schema.toAttributes), encoder)   
     }
Run Code Online (Sandbox Code Playgroud)

为什么上面的签名没有强制 T 成为 Encoder 的子类型?

它接受字符串类型的 T 并为字符串创建一个编码器并将其传递给数据集构造函数。它最终创建数据集[字符串]。

scala apache-spark scala-generics apache-spark-dataset

2
推荐指数
1
解决办法
1315
查看次数

有条件地将`filter` /`where`应用于Spark`Dataset` /`Dataframe`

嗨,大家好,我有一个函数可以从S3的某些位置加载数据集并返回有趣的数据

private def filterBrowseIndex(spark: SparkSession, s3BrowseIndex: String, mids: Seq[String] = Seq(), indices: Seq[String] = Seq()): Dataset[BrowseIndex] = {
import spark.implicits._

spark
  .sparkContext.textFile(s3BrowseIndex)
  // split text dataset
  .map(line => line.split("\\s+"))
  // get types for attributes
  .map(BrowseIndex.strAttributesToBrowseIndex)
  // cast it to a dataset (requires implicit conversions)
  .toDS()
  // pick rows for the given marketplaces
  .where($"mid".isin(mids: _*))
  // pick rows for the given indices
  .where($"index".isin(indices: _*))
Run Code Online (Sandbox Code Playgroud)

}

如果有人提供mids = Seq()或,此实现将过滤掉所有内容indices = Seq()。另一方面,我希望语义是“仅在mids不为空的情况下应用此where子句”(与相同indices),这样,如果函数的用户提供空序列,则不会进行过滤。

有没有很好的功能方法可以做到这一点?

scala apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

2
推荐指数
1
解决办法
1836
查看次数

WrappedArray的WrappedArray到java数组

我有一个类型集的列,我使用collect_set()spark数据集API,它返回一个包装数组的包装数组.我想要嵌套包装数组的所有值中的单个数组.我怎样才能做到这一点?

例如.卡桑德拉表:

Col1  
{1,2,3}
{1,5}
Run Code Online (Sandbox Code Playgroud)

我正在使用Spark Dataset API.
row.get(0)返回一个包装数组的包装数组.

java cassandra apache-spark apache-spark-dataset

2
推荐指数
1
解决办法
5152
查看次数

根据索引 id 的 ArrayList 过滤 JavaRDD

我有数据集 df,其内容的索引为 accountid,我也有带 accountid 的数组列表。如何过滤或映射数据集以创建仅包含基于数组列表中的 accountid 内容的新数据集。

我正在使用 Java 8

List<String> accountIdList= new ArrayList<String>();
accountIdList.add("1001");
accountIdList.add("1002");
accountIdList.add("1003");
accountIdList.add("1004");
Dataset<Row> filteredRows=  df.filter(p-> df.col("accountId").equals(accountIdList));
Run Code Online (Sandbox Code Playgroud)

我正在尝试将列表本身传递给比较运算符,您认为这是正确的方法吗

Java 语法是

如果您正在寻找 java 语法

Dataset<Row> filteredRows=  df.where(df.col("accountId").isin(accountIdList.toArray()));
Run Code Online (Sandbox Code Playgroud)

java-8 apache-spark apache-spark-dataset

2
推荐指数
1
解决办法
718
查看次数

Spark DataFrame/Dataset 为每个key找到最常见的值 高效方式

问题: 我在映射 spark 中键的最常见值时遇到问题(使用 scala)。我已经用 RDD 完成了,但不知道如何有效地使用 DF/DS(sparksql)

数据集就像

key1 = value_a
key1 = value_b
key1 = value_b
key2 = value_a
key2 = value_c
key2 = value_c
key3 = value_a
Run Code Online (Sandbox Code Playgroud)

火花转换和访问输出后应该是每个键都有其共同的值

输出

key1 = valueb
key2 = valuec
key3 = valuea
Run Code Online (Sandbox Code Playgroud)

尝试到现在:

RDD

我试图(key,value),count在 RDD 中按组进行映射和减少,并且它产生逻辑,但我无法将其转换为 sparksql(DataFrame/Dataset)(因为我希望跨网络的最小洗牌)

这是我的 RDD 代码

 val data = List(

"key1,value_a",
"key1,value_b",
"key1,value_b",
"key2,value_a",
"key2,value_c",
"key2,value_c",
"key3,value_a"

)

val sparkConf = new SparkConf().setMaster("local").setAppName("example")
val sc = new SparkContext(sparkConf)

val lineRDD = sc.parallelize(data)

val pairedRDD …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-dataset

2
推荐指数
1
解决办法
5894
查看次数

Spark:数据集序列化

如果我有一个数据集,每个记录的每个记录都是一个案例类,那么我按如下所示保留该数据集,以便使用序列化:

myDS.persist(StorageLevel.MERORY_ONLY_SER)
Run Code Online (Sandbox Code Playgroud)

Spark是否使用Java / kyro序列化序列化数据集?还是像数据框一样,Spark有自己的数据存储方式?

serialization scala apache-spark apache-spark-dataset

2
推荐指数
2
解决办法
3068
查看次数

Spark:数据集上的映射组

我正在下面的数据集上尝试这个 mapgroups 函数,但不知道为什么我的“总价值”列为 0。我在这里遗漏了什么???请指教

Spark 版本 - 2.0 Scala 版本 - 2.11

case class Record(Hour: Int, Category: String,TotalComm: Double, TotalValue: Int)
val ss = (SparkSession)
import ss.implicits._

val df: DataFrame = ss.sparkContext.parallelize(Seq(
(0, "cat26", 30.9, 200), (0, "cat26", 22.1, 100), (0, "cat95", 19.6, 300), (1, "cat4", 1.3, 100),
(1, "cat23", 28.5, 100), (1, "cat4", 26.8, 400), (1, "cat13", 12.6, 250), (1, "cat23", 5.3, 300),
(0, "cat26", 39.6, 30), (2, "cat40", 29.7, 500), (1, "cat4", 27.9, 600), (2, "cat68", 9.8, 100), …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-dataframe apache-spark-dataset

2
推荐指数
1
解决办法
7705
查看次数

为什么数据集的 foreach 方法不需要编码器,而 map 需要?

我有两个数据集:Dataset[User]Dataset[Book]其中两个UserBook区分类别。我像这样加入他们:

val joinDS = ds1.join(ds2, "userid")

如果我尝试map遍历 中的每个元素joinDS,编译器会抱怨缺少编码器:

not enough arguments for method map: (implicit evidence$46: org.apache.spark.sql.Encoder[Unit])org.apache.spark.sql.Dataset[Unit]. Unspecified value parameter evidence$46. Unable to find encoder for type stored in a Dataset.

但是如果我使用foreach而不是map. 为什么也不foreach需要编码器?我已经从 spark 会话中导入了所有隐式,那么map当数据集是连接包含案例类的两个数据集的结果时,为什么根本需要编码器)?另外,我从那个连接中得到什么类型的数据集?它是一个Dataset[Row],还是别的什么?

scala apache-spark apache-spark-dataset apache-spark-encoders

2
推荐指数
1
解决办法
232
查看次数

将案例类传递给函数参数

抱歉问了一个简单的问题。我想将 case 类传递给函数参数,并且我想在函数内部进一步使用它。到目前为止我有试过这个TypeTagClassTag,但由于某些原因,我无法正确地使用它或可我不看正确的位置。

用例与此类似:

case class infoData(colA:Int,colB:String)
case class someOtherData(col1:String,col2:String,col3:Int)

def readCsv[T:???](path:String,passedCaseClass:???): Dataset[???] = {
  sqlContext
    .read
    .option("header", "true")
    .csv(path)
    .as[passedCaseClass]
}
Run Code Online (Sandbox Code Playgroud)

它将被称为这样的:

val infoDf = readCsv("/src/main/info.csv",infoData)
val otherDf = readCsv("/src/main/someOtherData.csv",someOtherData)
Run Code Online (Sandbox Code Playgroud)

scala case-class apache-spark apache-spark-dataset classtag

2
推荐指数
2
解决办法
3314
查看次数

Scala 编译器无法推断 Spark lambda 函数内的类型

假设我有用 Scala 2.12 编写的 Spark 代码

    val dataset = spark.emptyDataset[String]

    dataset.foreachPartition( partition => partition.foreach {
      entry: String => println(entry)
    })

Run Code Online (Sandbox Code Playgroud)

当我运行代码时,编译器给出了这个错误


[info] Compiling 1 Scala source to <path>/scala-2.12/classes ...
[error] Code.scala:11:52: value foreach is not a member of Object
[error]     empty.foreachPartition( partition => partition.foreach{
[error]                                                    ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 11, 2020 1:43:41 AM


Run Code Online (Sandbox Code Playgroud)

为什么编译器partition作为一个Object而不是Iterator[String]

我必须手动添加partition类型才能使代码正常工作。

    val dataset = …
Run Code Online (Sandbox Code Playgroud)

lambda scala apache-spark apache-spark-dataset

2
推荐指数
1
解决办法
434
查看次数