小编Shr*_*sha的帖子

如何将整列的大小写改为小写?

我想在Spark数据集中将整列的大小写更改为小写

        Desired Input
        +------+--------------------+
        |ItemID|       Category name|
        +------+--------------------+
        |   ABC|BRUSH & BROOM HAN...|
        |   XYZ|WHEEL BRUSH PARTS...|
        +------+--------------------+

        Desired Output
        +------+--------------------+
        |ItemID|       Category name|
        +------+--------------------+
        |   ABC|brush & broom han...|
        |   XYZ|wheel brush parts...|
        +------+--------------------+
Run Code Online (Sandbox Code Playgroud)

我尝试使用collectAsList()和toString(),这对于非常大的数据集来说是一个缓慢而复杂的过程.

我还发现了一种方法'较低',但没有知道如何让它在dasaset中工作请建议我一个简单或有效的方法来做到这一点.提前致谢

apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

14
推荐指数
2
解决办法
3万
查看次数

从CSV文件创建Spark数据集

我想从一个简单的CSV文件创建一个Spark数据集.以下是CSV文件的内容:

name,state,number_of_people,coolness_index
trenton,nj,"10","4.5"
bedford,ny,"20","3.3"
patterson,nj,"30","2.2"
camden,nj,"40","8.8"
Run Code Online (Sandbox Code Playgroud)

以下是制作数据集的代码:

var location = "s3a://path_to_csv"

case class City(name: String, state: String, number_of_people: Long)

val cities = spark.read
  .option("header", "true")
  .option("charset", "UTF8")
  .option("delimiter",",")
  .csv(location)
  .as[City]
Run Code Online (Sandbox Code Playgroud)

以下是错误消息:"无法number_of_people向字符串转换为bigint,因为它可能会截断"

Databricks讨论了如何在此博客文章中创建数据集和此特定错误消息.

编码器急切地检查您的数据是否与预期的架构匹配,在您尝试错误处理TB数据之前提供有用的错误消息.例如,如果我们尝试使用太小的数据类型,那么转换为对象将导致截断(即numStudents大于一个字节,其最大值为255),Analyzer将发出AnalysisException.

我正在使用该Long类型,所以我没想到会看到此错误消息.

apache-spark apache-spark-dataset

9
推荐指数
1
解决办法
9130
查看次数

基于spark中的列值拆分数据集

我试图根据制造商列内容将数据集拆分为不同的数据集.它非常慢
请建议一种改进代码的方法,以便它可以更快地执行并减少Java代码的使用.

    List<Row> lsts= countsByAge.collectAsList();

        for(Row lst:lsts){
             String man=lst.toString();
             man = man.replaceAll("[\\p{Ps}\\p{Pe}]", "");
             Dataset<Row> DF = src.filter("Manufacturer='"+man+"'");
             DF.show();

        }
Run Code Online (Sandbox Code Playgroud)

代码,输入和输出数据集如下所示.

    package org.sparkexample;
    import org.apache.parquet.filter2.predicate.Operators.Column;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.RelationalGroupedDataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SparkSession;

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
            public class GroupBy {

                public static void main(String[] args) {
                    System.setProperty("hadoop.home.dir", "C:\\winutils");
                    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
                    SQLContext sqlContext = new SQLContext(sc);
                    SparkSession spark = SparkSession.builder().appName("split datasets").getOrCreate();
                    sc.setLogLevel("ERROR");

                    Dataset<Row> src= sqlContext.read()
                                .format("com.databricks.spark.csv")
                                .option("header", …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0

8
推荐指数
1
解决办法
3455
查看次数

如何在spark数据集上使用group by

我正在使用Spark Dataset(Spark 1.6.1版本).以下是我的代码

object App { 

val conf = new SparkConf()
.setMaster("local")
.setAppName("SparkETL")

val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc);
import sqlContext.implicits._

}

override def readDataTable(tableName:String):DataFrame={
val dataFrame= App.sqlContext.read.jdbc(JDBC_URL, tableName, JDBC_PROP);
return dataFrame;
}


case class Student(stud_id , sname , saddress)
case class Student(classid, stud_id, name)


var tbl_student = JobSqlDAO.readDataTable("tbl_student").filter("stud_id = '" + studId + "'").as[Student].as("tbl_student")

var tbl_class_student = JobSqlDAO.readDataTable("tbl_class_student").as[StudentClass].as("tbl_class_student")


 var result = tbl_class_student.joinWith(tbl_student, $"tbl_student.stud_id" === $"tbl_class_student.stud_id").as("ff")
Run Code Online (Sandbox Code Playgroud)

现在我想在多列上执行group by子句?怎么做? result.groupBy(_._1._1.created_at)我可以这样做吗?如果是的话,那么我不能将结果看作一个组也是如何在多个列上进行的?

dataset apache-spark apache-spark-dataset

5
推荐指数
1
解决办法
3104
查看次数

通过在Apache Spark Java中搜索数据集的列标题来查找列索引

我有一个类似于以下示例的Spark数据集:

       0         1                  2          3
    +------+------------+--------------------+---+
    |ItemID|Manufacturer|       Category     |UPC|
    +------+------------+--------------------+---+
    |   804|         ael|Brush & Broom Han...|123|
    |   805|         ael|Wheel Brush Parts...|124|
    +------+------------+--------------------+---+
Run Code Online (Sandbox Code Playgroud)

我需要通过搜索列标题来查找列的位置.

例如:

int position=getColumnPosition("Category");
Run Code Online (Sandbox Code Playgroud)

这应该返回2.

是否有Dataset<Row>数据类型支持的Spark函数来查找列索引或可以在Spark数据集上运行的任何java函数?

java apache-spark apache-spark-sql apache-spark-dataset

4
推荐指数
1
解决办法
3100
查看次数

Spark数据集中的groupByKey

请帮助我理解在数据集上使用时传递给groupByKey的参数

scala> val data = spark.read.text("Sample.txt").as[String]
data: org.apache.spark.sql.Dataset[String] = [value: string]

scala> data.flatMap(_.split(" ")).groupByKey(l=>l).count.show
Run Code Online (Sandbox Code Playgroud)

在上面的代码中,请帮助我理解groupByKey(l => l)中的(l => l)含义.

apache-spark apache-spark-dataset

0
推荐指数
1
解决办法
2万
查看次数