我想在Spark数据集中将整列的大小写更改为小写
Desired Input
+------+--------------------+
|ItemID| Category name|
+------+--------------------+
| ABC|BRUSH & BROOM HAN...|
| XYZ|WHEEL BRUSH PARTS...|
+------+--------------------+
Desired Output
+------+--------------------+
|ItemID| Category name|
+------+--------------------+
| ABC|brush & broom han...|
| XYZ|wheel brush parts...|
+------+--------------------+
Run Code Online (Sandbox Code Playgroud)
我尝试使用collectAsList()和toString(),这对于非常大的数据集来说是一个缓慢而复杂的过程.
我还发现了一种方法'较低',但没有知道如何让它在dasaset中工作请建议我一个简单或有效的方法来做到这一点.提前致谢
apache-spark apache-spark-sql spark-dataframe apache-spark-dataset
我想从一个简单的CSV文件创建一个Spark数据集.以下是CSV文件的内容:
name,state,number_of_people,coolness_index
trenton,nj,"10","4.5"
bedford,ny,"20","3.3"
patterson,nj,"30","2.2"
camden,nj,"40","8.8"
Run Code Online (Sandbox Code Playgroud)
以下是制作数据集的代码:
var location = "s3a://path_to_csv"
case class City(name: String, state: String, number_of_people: Long)
val cities = spark.read
.option("header", "true")
.option("charset", "UTF8")
.option("delimiter",",")
.csv(location)
.as[City]
Run Code Online (Sandbox Code Playgroud)
以下是错误消息:"无法number_of_people向字符串转换为bigint,因为它可能会截断"
Databricks讨论了如何在此博客文章中创建数据集和此特定错误消息.
编码器急切地检查您的数据是否与预期的架构匹配,在您尝试错误处理TB数据之前提供有用的错误消息.例如,如果我们尝试使用太小的数据类型,那么转换为对象将导致截断(即numStudents大于一个字节,其最大值为255),Analyzer将发出AnalysisException.
我正在使用该Long类型,所以我没想到会看到此错误消息.
我试图根据制造商列内容将数据集拆分为不同的数据集.它非常慢
请建议一种改进代码的方法,以便它可以更快地执行并减少Java代码的使用.
List<Row> lsts= countsByAge.collectAsList();
for(Row lst:lsts){
String man=lst.toString();
man = man.replaceAll("[\\p{Ps}\\p{Pe}]", "");
Dataset<Row> DF = src.filter("Manufacturer='"+man+"'");
DF.show();
}
Run Code Online (Sandbox Code Playgroud)
代码,输入和输出数据集如下所示.
package org.sparkexample;
import org.apache.parquet.filter2.predicate.Operators.Column;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RelationalGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
public class GroupBy {
public static void main(String[] args) {
System.setProperty("hadoop.home.dir", "C:\\winutils");
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
SQLContext sqlContext = new SQLContext(sc);
SparkSession spark = SparkSession.builder().appName("split datasets").getOrCreate();
sc.setLogLevel("ERROR");
Dataset<Row> src= sqlContext.read()
.format("com.databricks.spark.csv")
.option("header", …Run Code Online (Sandbox Code Playgroud) apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0
我正在使用Spark Dataset(Spark 1.6.1版本).以下是我的代码
object App {
val conf = new SparkConf()
.setMaster("local")
.setAppName("SparkETL")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc);
import sqlContext.implicits._
}
override def readDataTable(tableName:String):DataFrame={
val dataFrame= App.sqlContext.read.jdbc(JDBC_URL, tableName, JDBC_PROP);
return dataFrame;
}
case class Student(stud_id , sname , saddress)
case class Student(classid, stud_id, name)
var tbl_student = JobSqlDAO.readDataTable("tbl_student").filter("stud_id = '" + studId + "'").as[Student].as("tbl_student")
var tbl_class_student = JobSqlDAO.readDataTable("tbl_class_student").as[StudentClass].as("tbl_class_student")
var result = tbl_class_student.joinWith(tbl_student, $"tbl_student.stud_id" === $"tbl_class_student.stud_id").as("ff")
Run Code Online (Sandbox Code Playgroud)
现在我想在多列上执行group by子句?怎么做?
result.groupBy(_._1._1.created_at)我可以这样做吗?如果是的话,那么我不能将结果看作一个组也是如何在多个列上进行的?
我有一个类似于以下示例的Spark数据集:
0 1 2 3
+------+------------+--------------------+---+
|ItemID|Manufacturer| Category |UPC|
+------+------------+--------------------+---+
| 804| ael|Brush & Broom Han...|123|
| 805| ael|Wheel Brush Parts...|124|
+------+------------+--------------------+---+
Run Code Online (Sandbox Code Playgroud)
我需要通过搜索列标题来查找列的位置.
例如:
int position=getColumnPosition("Category");
Run Code Online (Sandbox Code Playgroud)
这应该返回2.
是否有Dataset<Row>数据类型支持的Spark函数来查找列索引或可以在Spark数据集上运行的任何java函数?
请帮助我理解在数据集上使用时传递给groupByKey的参数
scala> val data = spark.read.text("Sample.txt").as[String]
data: org.apache.spark.sql.Dataset[String] = [value: string]
scala> data.flatMap(_.split(" ")).groupByKey(l=>l).count.show
Run Code Online (Sandbox Code Playgroud)
在上面的代码中,请帮助我理解groupByKey(l => l)中的(l => l)含义.