标签: spark-dataframe

如何使用 DataFrame API 和 SCALA 在 Spark 中读取固定长度的文件

我有一个固定长度的文件(一个示例如下所示),我想使用 SCALA(不是 python 或 java)在 Spark 中使用 DataFrames API 读取这个文件。使用 DataFrames API 有读取 textFile、json 文件等的方法,但不确定是否有读取固定长度文件的方法。我正在互联网上搜索这个并找到了一个 github链接,但我spark-fixedwidth-assembly-1.0.jar为此目的下载了但是我无法在任何地方找出 jar。我完全迷失在这里,需要您的建议和帮助。Stackoverflow 中有几篇文章,但它们与 Scala 和 DataFrame API 无关。

这是文件

56 apple     TRUE 0.56
45 pear      FALSE1.34
34 raspberry TRUE 2.43
34 plum      TRUE 1.31
53 cherry    TRUE 1.4 
23 orange    FALSE2.34
56 persimmon FALSE23.2
Run Code Online (Sandbox Code Playgroud)

每列的固定宽度为 3, 10, 5, 4

请提出您的意见。

scala apache-spark spark-dataframe

1
推荐指数
1
解决办法
1万
查看次数

从自定义数据格式创建Spark数据框架

我有一个文本文件,其中String REC作为记录定界符,而换行符作为列定界符,每个数据都有附加的列名,以逗号作为定界符,以下是示例数据格式

REC
Id,19048
任期,牛奶
等级,1
REC
Id,19049
任期,玉米
等级,5

使用REC作为记录定界符。现在,我想创建具有列名ID,术语和等级的Spark数据框架。

hadoop scala apache-spark spark-dataframe

1
推荐指数
1
解决办法
2280
查看次数

使用 sparkxml 从 xml 中提取标签属性

我正在使用 com.databricks.spark.xml 加载一个 xml 文件,我想使用 sql 上下文读取标签属性。

XML :

<Receipt>
<Sale>
<DepartmentID>PR</DepartmentID>
<Tax TaxExempt="false" TaxRate="10.25"/>
</Sale>
</Receipt>
Run Code Online (Sandbox Code Playgroud)

加载文件,

val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag","Receipt").load("/home/user/sale.xml");
df.registerTempTable("SPtable");
Run Code Online (Sandbox Code Playgroud)

打印架构:

root
 |-- Sale: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- DepartmentID: long (nullable = true)
 |    |    |-- Tax: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

现在我想从 Tax.I 中提取标签属性 TaxExempt。我尝试了以下代码,但它给了我错误。

val tax =sqlContext.sql("select Sale.Tax.TaxExempt from SPtable");
Run Code Online (Sandbox Code Playgroud)

错误:

org.apache.spark.sql.AnalysisException: cannot resolve 'Sale.Tax[TaxExempt]' due to data type mismatch: argument 2 requires integral type, however, …
Run Code Online (Sandbox Code Playgroud)

xml xml-parsing apache-spark apache-spark-sql spark-dataframe

1
推荐指数
1
解决办法
2702
查看次数

如何从相对较大的 Spark 数据框中获取最高百分比并将其保存到文件中

我正在开发一个 Spark 程序,它计算每个用户的概率,从而产生一个相对较大的数据帧(~137.5M 行)。我需要做的是取这些用户的前 10%(10 个是任意的,当然可以更改)并将它们保存到文件中。

一个最小化的例子是:

  • 鉴于此数据框: hc.sparkContext.parallelize(Array(("uid1", "0.5"), ("uid2", "0.7"), ("uid3", "0.3"))).toDF("uuid", "prob")
  • 并给定阈值 0.3
  • 我希望输出为 ("uid2", "0.7") 并保存到文件 "output" 中,因为 "uid2" 的概率最高,我只需要从数据框中获取前 0.3% 的用户

所以我的问题是:使用相对较大的数据框来做到这一点的干净有效的方法是什么?

可以计算输入数据帧的 10% 有多少用户,然后使用 top 和大小。但是,我对此有两个担忧:

  1. 使用 top 时 - 是否在洗牌前先减少数据,方法是从每个执行程序中取出前 10%,然后从洗牌数据中取出 10%?如果没有,是否有内置的方法来执行我的建议?或者我应该自己实现它?
  2. Top 返回一个仍然很大的数组......我更愿意做的是将其保留为数据帧并保存其输出(也许在洗牌后重新分区数据)。有没有办法做到这一点而不将其转换为数组然后并行化它?

如果需要,我不介意使用 RDD 而不是数据帧

我目前使用的是 Spark 1.6.1

提前致谢

performance apache-spark apache-spark-sql spark-dataframe

1
推荐指数
1
解决办法
3574
查看次数

对象 SparkSession 不是包 org.apache.spark.sql 的成员

我正在尝试将最新的 Spark api 与 SparkSession 一起使用。

当我导入包时,我的 Eclipse 在附件中显示错误。

我正在使用 2.10.6 scala 编译器。

请帮我解决这个问题。

绒球 代码

scala apache-spark hadoop2 apache-spark-sql spark-dataframe

1
推荐指数
1
解决办法
2万
查看次数

如何在Spark 2.0中启用笛卡尔联接?

我必须在Spark 2.0中交叉加入2个数据帧我遇到以下错误:

用户类抛出异常:

org.apache.spark.sql.AnalysisException: Cartesian joins could be prohibitively expensive and are disabled by default. To explicitly enable them, please set spark.sql.crossJoin.enabled = true; 
Run Code Online (Sandbox Code Playgroud)

请帮我在哪里设置这个配置,我在eclipse中编码.

apache-spark apache-spark-sql spark-dataframe

1
推荐指数
1
解决办法
9694
查看次数

从pyspark数据框中减去平均值

我正在尝试计算数据框中每一列的平均值,并从该列中的每个元素中减去。我创建了一个尝试这样做的函数,但是当我尝试使用 UDF 实现它时,出现错误:“float”对象没有属性“map”。关于如何创建这样一个功能的任何想法?谢谢!

def normalize(data):
        average=data.map(lambda x: x[0]).sum()/data.count()
        out=data.map(lambda x: (x-average))
        return out

mapSTD=udf(normalize,IntegerType())     
dats = data.withColumn('Normalized', mapSTD('Fare'))
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark spark-dataframe

1
推荐指数
1
解决办法
4642
查看次数

如何在 spark scala 中将二进制字符串转换为 scala 字符串

我正在读取一个包含二进制字符串字段的 avro 文件,我需要将其转换为 java.lang.string 以将其传递给另一个库(spark-xml-util),如何将其转换为 java.lang。字符串有效。这是我到目前为止得到的代码:-

    val df = sqlContext.read.format("com.databricks.spark.avro").load("filePath/fileName.avro")
    df.select("myField").collect().mkString
Run Code Online (Sandbox Code Playgroud)

最后一行给了我以下异常:-

Exception in thread "main" java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at org.apache.spark.sql.Row$class.getString(Row.scala:255)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:165)
Run Code Online (Sandbox Code Playgroud)

df 架构是: -

root
|-- id: string (nullable = true)
|-- myField: binary (nullable = true)
Run Code Online (Sandbox Code Playgroud)

string scala apache-spark spark-dataframe

1
推荐指数
2
解决办法
1万
查看次数

如何使用scala在Apache spark中用空字符串(“”)替换空值

我正在使用 Apache spark 中的巨大数据集(包含 332 个字段)与大约 10M 记录的 scala(除了一个字段,其余 331 个可以为空)。但我想用空白字符串(“”)替换 null。由于我有大量字段,实现这一目标的最佳方法是什么?我想在导入此数据集时处理空值,因此在执行转换或导出到 DF 时我会很安全。所以我创建了具有 332 个字段的案例类,处理这些空值的最佳方法是什么?我可以使用 Option(field).getOrElse(""),但我想这不是最好的方法,因为我有大量的字段。谢谢!!

scala apache-spark apache-spark-sql spark-dataframe

1
推荐指数
2
解决办法
9874
查看次数

为什么 pyspark agg 告诉我这里的数据类型不正确?

我有一个具有以下类型的数据框:

>>> mydf.printSchema()
root
 |-- protocol: string (nullable = true)
 |-- source_port: long (nullable = true)
 |-- bytes: long (nullable = true)
Run Code Online (Sandbox Code Playgroud)

当我尝试像这样聚合它时:

df_agg = mydf.groupBy('protocol').agg(sum('bytes'))
Run Code Online (Sandbox Code Playgroud)

有人告诉我:

TypeError: unsupported operand type(s) for +: 'int' and 'str'
Run Code Online (Sandbox Code Playgroud)

现在,这对我来说没有意义,因为我看到类型适合聚合,printSchema()如上所示。

因此,我尝试将其转换为整数以防万一:

mydf_converted = mydf.withColumn("converted",mydf["bytes_out"].cast(IntegerType()).alias("bytes_converted"))
Run Code Online (Sandbox Code Playgroud)

但仍然失败:

my_df_agg_converted = mydf_converted.groupBy('protocol').agg(sum('converted'))

TypeError: unsupported operand type(s) for +: 'int' and 'str'
Run Code Online (Sandbox Code Playgroud)

如何解决?我查看了这个问题,但该修复程序根本没有帮助我 - 同样的问题: PySpark DataFrame 上的 Sum 操作在类型正常时给出 TypeError

python pyspark spark-dataframe

1
推荐指数
1
解决办法
2102
查看次数