我正在使用一个python应用程序,它将csv文件转换为hive / athena兼容的拼花格式,并且我正在使用fastparquet和pandas库执行此操作。在csv文件中有时间戳值,例如2018-12-21 23:45:00需要timestamp在镶木地板文件中将其写入类型。以下是我正在运行的代码,
columnNames = ["contentid","processed_time","access_time"]
dtypes = {'contentid': 'str'}
dateCols = ['access_time', 'processed_time']
s3 = boto3.client('s3')
obj = s3.get_object(Bucket=bucketname, Key=keyname)
df = pd.read_csv(io.BytesIO(obj['Body'].read()), compression='gzip', header=0, sep=',', quotechar='"', names = columnNames, error_bad_lines=False, dtype=dtypes, parse_dates=dateCols)
s3filesys = s3fs.S3FileSystem()
myopen = s3filesys.open
write('outfile.snappy.parquet', df, compression='SNAPPY', open_with=myopen,file_scheme='hive',partition_on=PARTITION_KEYS)
Run Code Online (Sandbox Code Playgroud)
代码成功运行,下面是熊猫创建的数据框
contentid object
processed_time datetime64[ns]
access_time datetime64[ns]
Run Code Online (Sandbox Code Playgroud)
最后,当我在Hive和athena中查询实木复合地板文件时,时间戳记值+50942-11-30 14:00:00.000不是2018-12-21 23:45:00
任何帮助都受到高度赞赏
我正在df从XML文件创建一个文件,并与另一个文件进行了一些连接。每当我对联接查询的结果DF进行计数时,它都会给我不同的计数。我不会缓存数据。看起来像是奇怪的火花行为。知道为什么会这样吗?这是做什么的...
val file1 = sqlContext.sql("select * from infile2");
file1.registerTempTable("file1");
val file2 = sqlContext.sql("select * from infile2");
file2.registerTempTable("file2");
val joinedfile = sqlContext.sql("select * from file1 join file2 on file1.id = file2.id");
Run Code Online (Sandbox Code Playgroud)
现在,
joinedfile.count()
Run Code Online (Sandbox Code Playgroud)
每当我运行它时,都会给出不同的计数。
我有一个CSV数据,每个字段都有双引号。当我创建使用Serde'com.bizo.hive.serde.csv.CSVSerde'的Hive表时,在Impala中查询上面的表时,出现错误SerDe。
我在/ usr / lib / impala / lib文件夹中添加了CSV Serde JAR文件。
后来在Impala文档中研究了Impala不支持自定义SERDE。在这种情况下,我将如何克服这个问题,使带引号的CSV数据得到保护。我想使用CSV Serde,因为它使用值中的逗号(这是合法的字段变量)。
非常感谢
我有一个数据框,其中有很多列(超过 50 列),并且想要选择所有列,因为它们很少通过维护以下顺序重命名列名。我尝试了以下方法,
cols = list(set(df.columns) - {'id','starttime','endtime'})
df.select(col("id").alias("eventid"),col("starttime").alias("eventstarttime"),col("endtime").alias("eventendtime"),*cols,lit(proceessing_time).alias("processingtime"))
Run Code Online (Sandbox Code Playgroud)
并得到错误,
SyntaxError: only named arguments may follow *expression
另外,我尝试传递列类型列表,而不是 *cols
df.select(col("id").alias("eventid"),col("starttime").alias("eventstarttime"),col("endtime").alias("eventendtime"),([col(x) for x in cols]),lit(proceessing_time).alias("processingtime"))
Run Code Online (Sandbox Code Playgroud)
这给出了以下错误,
`TypeError: 'Column' object is not callable`
Run Code Online (Sandbox Code Playgroud)
非常感谢任何帮助。
我正在将csv文件导出到hive表中.关于csv文件:列值用双引号括起来,用逗号分隔.
来自csv的样本记录
"4","good"
"3","not bad"
"1","very worst"
Run Code Online (Sandbox Code Playgroud)
我用以下语句创建了一个hive表,
创建外部表currys(review_rating字符串,review_comment字符串)行格式字段分隔',';
表创建.
现在我使用命令load data local inpath加载数据并且它成功了.当我查询表格时,
select * from currys;
Run Code Online (Sandbox Code Playgroud)
结果是:
"4" "good"
"3" "not bad"
"1" "very worst"
Run Code Online (Sandbox Code Playgroud)
代替
4 good
3 not bad
1 very worst
Run Code Online (Sandbox Code Playgroud)
记录插入双引号,不应该.
请让我知道如何摆脱这个双重报价..任何帮助或指导是高度赞赏...
先谢谢!
我正在研究spark数据帧,我需要按列进行分组,并将分组行的列值转换为元素数组作为新列.示例:
Input:
employee | Address
------------------
Micheal | NY
Micheal | NJ
Output:
employee | Address
------------------
Micheal | (NY,NJ)
Run Code Online (Sandbox Code Playgroud)
任何帮助都非常感谢.
我正在使用 com.databricks.spark.xml 加载一个 xml 文件,我想使用 sql 上下文读取标签属性。
XML :
<Receipt>
<Sale>
<DepartmentID>PR</DepartmentID>
<Tax TaxExempt="false" TaxRate="10.25"/>
</Sale>
</Receipt>
Run Code Online (Sandbox Code Playgroud)
加载文件,
val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag","Receipt").load("/home/user/sale.xml");
df.registerTempTable("SPtable");
Run Code Online (Sandbox Code Playgroud)
打印架构:
root
|-- Sale: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- DepartmentID: long (nullable = true)
| | |-- Tax: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
现在我想从 Tax.I 中提取标签属性 TaxExempt。我尝试了以下代码,但它给了我错误。
val tax =sqlContext.sql("select Sale.Tax.TaxExempt from SPtable");
Run Code Online (Sandbox Code Playgroud)
错误:
org.apache.spark.sql.AnalysisException: cannot resolve 'Sale.Tax[TaxExempt]' due to data type mismatch: argument 2 requires integral type, however, …Run Code Online (Sandbox Code Playgroud) xml xml-parsing apache-spark apache-spark-sql spark-dataframe
apache-spark ×3
csv ×2
hadoop ×2
hive ×2
python ×2
fastparquet ×1
impala ×1
pandas ×1
pyspark ×1
rdd ×1
scala ×1
sql ×1
xml ×1
xml-parsing ×1