小编Ama*_*ana的帖子

Kafkaconsumer对多线程访问不安全

我使用下面的代码来读取Kafka主题,并处理数据.

JavaDStream<Row> transformedMessages = messages.flatMap(record -> processData(record))
                .transform(new Function<JavaRDD<Row>, JavaRDD<Row>>() {
                    //JavaRDD<Row> records = ss.emptyDataFrame().toJavaRDD();
                    StructType schema = DataTypes.createStructType(fields);

                    public JavaRDD<Row> call(JavaRDD<Row> rdd) throws Exception {
                        records = rdd.union(records);
                        return rdd;
                    }
        });

       transformedMessages.foreachRDD(record -> {
            //System.out.println("Aman" +record.count());
            StructType schema = DataTypes.createStructType(fields);

            Dataset ds = ss.createDataFrame(records, schema);
            ds.createOrReplaceTempView("trades");
            System.out.println(ds.count());
            ds.show();

        });
Run Code Online (Sandbox Code Playgroud)

运行代码时,我遇到异常:

Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1624)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1197)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) …
Run Code Online (Sandbox Code Playgroud)

spark-streaming

11
推荐指数
2
解决办法
9538
查看次数

Spark SQL 查询中的 Union 删除数据集中的重复项

我正在使用 Apache Spark 的 Java API,并且我有两个数据集 A 和 B。这两个数据集的架构是相同的:电话号码、姓名、年龄、地址

两个数据集中都有一条记录具有相同的 PhoneNumber,但该记录中的其他列不同

我对这两个数据集运行以下 SQL 查询(通过将它们注册为临时表):

A.createOrReplaceTempView("A");
B.createOrReplaceTempView("B");

String query = "Select * from A UNION Select * from B";

Dataset<Row> result = sparkSession.sql(query);
result.show();
Run Code Online (Sandbox Code Playgroud)

令人惊讶的是,结果只有一条具有相同 PhoneNumber 的记录,另一条记录被删除了。

我知道 UNION 是 SQL 查询,旨在删除重复项,但它还需要知道主键,并据此决定重复项。

此查询如何推断我的数据集的“主键”?(Spark中没有主键的概念)

apache-spark apache-spark-sql

5
推荐指数
1
解决办法
6699
查看次数

如果在Spark SQL中不存在其他更新,则进行插入

在Spark SQL中是否有做“如果不存在则进行其他更新的插入”的规定。

我有一些记录的Spark SQL表“ ABC”。然后我有另一批记录,我想根据它们是否存在于此表中在此表中插入/更新。

我可以在SQL查询中使用SQL命令来实现这一点吗?

apache-spark apache-spark-sql

3
推荐指数
1
解决办法
1733
查看次数