我试图从DataFrame到DataFrame编写一个转换方法.而且我也想通过scalatest进行测试.
如您所知,在使用Scala API的Spark 2.x中,您可以按如下方式创建SparkSession对象:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.bulider
.config("spark.master", "local[2]")
.getOrCreate()
Run Code Online (Sandbox Code Playgroud)
此代码适用于单元测试.但是,当我使用spark-submit运行此代码时,群集选项不起作用.例如,
spark-submit --master yarn --deploy-mode client --num-executors 10 ...
Run Code Online (Sandbox Code Playgroud)
不会创建任何执行者.
我发现当我删除config("master", "local[2]")上面的部分代码时会应用spark-submit参数.但是,没有主设置,单元测试代码不起作用.
我试图将spark(SparkSession)对象生成部分拆分为test和main.但是,有很多代码块需要火花,例如import spark.implicit,_和spark.createDataFrame(rdd, schema).
有没有最好的做法来编写代码来创建spark对象来测试和运行spark-submit?
我有一个 test.py 文件
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.externals import joblib
import tqdm
import time
print("Successful import")
Run Code Online (Sandbox Code Playgroud)
我按照这种方法创建了所有依赖项的独立 zip
pip install -t dependencies -r requirements.txt
cd dependencies
zip -r ../dependencies.zip .
Run Code Online (Sandbox Code Playgroud)
它创建了这个树结构(dependency.zip)
dependencies.zip
->pandas
->numpy
->........
Run Code Online (Sandbox Code Playgroud)
当我跑步时
spark-submit --py-files /home/ion/Documents/dependencies.zip /home/ion/Documents/sentiment_analysis/test.py
Run Code Online (Sandbox Code Playgroud)
我收到以下错误
2018-05-16 07:36:21 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
File "/home/ion/Documents/sentiment_analysis/test.py", line 2, in <module>
from …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 Spark-submit 将我的 Pyspark 应用程序提交到 Kubernetes 集群 (Minikube):
./bin/spark-submit \
--master k8s://https://192.168.64.4:8443 \
--deploy-mode cluster \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 \
--conf spark.kubernetes.container.image='pyspark:dev' \
--conf spark.kubernetes.container.image.pullPolicy='Never' \
local:///main.py
Run Code Online (Sandbox Code Playgroud)
应用程序尝试访问部署在集群内的 Kafka 实例,因此我指定了 jar 依赖项:
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
Run Code Online (Sandbox Code Playgroud)
我正在使用的容器映像基于我使用实用程序脚本构建的容器映像。我已经将我的应用程序所需的所有 python 依赖项打包在其中。
驱动程序正确部署并获取 Kafka 包(如果需要,我可以提供日志)并在新的 pod 中启动执行器。
但随后执行器 Pod 崩溃了:
ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaBatchInputPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) …Run Code Online (Sandbox Code Playgroud) 要将Spark应用程序提交到集群,他们的文档说明:
为此,请创建包含代码及其依赖项的程序集jar(或"uber"jar).sbt和Maven都有汇编插件.在创建程序集jar时,将Spark和Hadoop列为提供的依赖项; 这些不需要捆绑,因为它们是由集群管理器在运行时提供的.- http://spark.apache.org/docs/latest/submitting-applications.html
所以,我在我的pom.xml文件中添加了Apache Maven Shade插件.(版本3.0.0)
我将Spark依赖项的范围转换为provided.(版本2.1.0)
(我还添加了Apache Maven Assembly Plugin,以确保我在运行时将所有依赖项都包装在jar中mvn clean package.我不确定它是否真的有必要.)
这样就spark-submit失败了.它抛出一个NoSuchMethodError依赖我的(请注意,代码在IntelliJ内部编译时从本地实例工作,假设provided已删除).
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Stopwatch.createStarted()Lcom/google/common/base/Stopwatch;
Run Code Online (Sandbox Code Playgroud)
抛出错误的代码行是无关紧要的 - 它只是我的main方法中创建Stopwatch一部分Google Guava实用程序的第一行.(版本21.0)
在线的其他解决方案表明它与Guava的版本冲突有关,但我对这些建议还没有任何运气.任何帮助将不胜感激,谢谢.
java nosuchmethoderror guava maven-shade-plugin spark-submit
~/spark/spark-2.1.1-bin-hadoop2.7/bin$ ./spark-submit --master spark://192.168.42.80:32141 --deploy-mode cluster file:///home/me/workspace/myproj/target/scala-2.11/myproj-assembly-0.1.0.jar
Running Spark using the REST application submission protocol.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/06/20 16:41:30 INFO RestSubmissionClient: Submitting a request to launch an application in spark://192.168.42.80:32141.
17/06/20 16:41:31 INFO RestSubmissionClient: Submission successfully created as driver-20170620204130-0005. Polling submission state...
17/06/20 16:41:31 INFO RestSubmissionClient: Submitting a request for the status of submission driver-20170620204130-0005 in spark://192.168.42.80:32141.
17/06/20 16:41:31 INFO RestSubmissionClient: State of driver driver-20170620204130-0005 is now ERROR.
17/06/20 16:41:31 INFO RestSubmissionClient: Driver is running on …Run Code Online (Sandbox Code Playgroud) 我使用spark从elasticsearch读取。
select col from index limit 10;
Run Code Online (Sandbox Code Playgroud)
问题在于索引非常大,包含1000亿行,而spark会生成数千个任务来完成工作。
我只需要10行,即使1个任务也可以返回10行就可以完成工作了,我不需要那么多任务。
极限甚至是极限1都很慢。
代码?
sql = select col from index limit 10
sqlExecListener.sparkSession.sql(sql).createOrReplaceTempView(tempTable)
Run Code Online (Sandbox Code Playgroud) 我正在使用 Java 在 Spark 中运行以下代码。
代码
测试.java
package com.sample;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.storage.StorageLevel;
import com.addition.AddTwoNumbers;
public class Test{
private static final String APP_NAME = "Test";
private static final String LOCAL = "local";
private static final String MASTER_IP = "spark://10.180.181.26:7077";
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName(APP_NAME).setMaster(MASTER_IP);
String connection = "jdbc:oracle:thin:test/test@//xyz00aie.in.oracle.com:1521/PDX2600N";
// Create Spark Context
SparkContext context = new SparkContext(conf);
// Create Spark Session
SparkSession sparkSession = …Run Code Online (Sandbox Code Playgroud) 我在集群中运行 Spark(远程)
如何使用 spark-submit 将应用程序提交到具有以下场景的远程集群:
spark-submit 通过骆驼作为命令执行
应用程序在它自己的容器中运行。
从以下链接:
https://github.com/mvillarrealb/docker-spark-cluster
https://github.com/big-data-europe/docker-spark
我们可以提交 spark 应用程序,但我们已将文件和 jar 复制到卷中。
我如何避免这种情况?
有什么办法吗?
我有 4 个 python 脚本和 1 个 .txt 配置文件。在 4 个 Python 文件中,其中一个文件具有 Spark 应用程序的入口点,并且还从其他 Python 文件导入函数。但配置文件是在其他一些 python 文件中导入的,该文件不是 Spark 应用程序的入口点。我想在 pyspark 中编写 Spark Submit 命令,但是当配置文件不是 python 文件而是文本文件或 ini 文件时,我不确定如何使用 Spark Submit 命令沿配置文件提供多个文件。
用于演示:4 个 python 文件: file1.py 、 file2.py 、 file3.py 。文件4.py
1个配置文件:conf.txt
file1.py:这个文件有spark会话并调用所有其他python文件。file3.py:这个python文件正在读取conf.txt。
我想通过 Spark Submit 提供所有这些文件,但不确定 command 。我确定的一种解决方案是:
spark-submit --master local --driver-memory 2g --executor-memory 2g --py-files s3_path\file2.py,s3_path\file3.py,s3_path\file4.py s3_path\file1.py
Run Code Online (Sandbox Code Playgroud)
但对于上面的 Spark Submit 我不知道如何传递 conf.txt 。
我一整天都在与它斗争。我能够安装并使用带有 Spark shell 或连接的 Jupiter 笔记本的包(graphframes),但我想使用 Spark-Submit 将其移动到基于 kubernetes 的 Spark 环境。我的spark版本:3.0.1 我从spark-packages下载了最后一个可用的.jar文件(graphframes-0.8.1-spark3.0-s_2.12.jar)并将其放入jars文件夹中。我使用标准 Spark docker 文件的变体来构建我的图像。我的 Spark-submit 命令如下所示:
$SPARK_HOME/bin/spark-submit \
--master k8s://https://kubernetes.docker.internal:6443 \
--deploy-mode cluster \
--conf spark.executor.instances=$2 \
--conf spark.kubernetes.container.image=myimage.io/repositorypath \
--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 \
--jars "local:///opt/spark/jars/graphframes-0.8.1-spark3.0-s_2.12.jar" \
path/to/my/script/script.py
Run Code Online (Sandbox Code Playgroud)
但它以错误结束:
Ivy Default Cache set to: /opt/spark/.ivy2/cache
The jars for the packages stored in: /opt/spark/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e833e157-44f5-4055-81a4-3ab524176ef5;1.0
confs: [default]
Exception in …Run Code Online (Sandbox Code Playgroud) spark-submit ×10
apache-spark ×9
pyspark ×4
python ×3
kubernetes ×2
scala ×2
apache-camel ×1
docker ×1
graphframes ×1
guava ×1
ivy ×1
java ×1