Mik*_*kov 7 apache-spark pyspark
我有一个pyspark从TSV文件加载数据并将其保存为镶木地板文件,并将其保存为持久SQL表.
当我通过pyspark CLI逐行运行它时,它的工作方式与预期的完全相同.当我像使用spark-submit的应用程序一样运行时,它运行没有任何错误,但我得到奇怪的结果:1.数据被覆盖而不是附加.2.当我对它运行SQL查询时,即使镶木地板文件的大小为几千兆字节(我所期望的),我也没有返回任何数据.有什么建议?
码:
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import *
csv_file = '/srv/spark/data/input/ipfixminute2018-03-28.tsv'
parquet_dir = '/srv/spark/data/parquet/ipfixminute'
sc = SparkContext(appName='import-ipfixminute')
spark = SQLContext(sc)
fields = [StructField('time_stamp', TimestampType(), True),
StructField('subscriberId', StringType(), True),
StructField('sourceIPv4Address', StringType(), True),
StructField('destinationIPv4Address', StringType(), True),
StructField('service',StringType(), True),
StructField('baseService',StringType(), True),
StructField('serverHostname', StringType(), True),
StructField('rat', StringType(), True),
StructField('userAgent', StringType(), True),
StructField('accessPoint', StringType(), True),
StructField('station', StringType(), True),
StructField('device', StringType(), True),
StructField('contentCategories', StringType(), True),
StructField('incomingOctets', LongType(), True),
StructField('outgoingOctets', LongType(), True),
StructField('incomingShapingDrops', IntegerType(), True),
StructField('outgoingShapingDrops', IntegerType(), True),
StructField('qoeIncomingInternal', DoubleType(), True),
StructField('qoeIncomingExternal', DoubleType(), True),
StructField('qoeOutgoingInternal', DoubleType(), True),
StructField('qoeOutgoingExternal', DoubleType(), True),
StructField('incomingShapingLatency', DoubleType(), True),
StructField('outgoingShapingLatency', DoubleType(), True),
StructField('internalRtt', DoubleType(), True),
StructField('externalRtt', DoubleType(), True),
StructField('HttpUrl',StringType(), True)]
schema = StructType(fields)
df = spark.read.load(csv_file, format='csv',sep='\t',header=True,schema=schema,timestampFormat='yyyy-MM-dd HH:mm:ss')
df = df.drop('all')
df = df.withColumn('date',to_date('time_stamp'))
df.write.saveAsTable('test2',mode='append',partitionBy='date',path=parquet_dir)
Run Code Online (Sandbox Code Playgroud)
正如 @user8371915 所建议的,它与此类似:
Spark可以从pyspark访问Hive表,但不能从spark-submit访问Hive表
我需要更换
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
Run Code Online (Sandbox Code Playgroud)
和
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
Run Code Online (Sandbox Code Playgroud)
这解决了这个问题。
| 归档时间: |
|
| 查看次数: |
219 次 |
| 最近记录: |