I have the following Scala example of a Spark fold action:
val rdd1 = sc.parallelize(List(1,2,3,4,5), 3)
rdd1.fold(5)(_ + _)
This produces the output 35. Can someone explain in detail how this output is computed?
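For context, fold applies the zero value once inside each partition and then once more when the per-partition results are combined on the driver, so with 3 partitions the element sum of 15 picks up the zero value 5 four times: 15 + 4 × 5 = 35. A minimal sketch that reproduces the arithmetic, assuming the same sc and rdd1 as above (the exact placement of elements across the three partitions may differ):

val partitionContents: Array[Array[Int]] = rdd1.glom().collect()   // e.g. Array(Array(1), Array(2, 3), Array(4, 5))
// Each partition is folded starting from the zero value 5 ...
val perPartition = partitionContents.map(part => 5 + part.sum)
// ... and the per-partition results are folded once more on the driver, again starting from 5.
val total = 5 + perPartition.sum
println(total)   // 35, same as rdd1.fold(5)(_ + _)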
I have the following code in Java:
@Override
public void run() {
    logger.info("Thread Started:" + this.getName());
    try {
        runJob();
    } catch (Throwable e) {
        logger.error("Exception in running thread: " + this.getName() + ", restarting job", e);
        run();
    }
}

public void runJob() {
    while (true) {
        try {
            // Work here
        } catch (Exception e) {
            // log the exception
        }
    }
}
Will this code actually keep the thread alive in every case? Is this just the way to recover the thread?
Here is another alternative I came up with after reading all the answers. Let me know whether this is a good way to keep the thread alive forever, even when an error occurs:
@Override
public void run() {
    logger.info("Thread Started:" + this.getName());
    while (true) {
        try {
            runJob();
        } catch (Throwable e) {
            logger.error("Exception in running thread: …
I am trying to read JSON data from Kafka using the Spark streaming API, and when I do it throws a java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init> exception. The stack trace is -
java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)
at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
at org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:124)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:50)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:421)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.streaming.StateStoreRestoreExec$$anonfun$doExecute$1.apply(statefulOperators.scala:217)
at org.apache.spark.sql.execution.streaming.StateStoreRestoreExec$$anonfun$doExecute$1.apply(statefulOperators.scala:215)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:67)
at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:62)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:78)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:77)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at …
java apache-kafka apache-spark-dataset spark-structured-streaming
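This NoSuchMethodError usually indicates two different LZ4 libraries on the classpath: an older net.jpountz.lz4:lz4 pulled in transitively (for example by the Kafka client), shadowing the org.lz4:lz4-java that this Spark build compiles against and that provides the LZ4BlockInputStream(InputStream, boolean) constructor the trace is looking for. A hedged build.sbt sketch of one way to align them; the Spark/Kafka artifact and all version numbers here are assumptions and need to match the actual build:

// build.sbt sketch (assumed coordinates and versions): keep only the newer org.lz4:lz4-java
libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1")
    .exclude("net.jpountz.lz4", "lz4"),   // drop the old LZ4 artifact pulled in transitively
  "org.lz4" % "lz4-java" % "1.4.0"        // provides LZ4BlockInputStream(InputStream, boolean)
)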
I am trying to connect to the Hazelcast Management Center programmatically from Java, but I am getting a "Failed to send response" warning:
INFO: [192.168.203.1]:5701 [dev] [3.7] Hazelcast will connect to Hazelcast
Management Center on address:
http://localhost:8080/mancenter
Jan 03, 2018 5:27:56 PM
com.hazelcast.internal.partition.impl.PartitionStateManager
INFO: [192.168.203.1]:5701 [dev] [3.7] Initializing cluster partition table
arrangement...
Jan 03, 2018 5:27:59 PM
com.hazelcast.internal.management.ManagementCenterService
WARNING: [192.168.203.1]:5701 [dev] [3.7] Failed to send response,
responseCode:500 url:http://localhost:8080/mancenter/collector.do
This is how I connect from my Java program:
public class HazelcastMember {

    public static void main(String[] args) throws Exception {
        //System.setProperty("hazelcast.local.localAddress", "127.0.0.1");
        //ConsoleApp.main(args);
        Config cfg = new Config();
        cfg.getManagementCenterConfig().setEnabled(true).setUrl("http://localhost:8080/mancenter");
        HazelcastInstance instance = Hazelcast.newHazelcastInstance(cfg);
    }
}
I am using …
I am trying to force Spark to use ShuffleHashJoin by disabling BroadcastHashJoin and SortMergeJoin, but Spark always uses SortMergeJoin.
I am using Spark version 2.4.3.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object ShuffleHashJoin {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder()
      .appName("ShuffleHashJoin")
      .master("local[*]")
      .getOrCreate()

    /*
     * Disable auto broadcasting of table and SortMergeJoin
     */
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
    spark.conf.set("spark.sql.join.preferSortMergeJoin", false)

    import spark.implicits._
    val dataset = Seq(
      (0, "playing"),
      (1, "with"),
      (2, "ShuffledHashJoinExec")).toDF("id", "token")

    dataset.join(dataset, Seq("id"), "inner").foreach(_ => ())

    // infinite loop to keep the program running to check Spark UI at 4040 port.
    while (true) {}
  }
}
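One thing worth checking is the physical plan itself. In Spark 2.x the planner only picks ShuffledHashJoinExec when it estimates that the build side can be hashed per partition (a size check that reuses spark.sql.autoBroadcastJoinThreshold) and is considerably smaller (roughly 3x) than the other side; with the threshold set to 0 the first test can never pass, and in a self-join of identical data the second never holds either, so the query falls back to SortMergeJoin. A hedged sketch, assuming it runs inside the same main with spark.implicits._ in scope (the threshold of 1 and the 100000-row table are illustrative values, not recommendations):

// See which join the planner actually chose for the original query.
dataset.join(dataset, Seq("id"), "inner").explain()

// A non-zero threshold that is still far too small to broadcast, plus two sides of
// very different estimated sizes, gives ShuffledHashJoin a chance to be selected.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)
val small = Seq((0, "playing"), (1, "with")).toDF("id", "token")
val large = spark.range(0, 100000).toDF("id")
large.join(small, Seq("id"), "inner").explain()   // look for ShuffledHashJoin in the printed plan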