Posts by the*_*evd

How does the fold action work in Spark?

Below is a Scala example of Spark's fold action:

val rdd1 = sc.parallelize(List(1,2,3,4,5), 3)
rdd1.fold(5)(_ + _)

This produces the output 35. Can someone explain in detail how this output is computed?
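
For reference, the result follows from how RDD.fold applies the zero value: it is used once as the starting value inside each partition and once more when the per-partition results are merged, so with 3 partitions the total is 15 (the elements) + 3 * 5 (one zero per partition) + 5 (the final merge) = 35. A small sketch that reproduces this arithmetic (assuming a local SparkContext sc):

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), 3)

// With 3 partitions the data splits as [1], [2, 3], [4, 5]:
//   per-partition fold:  5 + 1 = 6,  5 + 2 + 3 = 10,  5 + 4 + 5 = 14
//   merge of results:    5 + 6 + 10 + 14 = 35
val perPartition = rdd1.mapPartitions(it => Iterator(it.fold(5)(_ + _))).collect()
val byHand  = perPartition.fold(5)(_ + _)   // 35
val viaFold = rdd1.fold(5)(_ + _)           // 35, same value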

scala fold apache-spark

7 votes · 1 answer · 2168 views

Best way to catch a Java Throwable and restart the thread

I have the following code in Java:

@Override
public void run() {

    logger.info("Thread Started:" + this.getName());

    try {
        runJob();
    } catch( Throwable e) {
        logger.error("Exception in running thread: " + this.getName() + ", restarting job", e);
        run();
    }
}

public void runJob() {

    while(true) {
       try {
           // Work here
       } catch(Exception e) {
           // log the exception
       }
    }
 }

Will this code actually keep the thread alive in every case? Is this the right way to resume the thread?

Here is an alternative I came up with after reading all the answers. Let me know whether this is a good way to keep the thread alive forever, even when an error occurs:

@Override
public void run() {

    logger.info("Thread Started:" + this.getName());

    while(true) {
      try {
        runJob();
      } catch( Throwable e) {
        logger.error("Exception in running thread: …
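
For comparison, here is a minimal Scala sketch of the loop-based variant (my own illustration, not taken from the original answers; the doWork parameter is a hypothetical stand-in for the real job). The practical difference is that restarting by calling run() recursively adds a stack frame on every failure, so a long-lived thread can eventually hit a StackOverflowError, whereas a while loop restarts the job at constant stack depth:

// Sketch of a self-restarting worker thread; doWork is a hypothetical job function.
class RestartingWorker(doWork: () => Unit) extends Thread("restarting-worker") {
  override def run(): Unit = {
    while (!isInterrupted) {
      try {
        doWork() // may throw anything, including Errors
      } catch {
        case t: Throwable =>
          // Log and loop again; unlike a recursive run() call,
          // this keeps the call stack at a constant depth.
          System.err.println(s"Job failed in thread ${getName}, restarting: $t")
      }
    }
  }
}

// Usage: new RestartingWorker(() => runJob()).start()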

java multithreading try-catch throwable

3 votes · 1 answer · 272 views

lz4 exception when reading data from Kafka with Spark Streaming

I am trying to read JSON data from Kafka using the Spark Streaming API, and when I do, it throws a java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.&lt;init&gt; exception. The stack trace is:

java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)
at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
at org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:124)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:421)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.streaming.StateStoreRestoreExec$$anonfun$doExecute$1.apply(statefulOperators.scala:217)
at org.apache.spark.sql.execution.streaming.StateStoreRestoreExec$$anonfun$doExecute$1.apply(statefulOperators.scala:215)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:67)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:62)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:78)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:77)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at …
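
(A note not from the original post.) This kind of NoSuchMethodError typically means two incompatible lz4 bindings are on the classpath: older Kafka client artifacts depend on net.jpountz.lz4:lz4, while newer Spark versions use org.lz4:lz4-java, which keeps the same package names but has different constructors. A commonly suggested workaround is to exclude the legacy artifact so only lz4-java is resolved; a sketch for an sbt build, with placeholder coordinates and versions:

// build.sbt sketch (artifact names and versions are illustrative; adjust to your build)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1",
  ("org.apache.kafka" % "kafka-clients" % "0.11.0.2")
    .exclude("net.jpountz.lz4", "lz4")   // keep org.lz4:lz4-java, drop the old binding
)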

java apache-kafka apache-spark-dataset spark-structured-streaming

3 votes · 1 answer · 2713 views

Failed to connect to Hazelcast mancenter

I am trying to connect to the Hazelcast Management Center programmatically in Java, but I am getting a "Failed to send response" error:

INFO: [192.168.203.1]:5701 [dev] [3.7] Hazelcast will connect to Hazelcast 
Management Center on address: 
http://localhost:8080/mancenter
Jan 03, 2018 5:27:56 PM 
com.hazelcast.internal.partition.impl.PartitionStateManager
INFO: [192.168.203.1]:5701 [dev] [3.7] Initializing cluster partition table 
arrangement...
Jan 03, 2018 5:27:59 PM 
com.hazelcast.internal.management.ManagementCenterService
WARNING: [192.168.203.1]:5701 [dev] [3.7] Failed to send response,
responseCode:500 url:http://localhost:8080/mancenter/collector.do

This is how I connect from my Java program:

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class HazelcastMember {

    public static void main(String[] args) throws Exception {
        //System.setProperty("hazelcast.local.localAddress", "127.0.0.1");
        //ConsoleApp.main(args);

        Config cfg = new Config();
        cfg.getManagementCenterConfig().setEnabled(true).setUrl("http://localhost:8080/mancenter");
        HazelcastInstance instance = Hazelcast.newHazelcastInstance(cfg);
    }
}

I am using …

java hazelcast

0 votes · 1 answer · 2150 views

Spark sortMergeJoin does not change to shuffleHashJoin

I am trying to force Spark to use ShuffleHashJoin by disabling BroadcastHashJoin and SortMergeJoin, but Spark always uses SortMergeJoin.

I am using Spark version 2.4.3.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object ShuffleHashJoin {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder()
      .appName("ShuffleHashJoin")
      .master("local[*]")
      .getOrCreate()

    /*
     * Disable auto broadcasting of tables and SortMergeJoin
     */
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
    spark.conf.set("spark.sql.join.preferSortMergeJoin", false)

    import spark.implicits._
    val dataset = Seq(
      (0, "playing"),
      (1, "with"),
      (2, "ShuffledHashJoinExec")).toDF("id", "token")

    dataset.join(dataset, Seq("id"), "inner").foreach(_ => ())

    // infinite loop to keep the program running to check Spark UI at 4040 port.
    while (true) {}
  }
}
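
Two notes, neither from the original post: the chosen strategy can be checked directly with explain() instead of the Spark UI, and, as far as I understand Spark 2.x's JoinSelection, ShuffledHashJoin is only considered when one side is estimated smaller than spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions, so setting the threshold to 0 also rules the hash join out. A minimal sketch of the check:

// Print the physical plan; look for SortMergeJoin / ShuffledHashJoin / BroadcastHashJoin.
val joined = dataset.join(dataset, Seq("id"), "inner")
joined.explain()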

apache-spark apache-spark-sql

0 votes · 1 answer · 1237 views