小编Har*_*lar的帖子

Flink 计划中的 Hash、Forward 等术语是什么意思?

这是我部署作业时出现在仪表板上的 Flink 计划的图像。如您所见,运算符之间的连接被标记为FORWARD/HASH等。它们指的是什么?什么时候叫a HASH,什么时候叫a FORWARD

在此处输入图片说明

apache-flink flink-streaming

4
推荐指数
2
解决办法
985
查看次数

当函数在Flask的后台运行时,如何向页面添加加载gif?

我编写了一个显示带有按钮的页面的Flask应用程序,当用户点击该按钮时,调用Python函数进行一些处理(通常需要大约2分钟).此处理结束后,将打开一个显示结果的新页面.现在我想要包含一个'loading gif',指示用户正在进行处理.我该怎么做呢?这是我的代码看起来像的一个例子.

app.py

from flask import Flask, render_template
 import time

 app = Flask(__name__)

 @app.route('/')
 def index():
     return render_template('index.html')

 @app.route('/result', methods=['POST'])
 def result():
     time.sleep(5) # indicates the time delay caused due to processing
     return render_template('result.html')

 if __name__ == '__main__':
     app.run(debug=True)
Run Code Online (Sandbox Code Playgroud)

的index.html

<html>
    <body>
        <form method=post action="/result">
            <input type=submit value='Start Processing' name='submit_btn'>
        </form>
    </body>
</html>
Run Code Online (Sandbox Code Playgroud)

result.html

<html>
    <body>
        <h1> Processing Done </h1>
    </body>
</html>
Run Code Online (Sandbox Code Playgroud)

html javascript python flask

1
推荐指数
1
解决办法
4183
查看次数

KeyBy不会为不同的键创建不同的键流

我在读一个简单的JSON字符串作为输入和键控基于两个领域流AB。但KeyBy产生了对不同值的相同键合流B,但为的特定组合AB

输入:

{
    "A": "352580084349898",
    "B": "1546559127",
    "C": "A"
}
Run Code Online (Sandbox Code Playgroud)

这是我的Flink代码的核心逻辑:

DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
            .map(new MapFunction<String, GenericDataObject>() {
                @Override
                public GenericDataObject map(String s) throws Exception {
                    JSONObject jsonObject = new JSONObject(s);
                    GenericDataObject genericDataObject = new GenericDataObject();
                    genericDataObject.setA(jsonObject.getString("A"));
                    genericDataObject.setB(jsonObject.getString("B"));
                    genericDataObject.setC(jsonObject.getString("C"));
                    return genericDataObject;
                }
            });
DataStream<GenericDataObject> testStream = genericDataObjectDataStream
            .keyBy("A", "B")
            .map(new MapFunction<GenericDataObject, GenericDataObject>() {
                @Override
                public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
                    return genericDataObject;
                }
            });
testStream.print();
Run Code Online (Sandbox Code Playgroud)

GenericDataObject是一个POJO,具有三个字段A, …

apache-flink flink-streaming

0
推荐指数
1
解决办法
329
查看次数

随着请求数量的增加,Go Web服务器的性能急剧下降

我正在基准测试使用Go语言编写的简单Web服务器wrk。服务器正在具有4GB RAM的计算机上运行。在测试开始时,该代码每秒可处理2000个请求,因此性能非常好。但是随着时间的流逝,该进程使用的内存会逐渐增加,一旦达到85%(我正在使用进行检查top),吞吐量就会下降到约100个请求/秒。重新启动服务器后,吞吐量再次增加到最佳数量。

是由于内存问题导致性能下降吗?Go为什么不释放此内存?我的Go服务器如下所示:

func main() {
    defer func() {
        // Wait for all messages to drain out before closing the producer
        p.Flush(1000)
        p.Close()
    }()

    http.HandleFunc("/endpoint", handler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}
Run Code Online (Sandbox Code Playgroud)

在处理程序中,我将传入的Protobuf消息转换为Json并使用融合的Kafka Go库将其写入Kafka。

var p, err = kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "abc-0.com:6667,abc-1.com:6667",
    "message.timeout.ms": "30000",
    "sasl.kerberos.keytab": "/opt/certs/TEST.KEYTAB",
    "sasl.kerberos.principal": "TEST@TEST.ABC.COM",
    "sasl.kerberos.service.name": "kafka",
    "security.protocol": "SASL_PLAINTEXT",
})

var topic = "test"

func handler(w http.ResponseWriter, r *http.Request) {
    body, _ := ioutil.ReadAll(r.Body)

    // Deserialize byte[] to Protobuf message
    protoMessage := &tutorial.REALTIMEGPS{}
    _ := proto.Unmarshal(body, protoMessage) …
Run Code Online (Sandbox Code Playgroud)

webserver go confluent-kafka

-1
推荐指数
1
解决办法
79
查看次数