相关疑难解决方法(0)

如何使用pyspark在Spark 2.0中构建sparkSession?

我刚接触到spark 2.0; 到目前为止,我一直在使用spark 1.6.1.有人可以帮我用pyspark(python)设置sparkSession吗?我知道在线提供的scala示例类似(这里),但我希望能直接使用python语言.

我的具体情况:我在一个zeppelin spark笔记本中加载来自S3的avro文件.然后构建df并运行各种pyspark和sql查询.我所有的旧查询都使用sqlContext.我知道这是不好的做法,但我开始使用我的笔记本

sqlContext = SparkSession.builder.enableHiveSupport().getOrCreate().

我可以在avros中阅读

mydata = sqlContext.read.format("com.databricks.spark.avro").load("s3:...

并构建没有问题的数据帧.但是一旦我开始查询dataframes/temp表,我就会收到"java.lang.NullPointerException"错误.我认为这表明存在转换错误(例如,旧查询在1.6.1中工作但需要针对2.0进行调整).无论查询类型如何,都会发生错误.所以我假设

1.)sqlContext别名是个坏主意

2.)我需要正确设置sparkSession.

因此,如果有人能告诉我这是如何完成的,或者可能解释他们所知道的不同版本的火花之间的差异,我将非常感激.如果我需要详细说明这个问题,请告诉我.如果它令人费解,我道歉.

python sql apache-spark pyspark

25
推荐指数
3
解决办法
6万
查看次数

Spark会话状态

如何在我的 py Spark 代码中检查 SparkSession 状态?要求是检查sparkSession是否处于活动状态。如果sparksession未激活,创建另一个spark会话并调用一些函数

我正在 Jupyter 笔记本中编写并运行此代码。

spark = SparkSession.builder.master("yarn") \                                 
    .config("spark.dynamicAllocation.enabled", "true") \
                                    .config("spark.serializer", 
    "org.apache.spark.serializer.KryoSerializer")
    .config("spark.shuffle.spill.compress", "true")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.io.compression.codec", "snappy")
    .config("spark.kryoserializer.buffer.max", "250m") \
    .config("spark.driver.memory", memory) \
    .config("spark.driver.cores", cores) \
    .config("spark.executor.cores", cores) \
    .config("spark.executor.memory", memory) \
    .config("spark.executor.instances", cores) \
    .enableHiveSupport()\
    .getOrCreate()
Run Code Online (Sandbox Code Playgroud)
  1. Spark 打印 SparkSession 详细信息

3.

if(spark):
    print("yes")

else:
    print("no")
Run Code Online (Sandbox Code Playgroud)

打印“是”

  1. spark.stop()

它停止了 Spark 应用程序——我签入了 UI

但是当我再次在第三步运行代码时

5.

if(spark):
    print("yes")

else:
    print("no")
Run Code Online (Sandbox Code Playgroud)

打印“yes”作为输出

  1. 但它确实会产生火花
error : AttributeError: 'NoneType' object has no attribute 'sc'
Run Code Online (Sandbox Code Playgroud)
  1. 但是当我运行下一个命令时我看到了奇怪的事情
df = spark.read.csv(file_name) …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

7
推荐指数
1
解决办法
7268
查看次数

SparkSession.sql和Dataset.sqlContext.sql有什么区别?

我有以下代码片段,我想知道这两者之间有什么区别,我应该使用哪一个?我正在使用spark 2.2.

Dataset<Row> df = sparkSession.readStream()
    .format("kafka")
    .load();

df.createOrReplaceTempView("table");
df.printSchema();

Dataset<Row> resultSet =  df.sqlContext().sql("select value from table"); //sparkSession.sql(this.query);
StreamingQuery streamingQuery = resultSet
        .writeStream()
        .trigger(Trigger.ProcessingTime(1000))
        .format("console")
        .start();
Run Code Online (Sandbox Code Playgroud)

VS

Dataset<Row> df = sparkSession.readStream()
    .format("kafka")
    .load();

df.createOrReplaceTempView("table");

Dataset<Row> resultSet =  sparkSession.sql("select value from table"); //sparkSession.sql(this.query);
StreamingQuery streamingQuery = resultSet
        .writeStream()
        .trigger(Trigger.ProcessingTime(1000))
        .format("console")
        .start();
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

3
推荐指数
1
解决办法
4186
查看次数

标签 统计

apache-spark ×3

pyspark ×2

python ×2

apache-spark-sql ×1

sql ×1