小编Kar*_*ikJ的帖子

Spark Structured Streaming with Kafka - 如何重新分区数据并在工作节点之间分配处理

如果我的 Kafka 主题收到类似的记录

CHANNEL | VIEWERS | .....
ABC     |  100    | .....
CBS     |  200    | .....
Run Code Online (Sandbox Code Playgroud)

我有 Spark 结构化流代码来读取和处理 Kafka 记录,如下所示:

val spark = SparkSession 
      .builder 
      .appName("TestPartition") 
      .master("local[*]") 
      .getOrCreate() 

    import spark.implicits._ 

    val dataFrame = spark 
      .readStream 
      .format("kafka") 
      .option("kafka.bootstrap.servers", 
      "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092") 
      .option("subscribe", "partition_test") 
      .option("failOnDataLoss", "false") 
      .load() 
      .selectExpr("CAST(value AS STRING)") 
      // I will use a custom UDF to transform to a specific object
Run Code Online (Sandbox Code Playgroud)

目前,我使用 foreachwriter 处理记录如下:

val writer = new ForeachWriter[testRec] {
    def open(partitionId: Long, version: Long): Boolean = {
      true …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-spark spark-structured-streaming spark-kafka-integration

5
推荐指数
1
解决办法
904
查看次数

GROUP_CONCAT Hibernate HQL

基本上我正在尝试在 Hibernate 中编写以下查询。请帮助我做同样的事情。

SELECT collaboratoruser,GROUP_CONCAT(collaboratorrole SEPARATOR ',') FROM tbl_conceptcollections_collaborator WHERE collectionid = incollectionid GROUP BY collaboratoruser;

我无法使用 SQL 查询,我只想使用 HQL。任何帮助表示赞赏。

hibernate group-by hql

3
推荐指数
1
解决办法
7549
查看次数

运行tomcat的Docker容器 - 无法使用主机IP地址访问服务器

我正在尝试从docker文件构建一个运行tomcat的docker容器.请在下面找到Dockerfile内容:

FROM ubuntu:trusty
MAINTAINER karthik.jayaraman
VOLUME ["/tomcat/files"]
ADD /files/tar/apache-tomcat-7.0.47.tar.gz /usr/local/tomcat
ADD /files/scripts/. /tmp/tomcat_temp
RUN ls /tmp/tomcat_temp
RUN cp  -a /tmp/tomcat_temp/. /etc/init.d
RUN chmod 755 /etc/init.d/tomcat
RUN chkconfig --add tomcat && chkconfig --level 234 tomcat on
ADD /files/config   /usr/local/tomcat/apache-tomcat-7.0.47/conf/
ADD /files/lib  /usr/local/tomcat/apache-tomcat-7.0.47/lib/
ENV CATALINA_HOME /usr/local/tomcat/apache-tomcat-7.0.47
ENV PATH $PATH:$CATALINA_HOME/bin
EXPOSE 8080
CMD ["service","tomcat","start"]
Run Code Online (Sandbox Code Playgroud)

当我创建映像并在容器中运行bash时,使用命令"Service tomcat start"启动服务器.我检查了catalina.out文件并确保其运行.但是当我尝试安装docker的主机IP并使用端口号8080访问端口时,我可以连接到tomcat页面.但是当我指定容器的内部IP地址 - 172.24.0.7:8080时,我可以查看tomcat页面.我猜端口转发不正确.有人能告诉我我在这里犯的错误.

tomcat docker

3
推荐指数
1
解决办法
2万
查看次数

ReactJS ES6函数绑定 - 未捕获TypeError:无法读取未定义的属性'update'

我正在使用父子模式下的多个组件的ReactJS.在以下示例中,更新函数从Board组件(父组件)传递到Note组件(子组件).当我执行代码时,我得到Uncaught TypeError: Cannot read property 'update' of undefined错误,我在构造函数中有一个绑定(this).

import React, { Component } from 'react';
import Note from './note';

export default class Board extends Component {

  constructor(props) {
    super(props);
    this.state = {
       notes: [
        'Call Bill',
        'Email list',
        'Fix evernote tags'
       ]
    };
    this.update = this.update.bind(this);
  }

 update(newText,i){
    alert('update triggered');
    var arr = this.state.notes;
    arr[i]=newText;
    this.setState({notes:arr});
  };

  eachNote(note,i){
    return(
      <Note key={i} index={i} onNoteChange={this.update}>{note}</Note>
    );
  };

  render() {
    return (
      <div className="board">
        { this.state.notes.map(this.eachNote) }
      </div>
    );
  }
} …
Run Code Online (Sandbox Code Playgroud)

javascript reactjs

3
推荐指数
1
解决办法
2610
查看次数

kafka.server.KafkaServerStartable - java.lang.OutOfMemoryError:Java 堆空间

我正在尝试使用以下代码启动 Kafka 服务器:

public class MockKafkaServer {

    private static final String LOCALHOST = "127.0.0" + ".1";
    private static final int CONSUMER_TIMEOUT_MS = 5000;
    private static final int CONSUMER_BUFFER_SIZE = 64 * 1024;
    private static final int PRODUCER_SLEEP_INTERVAL = 100;

    private final KafkaServerStartable broker;
    private final MockZooKeeper mockZooKeeper;

    private KafkaProducer<byte[], byte[]> kafkaProducer;
    private SimpleConsumer simpleConsumer;

    private final int port;


    public MockKafkaServer() throws IOException, InterruptedException {

        this.mockZooKeeper = new MockZooKeeper();
        final int zkPort = mockZooKeeper.start();

        this.port = getAvailablePort();

        final File logDirectory = …
Run Code Online (Sandbox Code Playgroud)

java out-of-memory maven apache-kafka

3
推荐指数
1
解决办法
1295
查看次数

Spark结构化流中的Parquet数据和分区问题

我正在使用 Spark 结构化流;我的 DataFrame 具有以下架构

root 
 |-- data: struct (nullable = true) 
 |    |-- zoneId: string (nullable = true) 
 |    |-- deviceId: string (nullable = true) 
 |    |-- timeSinceLast: long (nullable = true) 
 |-- date: date (nullable = true) 
Run Code Online (Sandbox Code Playgroud)

如何使用 Parquet 格式执行 writeStream 并写入数据(包含 zoneId、deviceId、timeSinceLast;除日期之外的所有内容)并按日期分区数据?我尝试了以下代码,但分区子句不起作用

root 
 |-- data: struct (nullable = true) 
 |    |-- zoneId: string (nullable = true) 
 |    |-- deviceId: string (nullable = true) 
 |    |-- timeSinceLast: long (nullable = true) 
 |-- date: date (nullable = true) 
Run Code Online (Sandbox Code Playgroud)

apache-spark parquet spark-structured-streaming

2
推荐指数
1
解决办法
7344
查看次数