我有一个固定长度的文件(一个示例如下所示),我想使用 SCALA(不是 python 或 java)在 Spark 中使用 DataFrames API 读取这个文件。使用 DataFrames API 有读取 textFile、json 文件等的方法,但不确定是否有读取固定长度文件的方法。我正在互联网上搜索这个并找到了一个 github链接,但我spark-fixedwidth-assembly-1.0.jar为此目的下载了但是我无法在任何地方找出 jar。我完全迷失在这里,需要您的建议和帮助。Stackoverflow 中有几篇文章,但它们与 Scala 和 DataFrame API 无关。
这是文件
56 apple TRUE 0.56
45 pear FALSE1.34
34 raspberry TRUE 2.43
34 plum TRUE 1.31
53 cherry TRUE 1.4
23 orange FALSE2.34
56 persimmon FALSE23.2
Run Code Online (Sandbox Code Playgroud)
每列的固定宽度为 3, 10, 5, 4
请提出您的意见。
我有一个文本文件,其中String REC作为记录定界符,而换行符作为列定界符,每个数据都有附加的列名,以逗号作为定界符,以下是示例数据格式
REC
Id,19048
任期,牛奶
等级,1
REC
Id,19049
任期,玉米
等级,5
使用REC作为记录定界符。现在,我想创建具有列名ID,术语和等级的Spark数据框架。
我正在使用 com.databricks.spark.xml 加载一个 xml 文件,我想使用 sql 上下文读取标签属性。
XML :
<Receipt>
<Sale>
<DepartmentID>PR</DepartmentID>
<Tax TaxExempt="false" TaxRate="10.25"/>
</Sale>
</Receipt>
Run Code Online (Sandbox Code Playgroud)
加载文件,
val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag","Receipt").load("/home/user/sale.xml");
df.registerTempTable("SPtable");
Run Code Online (Sandbox Code Playgroud)
打印架构:
root
|-- Sale: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- DepartmentID: long (nullable = true)
| | |-- Tax: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
现在我想从 Tax.I 中提取标签属性 TaxExempt。我尝试了以下代码,但它给了我错误。
val tax =sqlContext.sql("select Sale.Tax.TaxExempt from SPtable");
Run Code Online (Sandbox Code Playgroud)
错误:
org.apache.spark.sql.AnalysisException: cannot resolve 'Sale.Tax[TaxExempt]' due to data type mismatch: argument 2 requires integral type, however, …Run Code Online (Sandbox Code Playgroud) xml xml-parsing apache-spark apache-spark-sql spark-dataframe
我正在开发一个 Spark 程序,它计算每个用户的概率,从而产生一个相对较大的数据帧(~137.5M 行)。我需要做的是取这些用户的前 10%(10 个是任意的,当然可以更改)并将它们保存到文件中。
一个最小化的例子是:
hc.sparkContext.parallelize(Array(("uid1", "0.5"), ("uid2", "0.7"), ("uid3", "0.3"))).toDF("uuid", "prob")所以我的问题是:使用相对较大的数据框来做到这一点的干净有效的方法是什么?
可以计算输入数据帧的 10% 有多少用户,然后使用 top 和大小。但是,我对此有两个担忧:
如果需要,我不介意使用 RDD 而不是数据帧
我目前使用的是 Spark 1.6.1
提前致谢
我正在尝试将最新的 Spark api 与 SparkSession 一起使用。
当我导入包时,我的 Eclipse 在附件中显示错误。
我正在使用 2.10.6 scala 编译器。
请帮我解决这个问题。

我必须在Spark 2.0中交叉加入2个数据帧我遇到以下错误:
用户类抛出异常:
org.apache.spark.sql.AnalysisException: Cartesian joins could be prohibitively expensive and are disabled by default. To explicitly enable them, please set spark.sql.crossJoin.enabled = true;
Run Code Online (Sandbox Code Playgroud)
请帮我在哪里设置这个配置,我在eclipse中编码.
我正在尝试计算数据框中每一列的平均值,并从该列中的每个元素中减去。我创建了一个尝试这样做的函数,但是当我尝试使用 UDF 实现它时,出现错误:“float”对象没有属性“map”。关于如何创建这样一个功能的任何想法?谢谢!
def normalize(data):
average=data.map(lambda x: x[0]).sum()/data.count()
out=data.map(lambda x: (x-average))
return out
mapSTD=udf(normalize,IntegerType())
dats = data.withColumn('Normalized', mapSTD('Fare'))
Run Code Online (Sandbox Code Playgroud) 我正在读取一个包含二进制字符串字段的 avro 文件,我需要将其转换为 java.lang.string 以将其传递给另一个库(spark-xml-util),如何将其转换为 java.lang。字符串有效。这是我到目前为止得到的代码:-
val df = sqlContext.read.format("com.databricks.spark.avro").load("filePath/fileName.avro")
df.select("myField").collect().mkString
Run Code Online (Sandbox Code Playgroud)
最后一行给了我以下异常:-
Exception in thread "main" java.lang.ClassCastException: [B cannot be cast to java.lang.String
at org.apache.spark.sql.Row$class.getString(Row.scala:255)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:165)
Run Code Online (Sandbox Code Playgroud)
df 架构是: -
root
|-- id: string (nullable = true)
|-- myField: binary (nullable = true)
Run Code Online (Sandbox Code Playgroud) 我正在使用 Apache spark 中的巨大数据集(包含 332 个字段)与大约 10M 记录的 scala(除了一个字段,其余 331 个可以为空)。但我想用空白字符串(“”)替换 null。由于我有大量字段,实现这一目标的最佳方法是什么?我想在导入此数据集时处理空值,因此在执行转换或导出到 DF 时我会很安全。所以我创建了具有 332 个字段的案例类,处理这些空值的最佳方法是什么?我可以使用 Option(field).getOrElse(""),但我想这不是最好的方法,因为我有大量的字段。谢谢!!
我有一个具有以下类型的数据框:
>>> mydf.printSchema()
root
|-- protocol: string (nullable = true)
|-- source_port: long (nullable = true)
|-- bytes: long (nullable = true)
Run Code Online (Sandbox Code Playgroud)
当我尝试像这样聚合它时:
df_agg = mydf.groupBy('protocol').agg(sum('bytes'))
Run Code Online (Sandbox Code Playgroud)
有人告诉我:
TypeError: unsupported operand type(s) for +: 'int' and 'str'
Run Code Online (Sandbox Code Playgroud)
现在,这对我来说没有意义,因为我看到类型适合聚合,printSchema()如上所示。
因此,我尝试将其转换为整数以防万一:
mydf_converted = mydf.withColumn("converted",mydf["bytes_out"].cast(IntegerType()).alias("bytes_converted"))
Run Code Online (Sandbox Code Playgroud)
但仍然失败:
my_df_agg_converted = mydf_converted.groupBy('protocol').agg(sum('converted'))
TypeError: unsupported operand type(s) for +: 'int' and 'str'
Run Code Online (Sandbox Code Playgroud)
如何解决?我查看了这个问题,但该修复程序根本没有帮助我 - 同样的问题: PySpark DataFrame 上的 Sum 操作在类型正常时给出 TypeError
spark-dataframe ×10
apache-spark ×9
scala ×5
pyspark ×2
hadoop ×1
hadoop2 ×1
performance ×1
python ×1
string ×1
xml ×1
xml-parsing ×1