我刚接触到spark 2.0; 到目前为止,我一直在使用spark 1.6.1.有人可以帮我用pyspark(python)设置sparkSession吗?我知道在线提供的scala示例类似(这里),但我希望能直接使用python语言.
我的具体情况:我在一个zeppelin spark笔记本中加载来自S3的avro文件.然后构建df并运行各种pyspark和sql查询.我所有的旧查询都使用sqlContext.我知道这是不好的做法,但我开始使用我的笔记本
sqlContext = SparkSession.builder.enableHiveSupport().getOrCreate().
我可以在avros中阅读
mydata = sqlContext.read.format("com.databricks.spark.avro").load("s3:...
并构建没有问题的数据帧.但是一旦我开始查询dataframes/temp表,我就会收到"java.lang.NullPointerException"错误.我认为这表明存在转换错误(例如,旧查询在1.6.1中工作但需要针对2.0进行调整).无论查询类型如何,都会发生错误.所以我假设
1.)sqlContext别名是个坏主意
和
2.)我需要正确设置sparkSession.
因此,如果有人能告诉我这是如何完成的,或者可能解释他们所知道的不同版本的火花之间的差异,我将非常感激.如果我需要详细说明这个问题,请告诉我.如果它令人费解,我道歉.
如何在我的 py Spark 代码中检查 SparkSession 状态?要求是检查sparkSession是否处于活动状态。如果sparksession未激活,创建另一个spark会话并调用一些函数
我正在 Jupyter 笔记本中编写并运行此代码。
spark = SparkSession.builder.master("yarn") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
.config("spark.shuffle.spill.compress", "true")
.config("spark.shuffle.service.enabled", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.kryoserializer.buffer.max", "250m") \
.config("spark.driver.memory", memory) \
.config("spark.driver.cores", cores) \
.config("spark.executor.cores", cores) \
.config("spark.executor.memory", memory) \
.config("spark.executor.instances", cores) \
.enableHiveSupport()\
.getOrCreate()
Run Code Online (Sandbox Code Playgroud)
3.
if(spark):
print("yes")
else:
print("no")
Run Code Online (Sandbox Code Playgroud)
打印“是”
spark.stop()它停止了 Spark 应用程序——我签入了 UI
但是当我再次在第三步运行代码时
5.
if(spark):
print("yes")
else:
print("no")
Run Code Online (Sandbox Code Playgroud)
打印“yes”作为输出
error : AttributeError: 'NoneType' object has no attribute 'sc'
Run Code Online (Sandbox Code Playgroud)
df = spark.read.csv(file_name) …Run Code Online (Sandbox Code Playgroud) 我有以下代码片段,我想知道这两者之间有什么区别,我应该使用哪一个?我正在使用spark 2.2.
Dataset<Row> df = sparkSession.readStream()
.format("kafka")
.load();
df.createOrReplaceTempView("table");
df.printSchema();
Dataset<Row> resultSet = df.sqlContext().sql("select value from table"); //sparkSession.sql(this.query);
StreamingQuery streamingQuery = resultSet
.writeStream()
.trigger(Trigger.ProcessingTime(1000))
.format("console")
.start();
Run Code Online (Sandbox Code Playgroud)
VS
Dataset<Row> df = sparkSession.readStream()
.format("kafka")
.load();
df.createOrReplaceTempView("table");
Dataset<Row> resultSet = sparkSession.sql("select value from table"); //sparkSession.sql(this.query);
StreamingQuery streamingQuery = resultSet
.writeStream()
.trigger(Trigger.ProcessingTime(1000))
.format("console")
.start();
Run Code Online (Sandbox Code Playgroud)