我有一个示例应用程序正在从csv文件读取数据帧.可以使用该方法将数据帧以镶木地板格式存储到Hive表中
df.saveAsTable(tablename,mode).
上面的代码工作正常,但我每天都有如此多的数据,我想根据creationdate(表中的列)动态分区hive表.
有没有办法动态分区数据帧并将其存储到配置单元仓库.想要避免使用硬编码插入语句hivesqlcontext.sql(insert into table partittioin by(date)....).
问题可以视为以下内容的扩展:如何将DataFrame直接保存到Hive?
任何帮助深表感谢.
使用Spark 2.0,我发现有可能将行的数据帧转换为案例类的数据框.当我尝试这样做的时候,我打招呼说要导入spark.implicits._.我遇到的问题是Intellij没有认识到这是一个有效的导入语句,我想知道是否已经移动并且消息没有更新,或者我的构建设置中没有正确的包,这里是我的build.sbt
libraryDependencies ++= Seq(
"org.mongodb.spark" % "mongo-spark-connector_2.11" % "2.0.0-rc0",
"org.apache.spark" % "spark-core_2.11" % "2.0.0",
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"
)
Run Code Online (Sandbox Code Playgroud) 我正在尝试利用spark分区.我试图做类似的事情
data.write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
这里的问题每个分区都会产生大量的镶木地板文件,如果我尝试从根目录中读取,会导致读取速度慢.
为了避免我试过
data.coalese(numPart).write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
但是,这会在每个分区中创建numPart数量的镶木地板文件.现在我的分区大小不同了.所以我理想的是希望每个分区有单独的合并.然而,这看起来并不容易.我需要访问所有分区合并到一定数量并存储在一个单独的位置.
写入后我应该如何使用分区来避免许多文件?
我有一个包含四个字段的数据框.其中一个字段名称是Status,我试图在.filter中使用OR条件来表示数据帧.我试过下面的查询,但没有运气.
df2 = df1.filter(("Status=2") || ("Status =3"))
df2 = df1.filter("Status=2" || "Status =3")
Run Code Online (Sandbox Code Playgroud)
有没有人以前用过这个.我在这里看到了关于堆栈溢出的类似问题.他们使用下面的代码来使用OR条件.但该代码适用于pyspark.
from pyspark.sql.functions import col
numeric_filtered = df.where(
(col('LOW') != 'null') |
(col('NORMAL') != 'null') |
(col('HIGH') != 'null'))
numeric_filtered.show()
Run Code Online (Sandbox Code Playgroud) 我有以下示例DataFrame:
a | b | c |
1 | 2 | 4 |
0 | null | null|
null | 3 | 4 |
Run Code Online (Sandbox Code Playgroud)
我想只在前2列中替换空值 - 列"a"和"b":
a | b | c |
1 | 2 | 4 |
0 | 0 | null|
0 | 3 | 4 |
Run Code Online (Sandbox Code Playgroud)
以下是创建示例数据帧的代码:
rdd = sc.parallelize([(1,2,4), (0,None,None), (None,3,4)])
df2 = sqlContext.createDataFrame(rdd, ["a", "b", "c"])
Run Code Online (Sandbox Code Playgroud)
我知道如何使用以下方法替换所有空值:
df2 = df2.fillna(0)
Run Code Online (Sandbox Code Playgroud)
当我尝试这个时,我失去了第三列:
df2 = df2.select(df2.columns[0:1]).fillna(0)
Run Code Online (Sandbox Code Playgroud) 我正在使用PySpark,我有一个带有一堆数字列的Spark数据帧.我想添加一个列,它是所有其他列的总和.
假设我的数据框有"a","b"和"c"列.我知道我可以这样做:
df.withColumn('total_col', df.a + df.b + df.c)
Run Code Online (Sandbox Code Playgroud)
问题是我不想单独输出每一列并添加它们,特别是如果我有很多列.我希望能够自动执行此操作,或者通过指定要添加的列名列表.还有另一种方法吗?
在Spark中,如何知道哪些对象在驱动程序上实例化以及哪些对象在执行程序上实例化,因此如何确定哪些类需要实现Serializable?
我试图DataFrame用Parquet格式保存到HDFS,使用DataFrameWriter三列值进行分区,如下所示:
dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path)
Run Code Online (Sandbox Code Playgroud)
正如提到的这个问题,partitionBy将在删除分区的全部现有层次path,并在分区取而代之dataFrame.由于特定日期的新增量数据将定期出现,我想要的是仅替换层次结构中dataFrame具有数据的那些分区,而保持其他分区不变.
要做到这一点,似乎我需要使用其完整路径单独保存每个分区,如下所示:
singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890")
Run Code Online (Sandbox Code Playgroud)
但是我无法理解将数据组织到单分区中DataFrame的最佳方法,以便我可以使用它们的完整路径将它们写出来.一个想法是这样的:
dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ...
Run Code Online (Sandbox Code Playgroud)
但foreachPartition操作上Iterator[Row]是不理想的写出来镶木格式.
我还考虑使用a select...distinct eventdate, hour, processtime获取分区列表,然后按每个分区过滤原始数据帧并将结果保存到完整的分区路径.但是,每个分区的独特查询加过滤器似乎效率不高,因为它会进行大量的过滤/写入操作.
我希望有一种更简洁的方法来保留dataFrame没有数据的现有分区?
谢谢阅读.
Spark版本:2.1
我正在使用pyspark来阅读下面的镶木地板文件:
my_df = sqlContext.read.parquet('hdfs://myPath/myDB.db/myTable/**')
Run Code Online (Sandbox Code Playgroud)
然后,当我这样做时my_df.take(5),它将显示[Row(...)],而不是像我们使用pandas数据帧时的表格格式.
是否可以以pandas数据帧等表格格式显示数据帧?谢谢!
我在python/pyspark中有一个带有列的数据框id time city zip等等......
现在我name在这个数据框中添加了一个新列.
现在,我必须以这样的方式排列列,以便name列出来id
我在下面做了
change_cols = ['id', 'name']
cols = ([col for col in change_cols if col in df]
+ [col for col in df if col not in change_cols])
df = df[cols]
Run Code Online (Sandbox Code Playgroud)
我收到了这个错误
pyspark.sql.utils.AnalysisException: u"Reference 'id' is ambiguous, could be: id#609, id#1224.;"
Run Code Online (Sandbox Code Playgroud)
为什么会出现此错误.我怎样才能纠正这个问题.
spark-dataframe ×10
apache-spark ×8
pyspark ×4
python ×3
bigdata ×1
hadoop ×1
hive ×1
lambda ×1
pandas ×1
parquet ×1
partitioning ×1
rdd ×1
scala ×1