Rah*_*dar 3 python java python-2.7 apache-spark pyspark
因此,我尝试使用以下命令在 Python 2.7 中创建 Spark 会话:
#Initialize SparkSession and SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext
# Build (or reuse) a Spark session that runs locally on two worker threads.
# The chain is wrapped in parentheses so no line-continuation backslashes
# are needed.
SpSession = (
    SparkSession.builder
    .master("local[2]")
    .appName("V2 Maestros")
    .config("spark.executor.memory", "1g")
    .config("spark.cores.max", "2")
    .config("spark.sql.warehouse.dir", "file:///c:/temp/spark-warehouse")
    .getOrCreate()
)

# The SparkContext is owned by the session; keep a module-level handle to it.
SpContext = SpSession.sparkContext
Run Code Online (Sandbox Code Playgroud)
我收到以下指向python\lib\pyspark.zip\pyspark\java_gateway.py路径的错误
Exception: Java gateway process exited before sending the driver its port number
Run Code Online (Sandbox Code Playgroud)
试图查看 java_gateway.py 文件,内容如下:
import atexit
import os
import sys
import select
import signal
import shlex
import socket
import platform
from subprocess import Popen, PIPE
# Python 3 has no ``xrange``; alias it to ``range`` so the code below runs on
# both major versions. The numeric major version is compared instead of the
# version string, because a lexicographic string comparison would mis-order a
# two-digit major version (e.g. "10.0" < "3").
if sys.version_info[0] >= 3:
    xrange = range
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
from py4j.java_collections import ListConverter
from pyspark.serializers import read_int
# patching ListConverter, or it will convert bytearray into Java ArrayList
def can_convert_list(self, obj):
    """Return True only when *obj* is a list, tuple or xrange instance.

    Installed below as ``ListConverter.can_convert``.
    """
    convertible_types = (list, tuple, xrange)
    return isinstance(obj, convertible_types)


ListConverter.can_convert = can_convert_list
def launch_gateway():
    """Start (or attach to) the Py4J Java gateway and return a connected client.

    If ``PYSPARK_GATEWAY_PORT`` is set in the environment, its value is used as
    the port of an already-running gateway. Otherwise ``spark-submit`` (or
    ``spark-submit.cmd`` on Windows) under ``SPARK_HOME`` is launched as a
    child process, and the port is read from a callback connection the child
    makes to a temporary local socket.

    Returns:
        A ``JavaGateway`` connected to the gateway port, with the Spark
        classes used by PySpark already imported into ``gateway.jvm``.

    Raises:
        KeyError: if a gateway has to be launched and ``SPARK_HOME`` is unset.
        ValueError: if ``PYSPARK_GATEWAY_PORT`` is set but is not an integer.
        Exception: if the child process exits before reporting its port.
    """
    if "PYSPARK_GATEWAY_PORT" in os.environ:
        gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
    else:
        SPARK_HOME = os.environ["SPARK_HOME"]
        # Launch the Py4j gateway using Spark's run command so that we pick up the
        # proper classpath and settings from spark-env.sh
        on_windows = platform.system() == "Windows"
        script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
        submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
        if os.environ.get("SPARK_TESTING"):
            submit_args = ' '.join([
                "--conf spark.ui.enabled=false",
                submit_args
            ])
        command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)

        # Start a socket that will be used by PythonGatewayServer to communicate its port to us
        callback_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            callback_socket.bind(('127.0.0.1', 0))
            callback_socket.listen(1)
            callback_host, callback_port = callback_socket.getsockname()
            env = dict(os.environ)
            env['_PYSPARK_DRIVER_CALLBACK_HOST'] = callback_host
            env['_PYSPARK_DRIVER_CALLBACK_PORT'] = str(callback_port)

            # Launch the Java gateway.
            # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
            if not on_windows:
                # Don't send ctrl-c / SIGINT to the Java gateway:
                def preexec_func():
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
            else:
                # preexec_fn not supported on Windows
                proc = Popen(command, stdin=PIPE, env=env)

            gateway_port = None
            # We use select() here in order to avoid blocking indefinitely if the subprocess dies
            # before connecting
            while gateway_port is None and proc.poll() is None:
                timeout = 1  # (seconds)
                readable, _, _ = select.select([callback_socket], [], [], timeout)
                if callback_socket in readable:
                    gateway_connection = callback_socket.accept()[0]
                    port_stream = gateway_connection.makefile(mode="rb")
                    try:
                        # Determine which ephemeral port the server started on:
                        gateway_port = read_int(port_stream)
                    finally:
                        # Release the connection even if the read fails.
                        port_stream.close()
                        gateway_connection.close()
        finally:
            # Always release the listening socket, including when the child
            # process dies before connecting or any call above raises.
            callback_socket.close()

        if gateway_port is None:
            raise Exception("Java gateway process exited before sending the driver its port number")

        # In Windows, ensure the Java child processes do not linger after Python has exited.
        # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
        # the parent process' stdin sends an EOF). In Windows, however, this is not possible
        # because java.lang.Process reads directly from the parent process' stdin, contending
        # with any opportunity to read an EOF from the parent. Note that this is only best
        # effort and will not take effect if the python process is violently terminated.
        if on_windows:
            # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
            # (because the UNIX "exec" command is not available). This means we cannot simply
            # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
            # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
            # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
            def killChild():
                Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
            atexit.register(killChild)

    # Connect to the gateway
    gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=True)

    # Import the classes used by PySpark
    java_import(gateway.jvm, "org.apache.spark.SparkConf")
    java_import(gateway.jvm, "org.apache.spark.api.java.*")
    java_import(gateway.jvm, "org.apache.spark.api.python.*")
    java_import(gateway.jvm, "org.apache.spark.ml.python.*")
    java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
    # TODO(davies): move into sql
    java_import(gateway.jvm, "org.apache.spark.sql.*")
    java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
    java_import(gateway.jvm, "scala.Tuple2")

    return gateway
Run Code Online (Sandbox Code Playgroud)
我对 Spark 和 Pyspark 很陌生,因此无法在此处调试问题。我还尝试查看其他一些建议: Spark + Python - 在向驱动程序发送其端口号之前退出了 Java 网关进程? 和 Pyspark:异常:Java 网关进程在向驱动程序发送其端口号之前退出
但目前无法解决这个问题。请帮忙!
以下是 spark 环境的样子:
# Sources spark-env.sh (when present) and picks a default SPARK_SCALA_VERSION.
# The SPARK_ENV_LOADED guard keeps spark-env.sh from being sourced again once
# this script has already run in the current environment.
# spark-env.sh is looked up in SPARK_CONF_DIR if set, otherwise in
# ${SPARK_HOME}/conf.

# Default SPARK_HOME to the parent of the directory containing $0.
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "$(dirname "$0")"/..; pwd)"
fi

if [ -z "$SPARK_ENV_LOADED" ]; then
  export SPARK_ENV_LOADED=1

  # Resolve the configuration directory relative to the Spark install.
  parent_dir="${SPARK_HOME}"
  user_conf_dir="${SPARK_CONF_DIR:-"$parent_dir"/conf}"

  if [ -f "${user_conf_dir}/spark-env.sh" ]; then
    # "set -a" exports every variable assigned while spark-env.sh is sourced;
    # "set +a" switches that behaviour off again afterwards.
    set -a
    . "${user_conf_dir}/spark-env.sh"
    set +a
  fi
fi

# Choose SPARK_SCALA_VERSION from the assembly directories unless the caller
# has already set it.
if [ -z "$SPARK_SCALA_VERSION" ]; then
  ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
  ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.10"

  if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
    # Both builds exist: refuse to guess and make the user choose.
    echo -e "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected." >&2
    echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' >&2
    exit 1
  elif [ -d "$ASSEMBLY_DIR2" ]; then
    export SPARK_SCALA_VERSION="2.11"
  else
    export SPARK_SCALA_VERSION="2.10"
  fi
fi
Run Code Online (Sandbox Code Playgroud)
以下是我在 Python 中设置 Spark 环境的方式:
import os
import sys
# NOTE: Please change the folder paths to your current setup.
# Windows
if sys.platform.startswith('win'):
    # Where you downloaded the resource bundle
    os.chdir("E:/Udemy - Spark/SparkPythonDoBigDataAnalytics-Resources")
    # Where you installed spark.
    os.environ['SPARK_HOME'] = 'E:/Udemy - Spark/Apache Spark/spark-2.0.0-bin-hadoop2.7'
# other platforms - linux/mac
else:
    os.chdir("/Users/kponnambalam/Dropbox/V2Maestros/Modules/Apache Spark/Python")
    os.environ['SPARK_HOME'] = '/users/kponnambalam/products/spark-2.0.0-bin-hadoop2.7'

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

# Add the following paths to the system path. Please check your installation
# to make sure that these zip files actually exist. The names might change
# as versions change.
# Each entry is inserted at position 0, so the last one listed ends up first
# on sys.path.
for _path_parts in (
    ("python",),
    ("python", "lib"),
    ("python", "lib", "pyspark.zip"),
    ("python", "lib", "py4j-0.10.1-src.zip"),
):
    sys.path.insert(0, os.path.join(SPARK_HOME, *_path_parts))
#Initialize SparkSession and SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext
Run Code Online (Sandbox Code Playgroud)
在阅读了许多帖子后,我终于让 Spark 在我的 Windows 笔记本电脑上运行了。我使用 Anaconda Python,但我相信这也适用于标准发行版。
因此,您需要确保可以独立运行 Spark。我的假设是您已经安装了有效的 python 路径和 Java。对于 Java,我在我的路径中定义了“C:\ProgramData\Oracle\Java\javapath”,它重定向到我的 Java8 bin 文件夹。
转到 %SPARK_HOME%\bin 并尝试运行 Python Spark shell(pyspark)。如果您的环境和我的一样,您会看到找不到 winutils 和 hadoop 的异常。第二个异常是关于缺少 Hive 的:
pyspark.sql.utils.IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionStateBuilder':"
然后我发现并简单地关注了https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-tips-and-tricks-running-spark-windows.html 具体来说:
以管理员模式在 cmd 中运行:winutils.exe chmod -R 777 C:\tmp\hive
请注意,sc spark 上下文变量需要已经定义。希望有所帮助,您可以享受在本地运行 Spark 代码的乐趣。
根据我的“猜测”,这是你的 Java 版本的问题。也许您安装了两个不同的 Java 版本。另外,您似乎正在使用从某处复制并粘贴的代码来设置 SPARK_HOME 等环境变量。网上有很多如何设置 Spark 的简单示例。而且看起来您正在使用 Windows。我建议使用 *NIX 环境来测试,因为这更容易,例如您可以使用 brew 来安装 Spark。Windows 并不是专门为此而设计的...