Posts by vin*_*dev

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

I have a Scala Spark Streaming application that receives data from 3 different Kafka producers.

The Spark Streaming application runs on the machine with host 0.0.0.179, the Kafka server runs on the machine with host 0.0.0.178, and the Kafka producers run on the machines 0.0.0.180, 0.0.0.181 and 0.0.0.182.

When I try to run the Spark Streaming application, I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 19, localhost): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1625)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1204)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    …
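This exception is thrown when a cached KafkaConsumer is touched by more than one thread at a time, which can happen in the spark-streaming-kafka-0-10 connector when concurrent tasks on the same executor reach the same cached consumer. A commonly cited workaround is to disable Spark's consumer cache. The sketch below assumes that approach; the group id, topic name, ports and batch interval are placeholders I chose, not values from the question.

```scala
// Sketch only: disable the Kafka consumer cache so each task gets its own
// consumer instead of sharing a cached one across threads.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer

val conf = new SparkConf()
  .setAppName("kafka-streaming")
  // the cached consumer must only ever be used by one thread;
  // turning the cache off avoids the shared-consumer path entirely
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")

val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "0.0.0.178:9092",          // Kafka broker host from the question
  "key.deserializer"  -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"          -> "streaming-group"           // placeholder group id
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("topic"), kafkaParams)
)
```

Disabling the cache trades some per-batch consumer-creation overhead for thread safety; it requires no change to the streaming logic itself.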

multithreading scala apache-kafka apache-spark spark-streaming

2 votes
1 answer
8685 views

concat_ws removes empty strings from the output of a Spark dataframe

This is the output of my dataframe:

finaldf.show(false)

+------------------+-------------------------+---------------------+---------------+-------------------------+--------------+----------+----------+---------+-------------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
|DataPartition     |TimeStamp                |Source_organizationId|Source_sourceId|FilingDateTime           |SourceTypeCode|DocumentId|Dcn       |DocFormat|StatementDate            |IsFilingDateTimeEstimated|ContainsPreliminaryData|CapitalChangeAdjustmentDate|CumulativeAdjustmentFactor|ContainsRestatement|FilingDateTimeUTCOffset|ThirdPartySourceCode|ThirdPartySourcePriority|SourceTypeId|ThirdPartySourceCodeId|FFAction|!||
+------------------+-------------------------+---------------------+---------------+-------------------------+--------------+----------+----------+---------+-------------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
|SelfSourcedPrivate|2017-11-02T10:23:59+00:00|4298009288           |80             |2017-09-28T23:00:00+00:00|10K           |null      |171105584 |ASFILED  |2017-07-31T00:00:00+00:00|false                    |false                  |2017-07-31T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011835     |1000716240            |I|!|       |
|SelfSourcedPublic |2017-11-21T12:09:23+00:00|4295904170           |364            |2017-08-08T17:00:00+00:00|10Q           |null      |null      |null     |2017-07-30T00:00:00+00:00|false                    |false                  |2017-07-30T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011836     |1000716240            |I|!|       |
|SelfSourcedPublic |2017-11-21T12:09:23+00:00|4295904170           |365            |2017-10-10T17:00:00+00:00|10K           |null      |null      |null     |2017-09-30T00:00:00+00:00|false                    |false                  |2017-09-30T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011835     |1000716240            |I|!|       |
|SelfSourcedPublic |2017-11-21T12:17:49+00:00|4295904170           |365            |2017-10-10T17:00:00+00:00|10K           |null      |null      |null     |2017-09-30T00:00:00+00:00|false                    |false                  |2017-09-30T00:00:00+00:00  |1.0                       |false              |-300 …
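For context, `concat_ws` silently skips null columns rather than emitting an empty slot, which is the usual cause of "missing" fields in the concatenated output. A sketch of one workaround, assuming the goal is to keep a separator position for every column: wrap each column in `coalesce(col, lit(""))` before concatenating. The `"|"` separator and the `concatenated` output name are placeholders, not values from the question.

```scala
// Sketch: keep null columns as empty strings so concat_ws preserves
// every field position instead of dropping nulls.
import org.apache.spark.sql.functions.{coalesce, concat_ws, lit}

val withAllColumns = finaldf.select(
  concat_ws("|", finaldf.columns.map(c => coalesce(finaldf(c), lit(""))): _*)
    .as("concatenated")
)
```

Note that `coalesce` relies on Spark's type coercion to reconcile non-string columns with `lit("")`; casting each column to string first is a safer variant if the schema is mixed.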

scala apache-spark spark-dataframe

2 votes
1 answer
3312 views

Scala - how to set a default value for Option[Int]

I want to extend Option[Int] so that it returns 0 if it is None, and the wrapped value if it is Some. How can I make the code below work?

Some(10).default // returns 10
Option.empty[Int].default // returns 0
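One idiomatic way to get this behaviour is an extension method via an implicit class; `getOrElse(0)` already expresses the fallback, so `default` just delegates to it. This is a sketch of one possible implementation (the class name `OptionIntOps` is mine), not the only answer.

```scala
// Adds a `default` method to Option[Int]: 0 for None, the wrapped value for Some.
implicit class OptionIntOps(val o: Option[Int]) {
  def default: Int = o.getOrElse(0)
}

Some(10).default          // 10
Option.empty[Int].default // 0
```

For a one-off use, calling `getOrElse(0)` directly is simpler; the implicit class only pays off when `default` is used in many places.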

functional-programming scala

2 votes
1 answer
172 views

Issue due to "." in Spark column names

This is my existing dataframe:

+------------------+-------------------------+-----------+---------------+-------------------------+---------------------------+------------------------+--------------------------+---------------+-----------+----------------+-----------------+----------------------+--------------------------+-----------+--------------------+-----------+--------------------------------------------------------------------------------------------+-----------------------+------------------+-----------------------------+-----------------------+----------------------------------+
|DataPartition     |TimeStamp                |_lineItemId|_organizationId|fl:FinancialConceptGlobal|fl:FinancialConceptGlobalId|fl:FinancialConceptLocal|fl:FinancialConceptLocalId|fl:InstrumentId|fl:IsCredit|fl:IsDimensional|fl:IsRangeAllowed|fl:IsSegmentedByOrigin|fl:SegmentGroupDescription|fl:Segments|fl:StatementTypeCode|FFAction|!||LineItemName                                                                                |LineItemName.languageId|LocalLanguageLabel|LocalLanguageLabel.languageId|SegmentChildDescription|SegmentChildDescription.languageId|
+------------------+-------------------------+-----------+---------------+-------------------------+---------------------------+------------------------+--------------------------+---------------+-----------+----------------+-----------------+----------------------+--------------------------+-----------+--------------------+-----------+--------------------------------------------------------------------------------------------+-----------------------+------------------+-----------------------------+-----------------------+----------------------------------+
|SelfSourcedPrivate|2017-11-02T10:23:59+00:00|3          |4298009288     |XTOT                     |3016350                    |null                    |null                      |null           |true       |false           |false            |false                 |null                      |null       |BAL                 |I|!|       |Total Assets                                                                                |505074                 |null              |null                         |null                   |null                              |

This is the schema of the above dataframe:

root
 |-- DataPartition: string (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- _lineItemId: long (nullable = true)
 |-- _organizationId: long (nullable = true)
 |-- fl:FinancialConceptGlobal: string (nullable = true)
 |-- fl:FinancialConceptGlobalId: long (nullable = true)
 |-- fl:FinancialConceptLocal: string (nullable = true)
 |-- fl:FinancialConceptLocalId: long (nullable = true) …
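The underlying issue: Spark's column resolver parses an unquoted "." as struct-field access, so a name like `LineItemName.languageId` is read as field `languageId` of a column `LineItemName`. A sketch of two standard ways around this, where `df` stands for the dataframe in the question: escape the name in backticks, or rename the column to avoid dots entirely.

```scala
// Sketch: handling a column whose name contains "." (LineItemName.languageId
// appears in the question's schema; df is a placeholder for that dataframe).

// Option 1: quote the full name in backticks so "." is not parsed as struct access
val selected = df.select(df.col("`LineItemName.languageId`"))

// Option 2: rename the column once, then use it normally afterwards
val renamed = df.withColumnRenamed("LineItemName.languageId", "LineItemName_languageId")
```

Renaming is usually the more robust choice when the column is referenced in many later expressions, since every reference would otherwise need backticks.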

scala apache-spark spark-dataframe

0 votes
1 answer
8685 views

Finding the length of strings in Scala

I am new to Scala, and I have a list of strings -
List[String]("alpha", "gamma", "omega", "zeta", "beta")

I want to count all the strings with length = 4,

i.e. I want to get the output = 2.
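This reduces to a single `count` with a length predicate; the `words` and `count4` names below are mine, not from the question.

```scala
// Count the strings whose length is exactly 4.
val words = List("alpha", "gamma", "omega", "zeta", "beta")
val count4 = words.count(_.length == 4)
// count4 == 2 ("zeta" and "beta")
```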

scala string-length

0 votes
2 answers
4562 views

How to merge three DataFrames in Scala

How can I merge 3 DataFrames in Spark-Scala? I have no idea how to do this. I could not find a similar example on StackOverflow.

I have 3 similar DataFrames. The column names and their number are the same; only the row values differ.

DataFrame1:

+----+------+----+---+
|type| Model|Name|ID |
+----+------+----+---+
|  1 |wdasd |xyzd|111|
|  1 |wd    |zdfd|112|
|  1 |bdp   |2gfs|113|
+----+------+----+---+

DataFrame2:

+----+------+----+---+
|type| Model|Name|ID |
+----+------+----+---+
|  2 |wdasd |xyzd|221|
|  2 |wd    |zdfd|222|
|  2 |bdp   |2gfs|223|
+----+------+----+---+

DataFrame3:

+----+------+----+---+
|type| Model|Name|ID |
+----+------+----+---+
|  3 |AAAA  |N_AM|331|
|  3 |BBBB  |NA_M|332|
|  3 |CCCC  |MA_N|333|
+----+------+----+---+

I want a DataFrame of this type:

MergeDataFrame:

+----+------+----+---+
|type| Model|Name|ID |
+----+------+----+---+
|  1 |wdasd |xyzd|111|
|  1 |wd    |zdfd|112|
|  1 |bdp   |2gfs|113|
|  2 …
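Since the three schemas are identical, this is a row-wise union rather than a join. A minimal sketch, where `df1`, `df2` and `df3` stand for DataFrame1-3 from the question:

```scala
// Sketch: stack three DataFrames with identical schemas on top of each other.
val merged = df1.union(df2).union(df3)
// union keeps duplicate rows; append .distinct() if duplicates must be removed
```

`union` matches columns by position, so it silently misaligns data if the column order ever differs between the inputs; `unionByName` (Spark 2.3+) matches by column name instead and is the safer choice when order is not guaranteed.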

merge scala dataframe apache-spark

-1 votes
1 answer
4528 views

Rotation of an array means shifting every element one index to the right, with the last element of the array moving to the first position

For example, the rotation of array A = [3, 8, 9, 7, 6] is [6, 3, 8, 9, 7]. The goal is to rotate array A K times; that is, every element of A is shifted K indexes to the right.

For example, given array A = [3, 8, 9, 7, 6] and K = 3, the function should return [9, 7, 6, 3, 8].

I want this in Java. I have tried this:

public static int[] rotation(int[] a, int k) {
    int[] newArray = new int[a.length];
    for (int i = 0; i < a.length; i++) {
        // each element moves k positions to the right, wrapping around the end
        int newPosition = (i + k) % a.length;
        newArray[newPosition] = a[i];
    }
    return newArray;
}
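The method as written already produces the expected result for non-negative K, including K larger than the array length, since `(i + k) % a.length` wraps the index. A self-contained check against the question's example (the class name `RotationDemo` is mine):

```java
import java.util.Arrays;

// Quick check of the rotation method against A = [3, 8, 9, 7, 6], K = 3.
public class RotationDemo {
    public static int[] rotation(int[] a, int k) {
        int[] newArray = new int[a.length];
        for (int i = 0; i < a.length; i++) {
            // (i + k) % length wraps indexes past the end back to the front
            newArray[(i + k) % a.length] = a[i];
        }
        return newArray;
    }

    public static void main(String[] args) {
        int[] rotated = rotation(new int[]{3, 8, 9, 7, 6}, 3);
        System.out.println(Arrays.toString(rotated)); // [9, 7, 6, 3, 8]
    }
}
```

Two edge cases worth noting: an empty array would divide by zero in the modulo, and a negative K would produce a negative index; both need explicit guards if such inputs are possible.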

java arrays

-2 votes
1 answer
5363 views

I can't understand why "javac: file not found: HelloWorld.java"

javac HelloWorld.java

Whenever I run this in cmd, the following error message keeps appearing:

javac: file not found: HelloWorld.java

However, when I add my file's location,

javac D:\User\Documents\Project\HelloWorld.java

it runs successfully. Why?

I have tried adding D:\User\Documents\Project to the PATH variable,

but it didn't work.
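The explanation: PATH is only consulted when the shell looks up an executable (such as javac itself); file arguments like `HelloWorld.java` are resolved relative to the current working directory, never via PATH. So the fix is to change into the source directory before compiling, sketched here for the Windows cmd shell from the question:

```shell
# PATH tells cmd where to find programs, not where javac finds source files.
# Change into the directory that holds the source file, then compile:
cd /d D:\User\Documents\Project
javac HelloWorld.java
```

Passing the full path, as in the question, works for the same reason: it removes the dependence on the current directory entirely.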

java

-2 votes
1 answer
1932 views