小编hi-*_*zir的帖子

spark-sql 卡在 BlockManagerInfo:计算 sql 语句时添加了 broadcast_0_piece0

输出为这个并卡在最后一行:

17/09/07 06:01:35 INFO ClientCnxn: Socket connection established to 10.0.0.193/10.0.0.193:2181, initiating session  
17/09/07 06:01:35 INFO ClientCnxn: Session establishment complete on server 10.0.0.193/10.0.0.193:2181, sessionid = 0x15e4bc9518103cc, negotiated timeout = 40000  
17/09/07 06:01:35 INFO RegionSizeCalculator: **Calculating region sizes for table "event_data".**  
17/09/07 06:01:35 INFO SparkContext: Starting job: processCmd at CliDriver.java:376  
17/09/07 06:01:36 INFO DAGScheduler: Got job 0 (processCmd at CliDriver.java:376) with 1 output partitions  
17/09/07 06:01:36 INFO DAGScheduler: Final stage: ResultStage 0 (processCmd at CliDriver.java:376)  
17/09/07 06:01:36 INFO DAGScheduler: Parents of final …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

5
推荐指数
1
解决办法
1207
查看次数

Spark Thrift服务器无法清理随机文件

我们正在使用Spark Thrift Server对Spark EMR集群运行SQL查询,并且我们看到,当SQL查询(转换为Spark作业)完成时,位于其下方的shuffle文件/mnt/yarn/usercache/root/appcache不会被清除。这No space left on device最终导致在运行几个查询之后。

如果我们停止Spark Thrift Server,则将清理随机文件。有没有什么方法可以使清理不仅在应用程序停止之后运行,而且在每个作业运行之后运行?我们尝试设置以下参数

yarn.nodemanager.localizer.cache.cleanup.interval-ms=6000
yarn.nodemanager.localizer.cache.target-size-mb=1000
Run Code Online (Sandbox Code Playgroud)

但是文件仍未清除。知道为什么会发生以及如何避免它吗?

shuffle amazon-emr hadoop-yarn apache-spark spark-thriftserver

5
推荐指数
0
解决办法
137
查看次数

Google Natural Language API Java LanguageServiceClient如何使用api密钥

我正在尝试使用提供的Java客户端的google语言API对文本进行分类.代码在Scala中,它看起来像这样:

val language = LanguageServiceClient.create()
val doc = Document.newBuilder.setGcsContentUri(draftBody).setType(Type.PLAIN_TEXT).build
val request = ClassifyTextRequest.newBuilder.setDocument(doc).build
val response = language.classifyText(request)
Run Code Online (Sandbox Code Playgroud)

客户端无法授权,是否有办法使用我创建的API密钥进行授权?

java scala google-language-api google-cloud-platform

5
推荐指数
0
解决办法
94
查看次数

Spark:从管道模型中提取 ML 逻辑回归模型的摘要

我已经使用管道估计了逻辑回归。

我在拟合逻辑回归之前的最后几行:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="lr_features", labelCol = "targetvar")
# create assember to include encoded features
    lr_assembler = VectorAssembler(inputCols= numericColumns + 
                               [categoricalCol + "ClassVec" for categoricalCol in categoricalColumns],
                               outputCol = "lr_features")
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
# Model definition:
lr = LogisticRegression(featuresCol = "lr_features", labelCol = "targetvar")
# Pipeline definition:
lr_pipeline = Pipeline(stages = indexStages + encodeStages +[lr_assembler, lr])
# Fit the logistic regression model:
lrModel = lr_pipeline.fit(train_train)
Run Code Online (Sandbox Code Playgroud)

然后我尝试运行模型的摘要。但是,下面的代码行:

trainingSummary …
Run Code Online (Sandbox Code Playgroud)

python pipeline logistic-regression apache-spark pyspark

5
推荐指数
1
解决办法
4557
查看次数

在PySpark中登录UDF

我正在UDF中调用API,并尝试将输出记录到Logger中,并出现序列化错误。

以下是我的Logger初始化代码:

log4jLogger = spark._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
Run Code Online (Sandbox Code Playgroud)

在我的UDF中,我正在使用

LOGGER.info("Message")
Run Code Online (Sandbox Code Playgroud)

但是我得到了错误

pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o31.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
Run Code Online (Sandbox Code Playgroud)

注册UDF时

distance_udf = udf(distfunc, DoubleType())
Run Code Online (Sandbox Code Playgroud)

您能否更正我在日志记录中应该修改的内容,以及是否要登录到单独的日志文件中,该如何纠正我。

谢谢

python logging apache-spark pyspark

5
推荐指数
0
解决办法
696
查看次数

将 numpy 数组的 rdd 转换为 pyspark 数据帧

尝试将由 numpy 数组组成的 rdd 转换为 pyspark 中的数据帧时,出现以下错误:

下面是导致这个错误的一段代码,我什至不确定我能得到错误的实际位置,甚至阅读跟踪......

有谁知道如何绕过?

非常感谢 !

In [111]: rddUser.take(5)

Out[111]:

[array([u'1008798262000292538', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32'),
 array([u'102254941859441333', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32'),
 array([u'1035609083097069747', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32'),
 array([u'10363297284472000', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32'),
 array([u'1059178934871294116', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32')]
Run Code Online (Sandbox Code Playgroud)

那么麻烦来了:

In [110]: rddUser.toDF(schema=None).show()  

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-110-073037afd70e> in <module>()
----> 1 rddUser.toDF(schema=None).show()

     62         [Row(name=u'Alice', age=1)]
     63         """
---> 64         return …
Run Code Online (Sandbox Code Playgroud)

python numpy apache-spark rdd pyspark

5
推荐指数
1
解决办法
3508
查看次数

为什么用模式匹配收集不能缩小特定类?

让我们考虑以下特征:

sealed trait AB
case class A(a: Int) extends AB
case class B(b: Int) extends AB
Run Code Online (Sandbox Code Playgroud)

我试图将collect集合限制为特定的子类.

如果我尝试collect,匹配单个组件并重新组合元组:

scala> Seq.empty[(Int, AB)].collect { case (id, a @ A(_))  => (id, a) } : Seq[(Int, A)]
res6: Seq[(Int, ab.A)] = List()
Run Code Online (Sandbox Code Playgroud)

编译器很高兴,但如果尝试返回完整匹配:

scala> Seq.empty[(Int, AB)].collect { case x @ (_, A(_))  => x } : Seq[(Int, A)]
Run Code Online (Sandbox Code Playgroud)

事情变得丑陋:

<console>:27: error: type mismatch;
 found   : Seq[(Int, ab.AB)]
 required: Seq[(Int, ab.A)]
       Seq.empty[(Int, AB)].collect { case x @ (_, A(_))  => …
Run Code Online (Sandbox Code Playgroud)

scala type-inference pattern-matching scala-collections scala-compiler

5
推荐指数
1
解决办法
181
查看次数

Pyspark - 无法在 hadoop 二进制路径中找到 winutils 二进制文件

我正在尝试将 pyspark 与 python 2.7(Pycharm IDE)集成。我需要运行一些巨大的文本文件。

所以这就是我正在做的。

下载 Spark (2.3.0-bin-hadoop-2.7) 并解压 安装 JDK

然后我试图运行这个脚本

spark_home = os.environ.get('SPARK_HOME', None) os.environ["SPARK_HOME"] = "C:\spark-2.3.0-bin-hadoop2.7" import pyspark from pyspark import SparkContext, SparkConf from pyspark。 sql 导入 SparkSession

conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate() 
import pandas as pd
ip = spark.read.format("csv").option("inferSchema","true").option("header","true").load(r"D:\some file.csv")
Run Code Online (Sandbox Code Playgroud)

Pycharm 说没有找到名为 Pyspark 的模块。

我正在通过添加内容根并指向安装它的文件夹来解决这个问题。

但问题是每次我重新打开 pycharm 时,我都必须添加内容根。我该如何解决?

接下来是,当我设法运行脚本时,它会引发以下错误。

2018-06-01 12:20:49 ERROR Shell:397 - Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

5
推荐指数
2
解决办法
8346
查看次数

Kubernetes上的Spark UI History服务器?

通过提交火花,我在Kubernetes集群上启动了应用程序。而且只有在访问http:// driver-pod:port时,我才能看到Spark-UI 。

如何在集群上启动Spark-UI History Server?如何使所有正在运行的Spark作业都在Spark-UI历史记录服务器上注册。

这可能吗?

apache-spark kubernetes

5
推荐指数
1
解决办法
376
查看次数

Apache Spark 文本相似度

我正在尝试以下 Java 示例

Apache Spark 中的高效字符串匹配

这是我的代码

public class App {
    public static void main(String[] args) {
        System.out.println("Hello World!");

        System.setProperty("hadoop.home.dir", "D:\\del");

        List<MyRecord> firstRow = new ArrayList<MyRecord>();
        firstRow.add(new App().new MyRecord("1", "Love is blind"));

        List<MyRecord> secondRow = new ArrayList<MyRecord>();
        secondRow.add(new App().new MyRecord("1", "Luv is blind"));

        SparkSession spark = SparkSession.builder().appName("LSHExample").config("spark.master", "local")
                .getOrCreate();

        Dataset firstDataFrame = spark.createDataFrame(firstRow, MyRecord.class);
        Dataset secondDataFrame = spark.createDataFrame(secondRow, MyRecord.class);

        firstDataFrame.show(20, false);
        secondDataFrame.show(20, false);

        RegexTokenizer regexTokenizer = new RegexTokenizer().setInputCol("text").setOutputCol("words")
                .setPattern("\\W");
        NGram ngramTransformer = new NGram().setN(3).setInputCol("words").setOutputCol("ngrams");
        HashingTF hashingTF = new HashingTF().setInputCol("ngrams").setOutputCol("vectors");
        MinHashLSH …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-ml apache-spark-mllib

4
推荐指数
1
解决办法
1922
查看次数