以下代码在 Spark 中创建一个空数据集。
scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]
Run Code Online (Sandbox Code Playgroud)
emptyDataset 的签名是..
@Experimental
@InterfaceStability.Evolving
def emptyDataset[T: Encoder]: Dataset[T] = {
val encoder = implicitly[Encoder[T]]
new Dataset(self, LocalRelation(encoder.schema.toAttributes), encoder)
}
Run Code Online (Sandbox Code Playgroud)
为什么上面的签名没有强制 T 成为 Encoder 的子类型?
它接受字符串类型的 T 并为字符串创建一个编码器并将其传递给数据集构造函数。它最终创建数据集[字符串]。
嗨,大家好,我有一个函数可以从S3的某些位置加载数据集并返回有趣的数据
private def filterBrowseIndex(spark: SparkSession, s3BrowseIndex: String, mids: Seq[String] = Seq(), indices: Seq[String] = Seq()): Dataset[BrowseIndex] = {
import spark.implicits._
spark
.sparkContext.textFile(s3BrowseIndex)
// split text dataset
.map(line => line.split("\\s+"))
// get types for attributes
.map(BrowseIndex.strAttributesToBrowseIndex)
// cast it to a dataset (requires implicit conversions)
.toDS()
// pick rows for the given marketplaces
.where($"mid".isin(mids: _*))
// pick rows for the given indices
.where($"index".isin(indices: _*))
Run Code Online (Sandbox Code Playgroud)
}
如果有人提供mids = Seq()或,此实现将过滤掉所有内容indices = Seq()。另一方面,我希望语义是“仅在mids不为空的情况下应用此where子句”(与相同indices),这样,如果函数的用户提供空序列,则不会进行过滤。
有没有很好的功能方法可以做到这一点?
scala apache-spark apache-spark-sql spark-dataframe apache-spark-dataset
我有一个类型集的列,我使用collect_set()spark数据集API,它返回一个包装数组的包装数组.我想要嵌套包装数组的所有值中的单个数组.我怎样才能做到这一点?
例如.卡桑德拉表:
Col1
{1,2,3}
{1,5}
Run Code Online (Sandbox Code Playgroud)
我正在使用Spark Dataset API.
row.get(0)返回一个包装数组的包装数组.
我有数据集 df,其内容的索引为 accountid,我也有带 accountid 的数组列表。如何过滤或映射数据集以创建仅包含基于数组列表中的 accountid 内容的新数据集。
我正在使用 Java 8
List<String> accountIdList= new ArrayList<String>();
accountIdList.add("1001");
accountIdList.add("1002");
accountIdList.add("1003");
accountIdList.add("1004");
Dataset<Row> filteredRows= df.filter(p-> df.col("accountId").equals(accountIdList));
Run Code Online (Sandbox Code Playgroud)
我正在尝试将列表本身传递给比较运算符,您认为这是正确的方法吗
Java 语法是
如果您正在寻找 java 语法
Dataset<Row> filteredRows= df.where(df.col("accountId").isin(accountIdList.toArray()));
Run Code Online (Sandbox Code Playgroud) 问题: 我在映射 spark 中键的最常见值时遇到问题(使用 scala)。我已经用 RDD 完成了,但不知道如何有效地使用 DF/DS(sparksql)
数据集就像
key1 = value_a
key1 = value_b
key1 = value_b
key2 = value_a
key2 = value_c
key2 = value_c
key3 = value_a
Run Code Online (Sandbox Code Playgroud)
火花转换和访问输出后应该是每个键都有其共同的值
输出
key1 = valueb
key2 = valuec
key3 = valuea
Run Code Online (Sandbox Code Playgroud)
尝试到现在:
RDD
我试图(key,value),count在 RDD 中按组进行映射和减少,并且它产生逻辑,但我无法将其转换为 sparksql(DataFrame/Dataset)(因为我希望跨网络的最小洗牌)
这是我的 RDD 代码
val data = List(
"key1,value_a",
"key1,value_b",
"key1,value_b",
"key2,value_a",
"key2,value_c",
"key2,value_c",
"key3,value_a"
)
val sparkConf = new SparkConf().setMaster("local").setAppName("example")
val sc = new SparkContext(sparkConf)
val lineRDD = sc.parallelize(data)
val pairedRDD …Run Code Online (Sandbox Code Playgroud) 如果我有一个数据集,每个记录的每个记录都是一个案例类,那么我按如下所示保留该数据集,以便使用序列化:
myDS.persist(StorageLevel.MERORY_ONLY_SER)
Run Code Online (Sandbox Code Playgroud)
Spark是否使用Java / kyro序列化序列化数据集?还是像数据框一样,Spark有自己的数据存储方式?
我正在下面的数据集上尝试这个 mapgroups 函数,但不知道为什么我的“总价值”列为 0。我在这里遗漏了什么???请指教
Spark 版本 - 2.0 Scala 版本 - 2.11
case class Record(Hour: Int, Category: String,TotalComm: Double, TotalValue: Int)
val ss = (SparkSession)
import ss.implicits._
val df: DataFrame = ss.sparkContext.parallelize(Seq(
(0, "cat26", 30.9, 200), (0, "cat26", 22.1, 100), (0, "cat95", 19.6, 300), (1, "cat4", 1.3, 100),
(1, "cat23", 28.5, 100), (1, "cat4", 26.8, 400), (1, "cat13", 12.6, 250), (1, "cat23", 5.3, 300),
(0, "cat26", 39.6, 30), (2, "cat40", 29.7, 500), (1, "cat4", 27.9, 600), (2, "cat68", 9.8, 100), …Run Code Online (Sandbox Code Playgroud) 我有两个数据集:Dataset[User]和Dataset[Book]其中两个User和Book区分类别。我像这样加入他们:
val joinDS = ds1.join(ds2, "userid")
如果我尝试map遍历 中的每个元素joinDS,编译器会抱怨缺少编码器:
not enough arguments for method map: (implicit evidence$46: org.apache.spark.sql.Encoder[Unit])org.apache.spark.sql.Dataset[Unit].
Unspecified value parameter evidence$46.
Unable to find encoder for type stored in a Dataset.
但是如果我使用foreach而不是map. 为什么也不foreach需要编码器?我已经从 spark 会话中导入了所有隐式,那么map当数据集是连接包含案例类的两个数据集的结果时,为什么根本需要编码器)?另外,我从那个连接中得到什么类型的数据集?它是一个Dataset[Row],还是别的什么?
scala apache-spark apache-spark-dataset apache-spark-encoders
抱歉问了一个简单的问题。我想将 case 类传递给函数参数,并且我想在函数内部进一步使用它。到目前为止我有试过这个TypeTag和ClassTag,但由于某些原因,我无法正确地使用它或可我不看正确的位置。
用例与此类似:
case class infoData(colA:Int,colB:String)
case class someOtherData(col1:String,col2:String,col3:Int)
def readCsv[T:???](path:String,passedCaseClass:???): Dataset[???] = {
sqlContext
.read
.option("header", "true")
.csv(path)
.as[passedCaseClass]
}
Run Code Online (Sandbox Code Playgroud)
它将被称为这样的:
val infoDf = readCsv("/src/main/info.csv",infoData)
val otherDf = readCsv("/src/main/someOtherData.csv",someOtherData)
Run Code Online (Sandbox Code Playgroud) 假设我有用 Scala 2.12 编写的 Spark 代码
val dataset = spark.emptyDataset[String]
dataset.foreachPartition( partition => partition.foreach {
entry: String => println(entry)
})
Run Code Online (Sandbox Code Playgroud)
当我运行代码时,编译器给出了这个错误
[info] Compiling 1 Scala source to <path>/scala-2.12/classes ...
[error] Code.scala:11:52: value foreach is not a member of Object
[error] empty.foreachPartition( partition => partition.foreach{
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 11, 2020 1:43:41 AM
Run Code Online (Sandbox Code Playgroud)
为什么编译器partition作为一个Object而不是Iterator[String]?
我必须手动添加partition类型才能使代码正常工作。
val dataset = …Run Code Online (Sandbox Code Playgroud)