小编Chr*_*lis的帖子

为什么加入失败的"java.util.concurrent.TimeoutException:期货在[300秒]之后超时"?

我正在使用Spark 1.5.

我有两个表格的数据框:

scala> libriFirstTable50Plus3DF
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]

scala> linkPersonItemLessThan500DF
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]
Run Code Online (Sandbox Code Playgroud)

libriFirstTable50Plus3DF766,151条记录,linkPersonItemLessThan500DF26,694,353条记录.请注意我正在使用repartition(number),linkPersonItemLessThan500DF因为我打算稍后加入这两个.我正在跟进以上代码:

val userTripletRankDF = linkPersonItemLessThan500DF
     .join(libriFirstTable50Plus3DF, Seq("family_id"))
     .take(20)
     .foreach(println(_))
Run Code Online (Sandbox Code Playgroud)

我得到这个输出:

16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:        at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
at …
Run Code Online (Sandbox Code Playgroud)

scala join apache-spark apache-spark-sql

41
推荐指数
3
解决办法
4万
查看次数

如何计算合并的最佳numberOfPartitions?

所以,据我所知,一般情况下应该使用coalesce():

由于某个filter或其他操作可能导致减少原始数据集(RDD,DF),分区数量减少.coalesce()过滤大型数据集后,可以更有效地运行操作.

我也明白它比repartition通过仅在必要时移动数据来减少混乱更便宜.我的问题是如何定义coalesce带(idealPartionionNo)的参数.我正在研究一个项目,该项目是从另一位工程师传递给我的,他使用下面的计算来计算该参数的值.

// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)

val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR
Run Code Online (Sandbox Code Playgroud)

然后将其与partitioner对象一起使用:

val partitioner = new HashPartitioner(idealPartionionNo)
Run Code Online (Sandbox Code Playgroud)

但也用于:

RDD.filter(x=>x._3<30).coalesce(idealPartionionNo)
Run Code Online (Sandbox Code Playgroud)

这是正确的方法吗?idealPartionionNo价值计算背后的主要思想是什么?什么是REPARTITION_FACTOR?我一般如何定义它?

此外,由于纱线负责确定对飞可用执行人有获得该号(的方式AVAILABLE_EXECUTOR_INSTANCES在运行),并利用它来进行计算idealPartionionNo(如更换NO_OF_EXECUTOR_INSTANCESAVAILABLE_EXECUTOR_INSTANCES)?

理想情况下,表单的一些实际示例:

  • 这是一个数据集(大小);
  • 这是RDD/DF的一些转换和可能的重用.
  • 这是你应该重新分配/合并的地方.
  • 假设您有n 执行程序,其m 核心分区因子 …

scala apache-spark rdd

16
推荐指数
3
解决办法
5165
查看次数

Spark:转换为DF后,collect(),take()和show()输出之间的差异

我正在使用Spark 1.5.

我有一个30列的列,我integers从数据库加载:

val numsRDD = sqlContext
     .table(constants.SOURCE_DB + "." + IDS)
     .select("id")
     .distinct
     .map(row=>row.getInt(0))
Run Code Online (Sandbox Code Playgroud)

这是输出numsRDD:

numsRDD.collect.foreach(println(_))

643761
30673603
30736590
30773400
30832624
31104189
31598495
31723487
32776244
32801792
32879386
32981901
33469224
34213505
34709608
37136455
37260344
37471301
37573190
37578690
37582274
37600896
37608984
37616677
37618105
37644500
37647770
37648497
37720353
37741608
Run Code Online (Sandbox Code Playgroud)

接下来,我想为那些产生3的所有组合,ids然后将每个组合保存为表单的元组:< tripletID: String, triplet: Array(Int)>并将其转换为数据帧,我按如下方式执行:

// |combinationsDF| = 4060 combinations
val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => …
Run Code Online (Sandbox Code Playgroud)

scala collect dataframe take apache-spark

11
推荐指数
2
解决办法
2万
查看次数

PySpark、GraphFrames、异常引起:java.lang.ClassNotFoundException:com.typesafe.scalalogging.slf4j.LazyLogging

我正在尝试运行以下利用图形框架的代码,但我现在遇到了一个错误,据我所知,经过几个小时的谷歌搜索后,我无法解决。似乎无法加载一个类,但我真的不知道我还应该做什么。

有人可以再看看下面的代码和错误吗?我已按照此处的说明进行操作,如果您想快速尝试一下,可以在此处找到我的数据集。

"""
Program:    RUNNING GRAPH ANALYTICS WITH SPARK GRAPH-FRAMES:
Author:     Dr. C. Hadjinikolis
Date:       14/09/2016
Description:    This is the application's core module from where everything is executed.
                The module is responsible for:
                1. Loading Spark
                2. Loading GraphFrames
                3. Running analytics by leveraging other modules in the package.
"""
# IMPORT OTHER LIBS -------------------------------------------------------------------------------#
import os
import sys
import pandas as pd

# IMPORT SPARK ------------------------------------------------------------------------------------#
# Path to Spark source folder
USER_FILE_PATH = "/Users/christoshadjinikolis" …
Run Code Online (Sandbox Code Playgroud)

pyspark graphframes

5
推荐指数
1
解决办法
5231
查看次数

Tensorflow + Keras + Convolution2d:ValueError:Filter不得大于输入:Filter:(5,5)输入:(3,350)

我一直试图运行我从这里得到的代码,即使我已经改变了几乎没有什么比图像大小(350,350而不是150,150)仍然无法让它工作.我得到上面的过滤器错误(在标题中),我理解但我没有做错,所以我不明白这一点.它基本上说我不能拥有比输入更多的节点,对吗?

通过更改此行,我最终能够破解我的解决方案:

model.add(Convolution2D(32, 5, 5, border_mode='valid', input_shape=(3, IMG_WIDTH, IMG_HEIGHT)))
Run Code Online (Sandbox Code Playgroud)

有了这个:

model.add(Convolution2D(32, 5, 5, border_mode='valid', input_shape=(IMG_WIDTH, IMG_HEIGHT, 3)))
Run Code Online (Sandbox Code Playgroud)

但我仍然想知道为什么会这样.

这是下面的代码以及我得到的错误.会感激一些帮助(我使用的是Python Anaconda 2.7.11).

# IMPORT LIBRARIES --------------------------------------------------------------------------------#
import glob
import tensorflow
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.layers import Convolution2D, MaxPooling2D
from keras.layers import Activation, Dropout, Flatten, Dense
from settings import RAW_DATA_ROOT

# GLOBAL VARIABLES --------------------------------------------------------------------------------#
TRAIN_PATH = RAW_DATA_ROOT + "/train/"
TEST_PATH = RAW_DATA_ROOT + "/test/"

IMG_WIDTH, IMG_HEIGHT = 350, 350

NB_TRAIN_SAMPLES = len(glob.glob(TRAIN_PATH + "*"))
NB_VALIDATION_SAMPLES …
Run Code Online (Sandbox Code Playgroud)

convolution python-2.7 keras tensorflow

4
推荐指数
2
解决办法
2859
查看次数

如何使用 DataFrame.explode 和自定义 UDF 将字符串拆分为子字符串?

我使用 Spark 1.5

我有一个数据框A_DF如下:

+--------------------+--------------------+
|                  id|        interactions|
+--------------------+--------------------+
|        id1         |30439831,30447866...|
|        id2         |37597858,34499875...|
|        id3         |30447866,32896718...|
|        id4         |33029476,31988037...|
|        id5         |37663606,37627579...|
|        id6         |37663606,37627579...|
|        id7         |36922232,37675077...|
|        id8         |37359529,37668820...|
|        id9         |37675077,37707778...|
+--------------------+--------------------+
Run Code Online (Sandbox Code Playgroud)

哪里interactions是 a String. 我想通过首先将字符串拆分为一组用逗号分隔的子字符串来分解interactions它,我尝试执行以下操作:

val splitArr = udf { (s: String) => s.split(",").map(_.trim) }

val B_DF = A_DF.explode(splitArr($"interactions"))
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误:

val splitArr = udf { (s: String) => s.split(",").map(_.trim) }

val B_DF = A_DF.explode(splitArr($"interactions"))
Run Code Online (Sandbox Code Playgroud)

我不明白。所以我尝试了更复杂的事情: …

scala apache-spark apache-spark-sql

4
推荐指数
1
解决办法
5958
查看次数

Flink IOException:网络缓冲区数量不足

我正在使用Flink v1.4.0. 我正在使用DataSet API(虽然这个,我认为无关紧要)。

我正在 12 核 VM 上运行一些重型转换。我正在使用 2 个内核Flink job,其中我将一些数据存储到一个内核中,并使用剩余的 10 个内核Flink Queryable State运行另一个Flink作业。

当我用 10 个内核运行第二个作业时,我似乎收到以下错误:

java.io.IOException: Insufficient number of network buffers: required 10, but only 9 available. The total number of network buffers is currently set to 4096 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
            at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257)
            at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:199)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:618)
            at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

如果我确实用 8 …

java apache-flink

3
推荐指数
1
解决办法
4295
查看次数

类'SessionTrigger'必须声明为abstract或实现抽象成员

我正在构建一个关于Flink 1.2的教程,我想运行一些简单的窗口示例.其中之一是Session Windows.

我想要运行的代码如下:

import <package>.Session
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

import scala.util.Try

object SessionWindowExample {

  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val source = env.socketTextStream("localhost", 9000)

    //session map
    val values = source.map(value => {
      val columns = value.split(",")
      val endSignal = Try(Some(columns(2))).getOrElse(None)
      Session(columns(0), columns(1).toDouble, endSignal)
    })

    val keyValue = values.keyBy(_.sessionId)

    // create global window

    val sessionWindowStream = keyValue.
      window(GlobalWindows.create()).
      trigger(PurgingTrigger.of(new SessionTrigger[GlobalWindow]()))

    sessionWindowStream.sum("value").print()

    env.execute()
  }
}
Run Code Online (Sandbox Code Playgroud)

正如您将注意到的,我需要new SessionTrigger基于此类实例化我所做的对象:

import <package>.Session
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import …
Run Code Online (Sandbox Code Playgroud)

abstract-class scala apache-flink

2
推荐指数
1
解决办法
882
查看次数

Flink JobExecutionException:akka.client.timeout

我正在使用Flink v.1.4.0.

我正在尝试使用DataSet APIthrough运行作业IntelliJ。请注意,如果我通过Flink UI该作业运行相同的作业,则该作业运行良好。为了运行作业,我需要首先通过环境变量指定将要处理的数据量。当数量相对较小时,作业运行良好。但随着它变大,我开始收到以下错误:

ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console...
31107 [main] ERROR com.company.someLib.SomeClass - Error executing pipeline
org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:565)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:539)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:193)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
at com.ubs.digital.comms.graph.emailanalyser.EmailAnalyserPipeline.lambda$runPipeline$1(EmailAnalyserPipeline.java:120)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at …
Run Code Online (Sandbox Code Playgroud)

java intellij-idea akka apache-flink

2
推荐指数
1
解决办法
1386
查看次数

在 PyCharm 中使用图形框架

我花了将近 2 天的时间在互联网上滚动,但我无法解决这个问题。我正在尝试安装graphframes 包(版本:0.2.0-spark2.0-s_2.11)以通过 PyCharm 运行 spark,但是,尽管我尽了最大努力,但这是不可能的。

我几乎尝试了所有方法。请知道,在发布答案之前,我也在这里检查了这个网站。

这是我试图运行的代码:

# IMPORT OTHER LIBS --------------------------------------------------------
import os
import sys
import pandas as pd

# IMPORT SPARK ------------------------------------------------------------------------------------#
# Path to Spark source folder
USER_FILE_PATH = "/Users/<username>"
SPARK_PATH = "/PycharmProjects/GenesAssociation"
SPARK_FILE = "/spark-2.0.0-bin-hadoop2.7"
SPARK_HOME = USER_FILE_PATH + SPARK_PATH + SPARK_FILE
os.environ['SPARK_HOME'] = SPARK_HOME

# Append pySpark to Python Path
sys.path.append(SPARK_HOME + "/python")
sys.path.append(SPARK_HOME + "/python" + "/lib/py4j-0.10.1-src.zip")

try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    from pyspark.sql …
Run Code Online (Sandbox Code Playgroud)

python installation pycharm pyspark graphframes

1
推荐指数
1
解决办法
2576
查看次数

什么是冯·诺伊曼的瓶颈?

什么是Von Neuman瓶颈以及函数式编程如何降低其影响?有人可以通过一个实用而全面的例子简单的方式解释,例如,显示使用Scala而不是Java的优势,如果有的话?

更重要的是,为什么要避免强制性控制结构,而选择如此重要的功能来提高性能呢?理想情况下,实际编码例子,说明如何解决的问题一个功能,没有一个是由冯-诺依曼瓶颈的影响将是非常有益的.

functional-programming scala

1
推荐指数
1
解决办法
579
查看次数