标签: apache-spark-sql

从spark数据帧中获取特定行

df[100, c("column")]在scala spark数据框中是否有任何替代方法.我想从一列火花数据框中选择特定的行.例如100th,在R等效代码中的行

apache-spark apache-spark-sql

24
推荐指数
5
解决办法
6万
查看次数

Scala Spark DataFrame:dataFrame.select给定列名称序列的多个列

val columnName=Seq("col1","col2",....."coln");
Run Code Online (Sandbox Code Playgroud)

有没有办法执行dataframe.select操作以获取仅包含指定列名的数据帧.我知道我可以做,dataframe.select("col1","col2"...) 但是columnName在运行时生成.我可以dataframe.select()在循环中为每个列名重复执行.它会有任何性能开销吗?有没有其他更简单的方法来实现这一目标?

scala dataframe apache-spark apache-spark-sql

24
推荐指数
2
解决办法
6万
查看次数

如何在Spark数据框中展平结构?

我有一个具有以下结构的数据帧:

 |-- data: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- keyNote: struct (nullable = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- note: string (nullable = true)
 |    |-- details: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
Run Code Online (Sandbox Code Playgroud)

如何展平结构并创建新的数据框:

     |-- id: long (nullable = true)
     |-- keyNote: struct (nullable = true)
     |    |-- key: string (nullable = true)
     |    |-- note: …
Run Code Online (Sandbox Code Playgroud)

java apache-spark apache-spark-sql

24
推荐指数
6
解决办法
3万
查看次数

Spark加载数据并将文件名添加为dataframe列

我正在使用包装函数将一些数据加载到Spark中:

def load_data( filename ):
    df = sqlContext.read.format("com.databricks.spark.csv")\
        .option("delimiter", "\t")\
        .option("header", "false")\
        .option("mode", "DROPMALFORMED")\
        .load(filename)
    # add the filename base as hostname
    ( hostname, _ ) = os.path.splitext( os.path.basename(filename) )
    ( hostname, _ ) = os.path.splitext( hostname )
    df = df.withColumn('hostname', lit(hostname))
    return df
Run Code Online (Sandbox Code Playgroud)

具体来说,我使用glob来一次加载一堆文件:

df = load_data( '/scratch/*.txt.gz' )
Run Code Online (Sandbox Code Playgroud)

文件是:

/scratch/host1.txt.gz
/scratch/host2.txt.gz
...
Run Code Online (Sandbox Code Playgroud)

我想列"主机名"实际上包含文件的真实名称被加载,而不是水珠(即host1,host2等等,而不是*).

我怎样才能做到这一点?

apache-spark apache-spark-sql pyspark

24
推荐指数
1
解决办法
8622
查看次数

根据指定黑名单标准的另一个DataFrame过滤Spark DataFrame

我有一个largeDataFrame(多列和数十亿行)和一个smallDataFrame(单列和10,000行).

我想所有的行从过滤largeDataFrame每当some_identifier列在largeDataFrame比赛中的行之一smallDataFrame.

这是一个例子:

largeDataFrame

some_idenfitier,first_name
111,bob
123,phil
222,mary
456,sue
Run Code Online (Sandbox Code Playgroud)

smallDataFrame

some_identifier
123
456
Run Code Online (Sandbox Code Playgroud)

desiredOutput

111,bob
222,mary
Run Code Online (Sandbox Code Playgroud)

这是我丑陋的解决方案.

val smallDataFrame2 = smallDataFrame.withColumn("is_bad", lit("bad_row"))
val desiredOutput = largeDataFrame.join(broadcast(smallDataFrame2), Seq("some_identifier"), "left").filter($"is_bad".isNull).drop("is_bad")
Run Code Online (Sandbox Code Playgroud)

有更清洁的解决方案吗?

dataframe apache-spark apache-spark-sql

24
推荐指数
2
解决办法
2万
查看次数

如何在DataFrame中将时间戳转换为日期格式?

我有一个DataFrameTimestamp列,我需要为转换Date格式.

是否有可用的Spark SQL函数?

apache-spark apache-spark-sql

24
推荐指数
3
解决办法
4万
查看次数

partitionColumn,lowerBound,upperBound,numPartitions参数是什么意思?

虽然通过在星火JDBC连接获取来自SQL Server的数据,我发现我可以设置一些并行的参数,如partitionColumn,lowerBound,upperBound,和numPartitions.我已经通过spark文档,但无法理解它.

谁能解释一下这些参数的含义?

jdbc apache-spark apache-spark-sql

24
推荐指数
4
解决办法
1万
查看次数

用于行类型Spark数据集的编码器

我想在DataSet中为Row类型编写一个编码器,用于我正在进行的地图操作.基本上,我不明白如何编写编码器.

以下是地图操作的示例:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
            @Override
            public Iterator<String> call(Row row) throws Exception {

                ArrayList<String> obj = //some map operation
                return obj.iterator();
            }
        },Encoders.STRING());
Run Code Online (Sandbox Code Playgroud)

我明白,编码器需要编写如下代码:

    Encoder<Row> encoder = new Encoder<Row>() {
        @Override
        public StructType schema() {
            return join.schema();
            //return null;
        }

        @Override
        public ClassTag<Row> clsTag() {
            return null;
        }
    };
Run Code Online (Sandbox Code Playgroud)

但是,我不理解编码器中的clsTag(),我试图找到一个可以演示相似内容的运行示例(即行类型的编码器)

编辑 - 这不是所提问题的副本:尝试将数据帧行映射到更新行时编码器错误,因为答案谈到在Spark 2.x中使用Spark 1.x(我不是这样做),我也在寻找用于Row类的编码器而不是解决错误.最后,我一直在寻找Java解决方案,而不是Scala.

java apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

24
推荐指数
2
解决办法
2万
查看次数

Spark数据帧:collect()vs select()

调用collect()RDD会将整个数据集返回给驱动程序,这会导致内存不足,我们应该避免这种情况.

collect()如果在数据帧上调用,它的行为方式会相同吗?方法怎么
select()
它是否也像collect()在数据帧上调用一样工作?

bigdata dataframe apache-spark apache-spark-sql

24
推荐指数
5
解决办法
9万
查看次数

使用pyspark获取列的数据类型

我们正在从MongoDB读取数据Collection.Collection列有两个不同的值(例如:) (bson.Int64,int) (int,float).

我试图使用pyspark获取数据类型.

我的问题是有些列有不同的数据类型.

假设quantity并且weight是列

quantity           weight
---------          --------
12300              656
123566000000       789.6767
1238               56.22
345                23
345566677777789    21
Run Code Online (Sandbox Code Playgroud)

实际上我们没有为mongo集合的任何列定义数据类型.

当我从中查询计数时 pyspark dataframe

dataframe.count()
Run Code Online (Sandbox Code Playgroud)

我这样的例外

"Cannot cast STRING into a DoubleType (value: BsonString{value=&apos;200.0&apos;})"
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark databricks

24
推荐指数
4
解决办法
5万
查看次数