小编pra*_*ads的帖子

熊猫数据框类型datetime64 [ns]在Hive / Athena中不起作用

我正在使用一个python应用程序,它将csv文件转换为hive / athena兼容的拼花格式,并且我正在使用fastparquet和pandas库执行此操作。在csv文件中有时间戳值,例如2018-12-21 23:45:00需要timestamp在镶木地板文件中将其写入类型。以下是我正在运行的代码,

columnNames = ["contentid","processed_time","access_time"]

dtypes = {'contentid': 'str'}

dateCols = ['access_time', 'processed_time']

s3 = boto3.client('s3')

obj = s3.get_object(Bucket=bucketname, Key=keyname)

df = pd.read_csv(io.BytesIO(obj['Body'].read()), compression='gzip', header=0, sep=',', quotechar='"', names = columnNames, error_bad_lines=False, dtype=dtypes, parse_dates=dateCols)

s3filesys = s3fs.S3FileSystem()

myopen = s3filesys.open

write('outfile.snappy.parquet', df, compression='SNAPPY', open_with=myopen,file_scheme='hive',partition_on=PARTITION_KEYS)
Run Code Online (Sandbox Code Playgroud)

代码成功运行,下面是熊猫创建的数据框

contentid                 object
processed_time            datetime64[ns]
access_time               datetime64[ns]
Run Code Online (Sandbox Code Playgroud)

最后,当我在Hive和athena中查询实木复合地板文件时,时间戳记值+50942-11-30 14:00:00.000不是2018-12-21 23:45:00

任何帮助都受到高度赞赏

python hive pandas amazon-athena fastparquet

6
推荐指数
2
解决办法
773
查看次数

依靠DF得出不同的结果

我正在df从XML文件创建一个文件,并与另一个文件进行了一些连接。每当我对联接查询的结果DF进行计数时,它都会给我不同的计数。我不会缓存数据。看起来像是奇怪的火花行为。知道为什么会这样吗?这是做什么的...

val file1 = sqlContext.sql("select * from infile2");
file1.registerTempTable("file1");
val file2 = sqlContext.sql("select * from infile2");
file2.registerTempTable("file2");
val joinedfile = sqlContext.sql("select * from file1 join file2 on file1.id = file2.id");
Run Code Online (Sandbox Code Playgroud)

现在,

joinedfile.count() 
Run Code Online (Sandbox Code Playgroud)

每当我运行它时,都会给出不同的计数。

apache-spark rdd apache-spark-sql spark-dataframe

5
推荐指数
0
解决办法
363
查看次数

Impala不支持自定义SerDe,用带双引号的CSV查询文件的最佳方法是什么?

我有一个CSV数据,每个字段都有双引号。当我创建使用Serde'com.bizo.hive.serde.csv.CSVSerde'的Hive表时,在Impala中查询上面的表时,出现错误SerDe。

我在/ usr / lib / impala / lib文件夹中添加了CSV Serde JAR文件。

后来在Impala文档中研究了Impala不支持自定义SERDE。在这种情况下,我将如何克服这个问题,使带引号的CSV数据得到保护。我想使用CSV Serde,因为它使用值中的逗号(这是合法的字段变量)。

非常感谢

csv hadoop double-quotes impala

4
推荐指数
1
解决办法
5134
查看次数

Pyspark Dataframe 选择少数列上带有别名的所有列

我有一个数据框,其中有很多列(超过 50 列),并且想要选择所有列,因为它们很少通过维护以下顺序重命名列名。我尝试了以下方法,

cols = list(set(df.columns) - {'id','starttime','endtime'})
df.select(col("id").alias("eventid"),col("starttime").alias("eventstarttime"),col("endtime").alias("eventendtime"),*cols,lit(proceessing_time).alias("processingtime"))
Run Code Online (Sandbox Code Playgroud)

并得到错误, SyntaxError: only named arguments may follow *expression

另外,我尝试传递列类型列表,而不是 *cols

df.select(col("id").alias("eventid"),col("starttime").alias("eventstarttime"),col("endtime").alias("eventendtime"),([col(x) for x in cols]),lit(proceessing_time).alias("processingtime"))
Run Code Online (Sandbox Code Playgroud)

这给出了以下错误,

`TypeError: 'Column' object is not callable`
Run Code Online (Sandbox Code Playgroud)

非常感谢任何帮助。

python apache-spark-sql pyspark

4
推荐指数
1
解决办法
2万
查看次数

在hive表中插入的值与csv文件中的字符串使用双引号

我正在将csv文件导出到hive表中.关于csv文件:列值用双引号括起来,用逗号分隔.

来自csv的样本记录

"4","good"
"3","not bad"
"1","very worst"
Run Code Online (Sandbox Code Playgroud)

我用以下语句创建了一个hive表,

创建外部表currys(review_rating字符串,review_comment字符串)行格式字段分隔',';

表创建.

现在我使用命令load data local inpath加载数据并且它成功了.当我查询表格时,

select * from currys;
Run Code Online (Sandbox Code Playgroud)

结果是:

"4"  "good"
"3"  "not bad"
"1"   "very worst"
Run Code Online (Sandbox Code Playgroud)

代替

4  good
3  not bad
1  very worst
Run Code Online (Sandbox Code Playgroud)

记录插入双引号,不应该.

请让我知道如何摆脱这个双重报价..任何帮助或指导是高度赞赏...

先谢谢!

sql csv hadoop hive

3
推荐指数
1
解决办法
7302
查看次数

将行值转换为spark数据帧中的列数组

我正在研究spark数据帧,我需要按列进行分组,并将分组行的列值转换为元素数组作为新列.示例:

Input:

employee | Address
------------------
Micheal  |  NY
Micheal  |  NJ

Output:

employee | Address
------------------
Micheal  | (NY,NJ)
Run Code Online (Sandbox Code Playgroud)

任何帮助都非常感谢.

scala apache-spark spark-dataframe

1
推荐指数
2
解决办法
9949
查看次数

使用 sparkxml 从 xml 中提取标签属性

我正在使用 com.databricks.spark.xml 加载一个 xml 文件,我想使用 sql 上下文读取标签属性。

XML :

<Receipt>
<Sale>
<DepartmentID>PR</DepartmentID>
<Tax TaxExempt="false" TaxRate="10.25"/>
</Sale>
</Receipt>
Run Code Online (Sandbox Code Playgroud)

加载文件,

val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag","Receipt").load("/home/user/sale.xml");
df.registerTempTable("SPtable");
Run Code Online (Sandbox Code Playgroud)

打印架构:

root
 |-- Sale: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- DepartmentID: long (nullable = true)
 |    |    |-- Tax: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

现在我想从 Tax.I 中提取标签属性 TaxExempt。我尝试了以下代码,但它给了我错误。

val tax =sqlContext.sql("select Sale.Tax.TaxExempt from SPtable");
Run Code Online (Sandbox Code Playgroud)

错误:

org.apache.spark.sql.AnalysisException: cannot resolve 'Sale.Tax[TaxExempt]' due to data type mismatch: argument 2 requires integral type, however, …
Run Code Online (Sandbox Code Playgroud)

xml xml-parsing apache-spark apache-spark-sql spark-dataframe

1
推荐指数
1
解决办法
2702
查看次数