Error when writing PySpark code to connect to Snowflake

xmz*_*xmz 5 apache-spark-sql pyspark snowflake-cloud-data-platform

When I try to run PySpark code from a Jupyter Notebook to connect to Snowflake, I get an error. This is the error:

Py4JJavaError: An error occurred while calling o526.load. : java.lang.ClassNotFoundException: Failed to find data source: net.snowflake.spark.snowflake. Please find packages at http://spark.apache.org/third-party-projects.html

Spark version: v2.4.5, Master: local[*], Python 3.x

Here is my code:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

sc = SparkContext.getOrCreate()

spark = SparkSession.builder \
    .master("local") \
    .appName("Test") \
    .config('spark.jars','/Users/zhao/Downloads/snowflake-jdbc-3.5.4.jar,/Users/zhao/Downloads/spark-snowflake_2.11-2.3.2.jar') \
    .getOrCreate()

sfOptions = {
  "sfURL" : "xxx",
  "sfUser" : "xxx",
  "sfPassword" : "xxx",
  "sfDatabase" : "xxx",
  "sfSchema" : "xxx",
  "sfWarehouse" : "xxx",
  "sfRole": "xxx"
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",  "select * from CustomerInfo limit 10") \
  .load()

I would appreciate any ideas :)

小智 2

How are you launching your Jupyter notebook server instance? Have you made sure your PYTHONPATH and SPARK_HOME variables are set correctly, and that Spark has no pre-running instance? Also, is your Snowflake Spark Connector jar the variant built against the correct Spark and Scala versions?
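As a quick way to rule out the pre-running-instance case, here is a minimal sketch for the top of the notebook (it relies on PySpark's internal _active_spark_context attribute, so treat it as a diagnostic, not a stable API):

# spark.jars is only applied when the JVM is first launched, so a
# SparkContext created earlier in the kernel silently discards the jar
# configuration later passed to SparkSession.builder.
from pyspark import SparkContext

if SparkContext._active_spark_context is not None:
    SparkContext._active_spark_context.stop()  # drop the stale instance

This matters here because the question's code calls SparkContext.getOrCreate() before building the session, which is exactly that situation: the later spark.jars setting is ignored and the connector class never reaches the classpath.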

For reference, here is a fully bootstrapped and tested run on a macOS machine (using homebrew):

# Install JDK8
~> brew tap adoptopenjdk/openjdk
~> brew cask install adoptopenjdk8

# Install Apache Spark (v2.4.5 as of post date)
~> brew install apache-spark

# Install Jupyter Notebooks (incl. optional CLI notebooks)
~> pip3 install --user jupyter notebook

# Ensure we use JDK8 (using very recent JDKs will cause class version issues)
~> export JAVA_HOME="/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home"

# Setup environment to allow discovery of PySpark libs and the Spark binaries
# (Uses homebrew features to set the paths dynamically)
~> export SPARK_HOME="$(brew --prefix apache-spark)/libexec"
~> export PYTHONPATH="${SPARK_HOME}/python:${SPARK_HOME}/python/build:${PYTHONPATH}"
~> export PYTHONPATH="$(brew list apache-spark | grep 'py4j-.*-src.zip$' | head -1):${PYTHONPATH}"

# Download jars for dependencies in notebook code into /tmp

# Snowflake JDBC (v3.12.8 used here):
~> curl --silent --location \
'https://search.maven.org/classic/remotecontent?filepath=net/snowflake/snowflake-jdbc/3.12.8/snowflake-jdbc-3.12.8.jar' \
> /tmp/snowflake-jdbc-3.12.8.jar

# Snowflake Spark Connector (v2.7.2 used here)
# But more importantly, a Scala 2.11 and Spark 2.4.x compliant one is fetched
~> curl --silent --location \
'https://search.maven.org/classic/remotecontent?filepath=net/snowflake/spark-snowflake_2.11/2.7.2-spark_2.4/spark-snowflake_2.11-2.7.2-spark_2.4.jar' \
> /tmp/spark-snowflake_2.11-2.7.2-spark_2.4.jar

# Run the jupyter notebook service (opens up in webbrowser)
~> jupyter notebook
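Downloading the jars by hand is optional; an equivalent approach (a sketch, using the same Maven coordinates the two curl commands above resolve) is to let Spark fetch them at session start via spark.jars.packages:

from pyspark.sql import SparkSession

# Spark resolves both artifacts from Maven Central at startup,
# replacing the local file paths passed to spark.jars.
spark = SparkSession.builder \
    .master("local") \
    .appName("Test") \
    .config('spark.jars.packages',
            'net.snowflake:snowflake-jdbc:3.12.8,'
            'net.snowflake:spark-snowflake_2.11:2.7.2-spark_2.4') \
    .getOrCreate()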

Code run in a fresh Python 3 notebook:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

sfOptions = {
    "sfURL": "account.region.snowflakecomputing.com",
    "sfUser": "username",
    "sfPassword": "password",
    "sfDatabase": "db_name",
    "sfSchema": "schema_name",
    "sfWarehouse": "warehouse_name",
    "sfRole": "role_name",
}

spark = SparkSession.builder \
    .master("local") \
    .appName("Test") \
    .config('spark.jars','/tmp/snowflake-jdbc-3.12.8.jar,/tmp/spark-snowflake_2.11-2.7.2-spark_2.4.jar') \
    .getOrCreate()

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query",  "select * from CustomerInfo limit 10") \
    .load()

df.show()
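If the session came up cleanly, a one-line sanity check (a sketch) is to confirm the jars were actually registered on the driver; an empty or missing value here would reproduce the ClassNotFoundException from the question:

# Should print the two jar paths passed to spark.jars above.
print(spark.sparkContext.getConf().get("spark.jars", "not set"))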

The example above uses the read path (moving data from Snowflake into Spark), but if you want to write a dataframe back, see the documentation on moving data from Spark to Snowflake.
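For completeness, the write direction is symmetric; a minimal sketch (the target table name CustomerInfoCopy is hypothetical):

# Writes the dataframe into a Snowflake table, creating or replacing it.
df.write.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("dbtable", "CustomerInfoCopy") \
    .mode("overwrite") \
    .save()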