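I have a Scala Spark Streaming application that receives data from 3 different Kafka producers.
The Spark Streaming application runs on the machine with host 0.0.0.179, the Kafka server runs on the machine with host 0.0.0.178, and the Kafka producers run on the machines 0.0.0.180, 0.0.0.181, and 0.0.0.182.
When I try to run the Spark Streaming application, I get the following error: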
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 19, localhost): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1625)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1204)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85) …
multithreading scala apache-kafka apache-spark spark-streaming

This is the output of my DataFrame:
finaldf.show(false)
+------------------+-------------------------+---------------------+---------------+-------------------------+--------------+----------+----------+---------+-------------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
|DataPartition |TimeStamp |Source_organizationId|Source_sourceId|FilingDateTime |SourceTypeCode|DocumentId|Dcn |DocFormat|StatementDate |IsFilingDateTimeEstimated|ContainsPreliminaryData|CapitalChangeAdjustmentDate|CumulativeAdjustmentFactor|ContainsRestatement|FilingDateTimeUTCOffset|ThirdPartySourceCode|ThirdPartySourcePriority|SourceTypeId|ThirdPartySourceCodeId|FFAction|!||
+------------------+-------------------------+---------------------+---------------+-------------------------+--------------+----------+----------+---------+-------------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
|SelfSourcedPrivate|2017-11-02T10:23:59+00:00|4298009288 |80 |2017-09-28T23:00:00+00:00|10K |null |171105584 |ASFILED |2017-07-31T00:00:00+00:00|false |false |2017-07-31T00:00:00+00:00 |1.0 |false |-300 |SS |1 |3011835 |1000716240 |I|!| |
|SelfSourcedPublic |2017-11-21T12:09:23+00:00|4295904170 |364 |2017-08-08T17:00:00+00:00|10Q |null |null |null |2017-07-30T00:00:00+00:00|false |false |2017-07-30T00:00:00+00:00 |1.0 |false |-300 |SS |1 |3011836 |1000716240 |I|!| |
|SelfSourcedPublic |2017-11-21T12:09:23+00:00|4295904170 |365 |2017-10-10T17:00:00+00:00|10K |null |null |null |2017-09-30T00:00:00+00:00|false |false |2017-09-30T00:00:00+00:00 |1.0 |false |-300 |SS |1 |3011835 |1000716240 |I|!| |
|SelfSourcedPublic |2017-11-21T12:17:49+00:00|4295904170 |365 |2017-10-10T17:00:00+00:00|10K |null |null |null |2017-09-30T00:00:00+00:00|false |false |2017-09-30T00:00:00+00:00 |1.0 |false |-300 …

I want to extend the Option[Int] class so that it returns 0 if it is None, and the value inside Some if it is not None. How should I make the code below work?
Some(10).default // returns 10
Option.empty[Int].default // returns 0
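One way the two calls above might be made to compile is with an implicit class that adds a `default` method to `Option[Int]`. This is only a sketch, assuming Scala 2.10 or later; the names `OptionSyntax`, `OptionIntOps`, and `OptionDefaultDemo` are hypothetical and not part of the question.

```scala
// Hypothetical holder object: in Scala 2 an implicit class cannot be top-level.
object OptionSyntax {
  // Adds `.default` to any Option[Int]; a value class avoids a wrapper allocation.
  implicit class OptionIntOps(private val opt: Option[Int]) extends AnyVal {
    // Returns the wrapped value for Some, and 0 for None.
    def default: Int = opt.getOrElse(0)
  }
}

object OptionDefaultDemo {
  import OptionSyntax._

  def main(args: Array[String]): Unit = {
    println(Some(10).default)          // prints 10
    println(Option.empty[Int].default) // prints 0
  }
}
```

If no new method name is needed, the standard library call `opt.getOrElse(0)` gives the same result without any implicit.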

This is my existing DataFrame:
+------------------+-------------------------+-----------+---------------+-------------------------+---------------------------+------------------------+--------------------------+---------------+-----------+----------------+-----------------+----------------------+--------------------------+-----------+--------------------+-----------+--------------------------------------------------------------------------------------------+-----------------------+------------------+-----------------------------+-----------------------+----------------------------------+
|DataPartition |TimeStamp |_lineItemId|_organizationId|fl:FinancialConceptGlobal|fl:FinancialConceptGlobalId|fl:FinancialConceptLocal|fl:FinancialConceptLocalId|fl:InstrumentId|fl:IsCredit|fl:IsDimensional|fl:IsRangeAllowed|fl:IsSegmentedByOrigin|fl:SegmentGroupDescription|fl:Segments|fl:StatementTypeCode|FFAction|!||LineItemName |LineItemName.languageId|LocalLanguageLabel|LocalLanguageLabel.languageId|SegmentChildDescription|SegmentChildDescription.languageId|
+------------------+-------------------------+-----------+---------------+-------------------------+---------------------------+------------------------+--------------------------+---------------+-----------+----------------+-----------------+----------------------+--------------------------+-----------+--------------------+-----------+--------------------------------------------------------------------------------------------+-----------------------+------------------+-----------------------------+-----------------------+----------------------------------+
|SelfSourcedPrivate|2017-11-02T10:23:59+00:00|3 |4298009288 |XTOT |3016350 |null |null |null |true |false |false |false |null |null |BAL |I|!| |Total Assets |505074 |null |null |null |null |
This is the schema of the above DataFrame:
root
|-- DataPartition: string (nullable = true)
|-- TimeStamp: string (nullable = true)
|-- _lineItemId: long (nullable = true)
|-- _organizationId: long (nullable = true)
|-- fl:FinancialConceptGlobal: string (nullable = true)
|-- fl:FinancialConceptGlobalId: long (nullable = true)
|-- fl:FinancialConceptLocal: string (nullable = true)
|-- fl:FinancialConceptLocalId: long (nullable = true) …

I am new to Scala, and I have a list of strings:
List[String]("alpha", "gamma", "omega", "zeta", "beta")
I want to count all the strings with length = 4,
i.e. I want to get output = 2.
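A minimal sketch of one possible approach, using the standard `count` method on `List`, which takes a predicate and returns how many elements satisfy it. The names `CountByLength` and `countWithLength` are hypothetical, chosen only for this illustration.

```scala
object CountByLength {
  // Counts the strings in `words` whose length is exactly `n`.
  def countWithLength(words: List[String], n: Int): Int =
    words.count(_.length == n)

  def main(args: Array[String]): Unit = {
    val words = List("alpha", "gamma", "omega", "zeta", "beta")
    // Only "zeta" and "beta" have four characters.
    println(countWithLength(words, 4)) // prints 2
  }
}
```

`count` walks the list once and builds no intermediate collection, which is why it may be preferable to `words.filter(_.length == 4).size`.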
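
How can I merge 3 DataFrames in Spark-Scala? I have no idea how to do this, and I could not find a similar example on Stack Overflow.
I have 3 similar DataFrames. The column names and the number of columns are the same; the only difference is the row values.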
+----+------+----+---+
|type| Model|Name|ID |
+----+------+----+---+
| 1 |wdasd |xyzd|111|
| 1 |wd |zdfd|112|
| 1 |bdp |2gfs|113|
+----+------+----+---+
+----+------+----+---+
|type| Model|Name|ID |
+----+------+----+---+
| 2 |wdasd |xyzd|221|
| 2 |wd |zdfd|222|
| 2 |bdp |2gfs|223|
+----+------+----+---+
+----+------+----+---+
|type| Model|Name|ID |
+----+------+----+---+
| 3 |AAAA |N_AM|331|
| 3 |BBBB |NA_M|332|
| 3 |CCCC |MA_N|333|
+----+------+----+---+
I want this type of DataFrame:
+----+------+----+---+
|type| Model|Name|ID |
+----+------+----+---+
| 1 |wdasd |xyzd|111|
| 1 |wd |zdfd|112|
| 1 |bdp |2gfs|113|
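| 2 …

For example, the rotation of array A = [3, 8, 9, 7, 6] is [6, 3, 8, 9, 7]. The goal is to rotate array A K times; that is, each element of A will be shifted to the right by K indexes.
For example, given array A = [3, 8, 9, 7, 6] and K = 3, the function should return [9, 7, 6, 3, 8].
I want this in Java. I tried this: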
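// Returns a new array with every element of a shifted right by k positions.
// Assumes k >= 0; a negative k would produce a negative index.
public static int[] rotation(int[] a, int k) {
    int[] newArray = new int[a.length];
    for (int i = 0; i < a.length; i++) {
        // The modulo wraps positions past the end back to the front.
        int newPosition = (i + k) % a.length;
        newArray[newPosition] = a[i];
    }
    return newArray;
}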

javac HelloWorld.java
Whenever I run this in cmd, the following error message keeps appearing:
javac: file not found: HelloWorld.java
However, when I add my file location:
javac D:\User\Documents\Project\HelloWorld.java
it runs successfully. Why?
I tried changing the value of the PATH variable to D:\User\Documents\Project,
but it did not work.