输出为这个并卡在最后一行:
17/09/07 06:01:35 INFO ClientCnxn: Socket connection established to 10.0.0.193/10.0.0.193:2181, initiating session
17/09/07 06:01:35 INFO ClientCnxn: Session establishment complete on server 10.0.0.193/10.0.0.193:2181, sessionid = 0x15e4bc9518103cc, negotiated timeout = 40000
17/09/07 06:01:35 INFO RegionSizeCalculator: **Calculating region sizes for table "event_data".**
17/09/07 06:01:35 INFO SparkContext: Starting job: processCmd at CliDriver.java:376
17/09/07 06:01:36 INFO DAGScheduler: Got job 0 (processCmd at CliDriver.java:376) with 1 output partitions
17/09/07 06:01:36 INFO DAGScheduler: Final stage: ResultStage 0 (processCmd at CliDriver.java:376)
17/09/07 06:01:36 INFO DAGScheduler: Parents of final …Run Code Online (Sandbox Code Playgroud) 我们正在使用Spark Thrift Server对Spark EMR集群运行SQL查询,并且我们看到,当SQL查询(转换为Spark作业)完成时,位于其下方的shuffle文件/mnt/yarn/usercache/root/appcache不会被清除。这No space left on device最终导致在运行几个查询之后。
如果我们停止Spark Thrift Server,则将清理随机文件。有没有什么方法可以使清理不仅在应用程序停止之后运行,而且在每个作业运行之后运行?我们尝试设置以下参数
yarn.nodemanager.localizer.cache.cleanup.interval-ms=6000
yarn.nodemanager.localizer.cache.target-size-mb=1000
Run Code Online (Sandbox Code Playgroud)
但是文件仍未清除。知道为什么会发生以及如何避免它吗?
shuffle amazon-emr hadoop-yarn apache-spark spark-thriftserver
我正在尝试使用提供的Java客户端的google语言API对文本进行分类.代码在Scala中,它看起来像这样:
val language = LanguageServiceClient.create()
val doc = Document.newBuilder.setGcsContentUri(draftBody).setType(Type.PLAIN_TEXT).build
val request = ClassifyTextRequest.newBuilder.setDocument(doc).build
val response = language.classifyText(request)
Run Code Online (Sandbox Code Playgroud)
客户端无法授权,是否有办法使用我创建的API密钥进行授权?
我已经使用管道估计了逻辑回归。
我在拟合逻辑回归之前的最后几行:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="lr_features", labelCol = "targetvar")
# create assember to include encoded features
lr_assembler = VectorAssembler(inputCols= numericColumns +
[categoricalCol + "ClassVec" for categoricalCol in categoricalColumns],
outputCol = "lr_features")
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
# Model definition:
lr = LogisticRegression(featuresCol = "lr_features", labelCol = "targetvar")
# Pipeline definition:
lr_pipeline = Pipeline(stages = indexStages + encodeStages +[lr_assembler, lr])
# Fit the logistic regression model:
lrModel = lr_pipeline.fit(train_train)
Run Code Online (Sandbox Code Playgroud)
然后我尝试运行模型的摘要。但是,下面的代码行:
trainingSummary …Run Code Online (Sandbox Code Playgroud) 我正在UDF中调用API,并尝试将输出记录到Logger中,并出现序列化错误。
以下是我的Logger初始化代码:
log4jLogger = spark._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
Run Code Online (Sandbox Code Playgroud)
在我的UDF中,我正在使用
LOGGER.info("Message")
Run Code Online (Sandbox Code Playgroud)
但是我得到了错误
pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o31.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
Run Code Online (Sandbox Code Playgroud)
注册UDF时
distance_udf = udf(distfunc, DoubleType())
Run Code Online (Sandbox Code Playgroud)
您能否更正我在日志记录中应该修改的内容,以及是否要登录到单独的日志文件中,该如何纠正我。
谢谢
尝试将由 numpy 数组组成的 rdd 转换为 pyspark 中的数据帧时,出现以下错误:
下面是导致这个错误的一段代码,我什至不确定我能得到错误的实际位置,甚至阅读跟踪......
有谁知道如何绕过?
非常感谢 !
In [111]: rddUser.take(5)
Out[111]:
[array([u'1008798262000292538', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'102254941859441333', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'1035609083097069747', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'10363297284472000', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32'),
array([u'1059178934871294116', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'],
dtype='<U32')]
Run Code Online (Sandbox Code Playgroud)
那么麻烦来了:
In [110]: rddUser.toDF(schema=None).show()
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-110-073037afd70e> in <module>()
----> 1 rddUser.toDF(schema=None).show()
62 [Row(name=u'Alice', age=1)]
63 """
---> 64 return …Run Code Online (Sandbox Code Playgroud) 让我们考虑以下特征:
sealed trait AB
case class A(a: Int) extends AB
case class B(b: Int) extends AB
Run Code Online (Sandbox Code Playgroud)
我试图将collect集合限制为特定的子类.
如果我尝试collect,匹配单个组件并重新组合元组:
scala> Seq.empty[(Int, AB)].collect { case (id, a @ A(_)) => (id, a) } : Seq[(Int, A)]
res6: Seq[(Int, ab.A)] = List()
Run Code Online (Sandbox Code Playgroud)
编译器很高兴,但如果尝试返回完整匹配:
scala> Seq.empty[(Int, AB)].collect { case x @ (_, A(_)) => x } : Seq[(Int, A)]
Run Code Online (Sandbox Code Playgroud)
事情变得丑陋:
<console>:27: error: type mismatch;
found : Seq[(Int, ab.AB)]
required: Seq[(Int, ab.A)]
Seq.empty[(Int, AB)].collect { case x @ (_, A(_)) => …Run Code Online (Sandbox Code Playgroud) scala type-inference pattern-matching scala-collections scala-compiler
我正在尝试将 pyspark 与 python 2.7(Pycharm IDE)集成。我需要运行一些巨大的文本文件。
所以这就是我正在做的。
下载 Spark (2.3.0-bin-hadoop-2.7) 并解压 安装 JDK
然后我试图运行这个脚本
spark_home = os.environ.get('SPARK_HOME', None) os.environ["SPARK_HOME"] = "C:\spark-2.3.0-bin-hadoop2.7" import pyspark from pyspark import SparkContext, SparkConf from pyspark。 sql 导入 SparkSession
conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
import pandas as pd
ip = spark.read.format("csv").option("inferSchema","true").option("header","true").load(r"D:\some file.csv")
Run Code Online (Sandbox Code Playgroud)
Pycharm 说没有找到名为 Pyspark 的模块。
我正在通过添加内容根并指向安装它的文件夹来解决这个问题。
但问题是每次我重新打开 pycharm 时,我都必须添加内容根。我该如何解决?
接下来是,当我设法运行脚本时,它会引发以下错误。
2018-06-01 12:20:49 ERROR Shell:397 - Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable …Run Code Online (Sandbox Code Playgroud) 通过提交火花,我在Kubernetes集群上启动了应用程序。而且只有在访问http:// driver-pod:port时,我才能看到Spark-UI 。
如何在集群上启动Spark-UI History Server?如何使所有正在运行的Spark作业都在Spark-UI历史记录服务器上注册。
这可能吗?
我正在尝试以下 Java 示例
这是我的代码
public class App {
public static void main(String[] args) {
System.out.println("Hello World!");
System.setProperty("hadoop.home.dir", "D:\\del");
List<MyRecord> firstRow = new ArrayList<MyRecord>();
firstRow.add(new App().new MyRecord("1", "Love is blind"));
List<MyRecord> secondRow = new ArrayList<MyRecord>();
secondRow.add(new App().new MyRecord("1", "Luv is blind"));
SparkSession spark = SparkSession.builder().appName("LSHExample").config("spark.master", "local")
.getOrCreate();
Dataset firstDataFrame = spark.createDataFrame(firstRow, MyRecord.class);
Dataset secondDataFrame = spark.createDataFrame(secondRow, MyRecord.class);
firstDataFrame.show(20, false);
secondDataFrame.show(20, false);
RegexTokenizer regexTokenizer = new RegexTokenizer().setInputCol("text").setOutputCol("words")
.setPattern("\\W");
NGram ngramTransformer = new NGram().setN(3).setInputCol("words").setOutputCol("ngrams");
HashingTF hashingTF = new HashingTF().setInputCol("ngrams").setOutputCol("vectors");
MinHashLSH …Run Code Online (Sandbox Code Playgroud) apache-spark ×8
pyspark ×4
python ×4
scala ×2
amazon-emr ×1
hadoop-yarn ×1
java ×1
kubernetes ×1
logging ×1
numpy ×1
pipeline ×1
rdd ×1
shuffle ×1