标签: eventsource

Flask:如何检测无限响应生成器中的断开连接?

flask中,我有一个用于EventSource接收更新/事件的页面。

它以相当简单的方式实现:

@route('/updates')
def updates():
    def gen():
        while True:
            update = make_update()
            yield "data: {0}\n\n".format(json.dumps(update))
return Response(stream_with_context(gen()), mimetype="text/event-stream")
Run Code Online (Sandbox Code Playgroud)

我遇到的问题是,每次我重新加载连接EventSource到“更新”页面的页面时,它都会创建一个新线程来满足该“更新”请求。而且它永远不会死

正在进行更新,因此这意味着它将数据推送到某个地方,从而使我的服务器使用越来越多的线程以及越来越多的内存。

我希望获得的简单解决方案是while True用某种形式的替换while is_connected()

但是我似乎找不到找到检测浏览器是否仍处于连接状态的方法。

问题:如何在发生器内部检查连接是否仍然有效?

编辑

浏览代码似乎应该调用close()generator,因此从理论上讲它应该GeneratorExit在my中的某个位置抛出gen()

但是,我看不到有任何发生的痕迹,并且每次调用时,我都会看到pstree在每次请求/连接后又产生一个条目/updates

python wsgi werkzeug flask eventsource

6
推荐指数
1
解决办法
237
查看次数

可以使用 Apache Kafka“无限保留策略”作为具有 CQRS 的事件源系统的基础吗?

我目前正在评估设计/实施事件溯源 + CQRS 系统设计架构方法的选项。由于我们想将 Apache Kafka 用于其他方面(正常的发布 -订阅消息 + 流处理),下一个合乎逻辑的问题是,“我们可以使用 Apache Kafka 存储作为 CQRS 的事件存储吗?”,或者更重要的是明智的决定?

现在我不确定这一点。这个来源似乎支持它:https : //www.confluent.io/blog/okay-store-data-apache-kafka/

这个其他来源建议反对:https : //medium.com/serialized-io/apache-kafka-is-not-for-event-sourcing-81735c3cf5c

在我目前的测试/实验中,我遇到了与第二个来源描述的问题类似的问题,这些问题是:

  1. 重构实体: Kafka 似乎不支持快速检索/搜索主题内的特定事件(例如:与订单历史相关的所有命令 - 重建实体实例所必需的,似乎需要扫描所有主题的事件并仅过滤那些与某些实体实例标识符匹配的事件,这是不行的)。[另一个人似乎得出了类似的结论:Query Kafka topic for specific record——也就是说,这是不可能的(不依赖一些hacky技巧)]
  2. - 写入一致性: Kafka 不支持其存储中的事务原子性,因此在将事件异步导出到 Kafka 队列之前,将具有某种锁定方法(通常是乐观锁定)的 DB 放入数据库似乎是一种常见的做法(我可以接受这个)不过,第一个问题对我来说更为重要)。
  3. 分区问题:在Kafka文档中,提到“顺序保证”,只存在于“主题的分区”中。同时他们也说分区是并行的基本单位,换句话说,如果你想并行化工作,把消息传播到分区(当然还有broker)。但这是一个问题,因为事件源系统中的“事件存储”需要订单保证,所以这意味着如果我绝对需要订单保证,我不得不在这个用例中只使用 1 个分区。这样对吗?

尽管这个问题有点开放,但实际上是这样的:您是否使用 Kafka 作为事件源系统上的主要事件存储?您如何处理从其命令历史记录中重组实体实例的问题(鉴于该主题有数百万个条目扫描所有集合不是一种选择)?您是否只使用了 1 个分区来牺牲潜在的并发消费者(假设订单保证仅限于特定主题分区)?

任何具体的或一般的反馈都将不胜感激,因为这是一个复杂的话题,有几个考虑因素。

提前致谢。

编辑 6 年前这里有一个类似的讨论: Using Kafka as a (CQRS) Eventstore。好主意? 当时的共识也存在分歧,很多人认为这种方法很方便,提到了 Kafka 如何在本地处理大量实时数据。尽管如此,问题(至少对我而言)与此无关,而是与 Kafka 重建实体状态的能力有多么不方便有关 - 要么通过将主题建模为实体实例(其中主题数量呈指数级增长是不希望的) ,或通过建模主题 es 实体类型(其中主题内的大量事件使重建非常缓慢/不切实际)。

cqrs event-sourcing apache-kafka eventsource apache-kafka-streams

6
推荐指数
1
解决办法
873
查看次数

服务器发送事件中的数据限制

我正在使用服务器发送的事件来对数据库执行查询。服务器实时流式传输统计数据stats和事件,当执行查询时,它会发送result带有数据的事件并关闭连接。

您可以使用以下curl命令对其进行测试:

curl 'http://rakam-production-webapp-203653584.us-east-1.aws.getrakam.com/query/execute?read_key=l2drg09t1j04poki16q46nsa7qvjdjhsedcml0e9m8sd87h6olkevq8b7m3m6948&data=%7B%22query%22%3A%22SELECT%20*%20FROM%20collection.%5C%22pull_request%5C%22%22%2C%22limit%22%3A1000%7D' -H 'Accept: text/event-stream'
Run Code Online (Sandbox Code Playgroud)

问题是数据超过几千字节时,EventSource 不会触发result事件。

http://jsbin.com/jiteca/edit?html,输出

如果数据不是那么大,它工作得很好:

http://jsbin.com/leginu/edit?html,输出

我尝试了 Chrome、Safari 和 Firefox,它们都遇到了这个问题。知道如何解决这个问题吗?这是服务器发送事件的限制吗,因为我在 W3 上没有找到有关服务器发送规范的任何内容?

html javascript server-sent-events eventsource

5
推荐指数
1
解决办法
1933
查看次数

服务器如何发送消息 SSE 在多个服务器实例环境中工作

我有一个关于如何使 SSE 在多个服务器环境中工作的问题。

在 UI 中,有两个步骤:

1. source = new EventSource('http://localhost:3000/stream');
   source.addEventListener('open', function(e) {
      $("#state").text("Connected")
    }, false);
Run Code Online (Sandbox Code Playgroud)
  1. UI 中的用户可以发布到 api 以更新数据

  2. 用户发布到 api 后,服务器将事件发送到 UI 以更新 UI

在一个服务器环境中,这工作得很好,完全没有问题。

但是在多服务器实例环境中,这将不起作用。例如,我有两个服务器实例,UI订阅了服务器1,然后服务器1记住了连接,但是数据更新来自服务器2,当数据发生变化时,服务器2中的SSE没有连接。然后在这个senario,服务器 2 如何将 SSE 发送到 UI?

为了让SSE在多服务器环境下工作,我们是否需要采取任何保存方案来保存连接信息,以便任何服务器实例都可以准确地将SSE发送到UI?

让我再澄清一下:是的,服务 1 和服务 2 都在负载均衡器之后,它们不必具有相同的 URL。UI 是纯前端应用程序,甚至可以是移动应用程序。那么,如果 UI 正在向 server1 的 LB 发送一个 eventSource 请求,那么只有这个实例可以使用这个连接将事件发送回 UI,对吗?但是如果我们有多个服务器 1 实例,这意味着除当前服务器之外的任何服务器 1 实例都不能将事件发送回 UI。我相信这是 SSE 的限制,除非连接可以在所有实例之间共享。但是如何。

谢谢

load-balancing eventsource spring-webflux

5
推荐指数
1
解决办法
2656
查看次数

Safari 服务器发送事件 (SSE) 无限循环

我在 Safari 9 和 Safari 10 中遇到服务器发送事件 (SSE) 问题。 SSE 连接打开,立即关闭,然后无限循环重新连接。

这是客户端代码:

var events = new EventSource("/stream/events")
Run Code Online (Sandbox Code Playgroud)

这些是 http 响应标头:

> GET /stream/events HTTP/1.1
> User-Agent: curl/7.43.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Access-Control-Allow-Origin: *
< Cache-Control: no-cache
< Connection: keep-alive
< Content-Type: text/event-stream
< Expires: Thu, 01 Jan 1970 00:00:00 GMT
< Last-Modified: Tue, 19 Sep 2017 05:28:22 GMT
< Strict-Transport-Security: max-age=31536000
< X-Accel-Buffering: no
< X-Content-Type-Options: nosniff
< X-Frame-Options: DENY
< X-Xss-Protection: 1; mode=block
< …
Run Code Online (Sandbox Code Playgroud)

javascript safari eventsource

5
推荐指数
1
解决办法
760
查看次数

如何使用 Firebase 在 IOS 上实现服务器发送事件?

我正在尝试使用rest api 监听firebase 事件。问题是未调用回调方法。我正在使用EventSource来实现此目的。这是侦听事件的正确方法吗?

Auth.auth().currentUser?.getIDTokenForcingRefresh(true, completion: { (token, error) in
    let server : String =  "https://project-XXXXX.firebaseio.com/.json?auth=\(token!)"

    let eventSource: EventSource = EventSource(url: server)
    eventSource.onOpen {
        // When opened
        debugPrint("eventSource open")
    }

    eventSource.onError { (error) in
        // When errors
        debugPrint("error = \(error?.localizedDescription)")
    }
    eventSource.onMessage { (id, event, data) in
        debugPrint("data = \(data)")
        // Here you get an event without event name!
    }

    eventSource.addEventListener("child_added") { (id, event, data) in
        debugPrint("data = \(data)")
        // Here you get an event 'event-name'
    }
})
Run Code Online (Sandbox Code Playgroud)

ios server-sent-events firebase firebase-realtime-database eventsource

5
推荐指数
1
解决办法
1715
查看次数

ServerSentEvents - Http 响应状态代码

如何捕获失败EventSource连接的 http 状态代码?

function onError(event) {
  console.log('request status code?');
}

const source = new EventSource('url', { withCredentials: true });
source.addEventListener('error', onError);
Run Code Online (Sandbox Code Playgroud)

javascript eventsource

5
推荐指数
0
解决办法
171
查看次数

检测 Flux 数据结束与错误

目前正在研究使用 Angular 5 和 Spring 5 webflux 的 SSE。基本应用程序工作正常,但在调查错误处理时,我们注意到,EventSource在角度应用程序中,由于到达 Flux 数据流的末尾而导致弹簧关闭连接与发生错误之间没有任何区别(例如,在传输过程中终止应用程序)。

我们调查所依据的示例如下。

https://thepracticaldeveloper.com/2017/11/04/full-reactive-stack-ii-the-angularjs-client/

http://javasampleapproach.com/reactive-programming/angular-4-spring-webflux-spring-data-reactive-mongodb-example-full-reactive-angular-4-http-client-spring-boot-restapi-server# 25_服务

get中的onerror和completion函数都会在EventSourceSpring成功发送数据并到达流末尾时调用,然后关闭连接,或者当我们ctrl+c应用程序中流时,或者当我们随机抛出异常时发送数据的中间。

EventSource论证仅包含{type: 'error'}所有 3 种情况。

eventsource spring-webflux angular5

5
推荐指数
1
解决办法
1287
查看次数

EventSource 重新连接时某些服务器发送的事件丢失

在我们的应用程序中,我们有 SSE 连接,生存时间为 5 分钟,5 分钟后服务器关闭连接,客户端自动重新连接。

\n\n

但这里的问题是:当客户端重新连接时,后端可能发生了一些事件,并且它不会传递到SSE连接,因为它\xe2\x80\x99s尚未建立。

\n\n

所以有一些 1-2 秒的时间段我们可能会丢失事件。

\n\n

我们该如何处理这个案子?你有什么意见 ?

\n\n

在我看来,我们只有一个选择:每次 SSE 重新连接后,在服务器上执行额外的 GET 请求来刷新数据。

\n

javascript server-sent-events eventsource

5
推荐指数
1
解决办法
6552
查看次数

EventSourcePolyfill 与原生 EventSource(性能方面)

我看到一个库允许使用HeaderEventSource 自定义标头。

事件源Polyfill

但是,当我看到网络选项卡时,Type显示的fetch不是 ,eventsource 而如果我使用本机,EventSource它会生成类型为eventsource

fetch和之间的性能有区别吗eventsource

使用 Polyfill

const eventSourceInitDict = {headers: {
        'ClientId': '123123123',
        'ClientSecret': 'lllasjdlkasjdlkasdlka'
    }};
const objEventSource1 = new EventSourcePolyfill('https://*****.herokuapp.com/emit-custom-pe?userId=' + '{!$User.Id}', eventSourceInitDict);
objEventSource1.addEventListener('message', function (objEventSourceData) {
    debugger;
});
Run Code Online (Sandbox Code Playgroud)

使用本机事件源

const objEventSource1 = new EventSource('https://****.herokuapp.com/emit-custom-pe?userId=' + '{!$User.Id}');
objEventSource1.addEventListener('message', function (objEventSourceData) {
    debugger;
});
Run Code Online (Sandbox Code Playgroud)

javascript performance server-sent-events eventsource

5
推荐指数
0
解决办法
1625
查看次数