标签: faust

如何将 Faust 与 Django 集成?

我正在尝试将 Faust 与 Django 集成以将消息发布到 Kafka。这是 Faust 仓库中的示例:https : //github.com/robinhood/faust/tree/master/examples/django

我稍微修改了一下,并创建了视图以通过 Faust 将数据推送到 Kafka。

from django.shortcuts import render

from asgiref.sync import async_to_sync

from accounts.agents import AccountRecord, add_account


async def send_data() -> None:
    print("sending..data")
    print(await add_account.ask(AccountRecord(name="tesst", score=10.9, active=False)))

def index(request):
    async_to_sync(send_data)()
    return render(request, "accounts/index.html")
Run Code Online (Sandbox Code Playgroud)

但是,我现在收到此错误:

RuntimeError at / Task <Task pending name='Task-1' coro=<AsyncToSync.main_wrap() running at /Users/mysuer/.pyenv/versions/3.8.3/envs/faustdjango/lib/python3.8/site-包/asgiref/sync.py:204> cb=[_run_until_complete_cb() at /Users/mysuer/.pyenv/versions/3.8.3/lib/python3.8/asyncio/base_events.py:184]> 将 Future 附加到不同的循环

我正在使用开发服务器运行这个 Django 应用程序。我究竟做错了什么?任何人?:)

python django faust django-3.0 django-3.1

17
推荐指数
1
解决办法
1124
查看次数

发布到 kafka 主题的 Faust 示例

我很好奇您应该如何表达您希望将消息快速传递到 Kafka 主题。他们自述文件中的示例似乎没有写入主题:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()
Run Code Online (Sandbox Code Playgroud)

我希望hello.send在上面的代码中向该主题发布一条消息,但似乎没有。

有很多从主题中读取的示例,以及许多使用 cli 推送 ad-hoc 消息的示例。梳理文档后,我没有看到任何明确的代码发布主题示例。我只是疯了,上面的代码应该可以工作吗?

python apache-kafka faust

11
推荐指数
2
解决办法
4411
查看次数

如何使用 Faust Python 包将 kafka 主题与 Web 端点连接?

我有一个简单的应用程序,有两个功能,一个用于收听主题,另一个用于 Web 端点。我想创建服务器端事件流 (SSE),即文本/事件流,以便在客户端我可以使用 EventSource 收听它。

我现在有以下代码,其中每个函数都在执行其特定的工作:

import faust

from faust.web import Response

app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
test_topic = app.topic("test")


@app.agent(test_topic)
async def test_topic_agent(stream):
    async for value in stream:
        print(f"test_topic_agent RECEIVED -- {value!r}")
        yield value


@app.page("/")
async def index(self, request):
    return self.text("yey")
Run Code Online (Sandbox Code Playgroud)

现在,我想要在索引中,像这样的代码,但使用 faust:

import asyncio
from aiohttp import web
from aiohttp.web import Response
from aiohttp_sse import sse_response
from datetime import datetime


async def hello(request):
    loop = request.app.loop
    async with sse_response(request) as resp:
        while True:
            data = 'Server Time …
Run Code Online (Sandbox Code Playgroud)

python python-3.x apache-kafka aiohttp faust

10
推荐指数
1
解决办法
2093
查看次数

如何测试将数据发送到接收器的 Faust 代理?

我正在尝试使用 pytest 为我的 Faust 应用程序编写单元测试。我已经参考了此处的文档,但它没有提到当我的 Faust 代理将数据发送到接收器时要做什么。

如果没有水槽,我的测试工作正常,但是当我使用水槽时,我收到此错误:

RuntimeError: Task <Task pending name='Task-2' coro=<Agent._execute_actor() running at /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/faust/agents/agent.py:647> cb=[<TaskWakeupMethWrapper object at 0x7fc28967c5b0>()]> got Future <Future pending> attached to a different loop
INFO     faust.agents.agent:logging.py:265 [^-AgentTestWrapper: ml_exporter.processDetections]: Stopping...
Run Code Online (Sandbox Code Playgroud)

我尝试了各种方法:例如修补我的 Faust 应用程序中将数据发送到接收器的装饰器,尝试在没有装饰器的情况下测试我的函数(通过尝试绕过它),修补我的 Faust 应用程序中的接收器参数以有 None 值(因此它不会将我的数据发送到接收器)等。我对这些都没有运气。

这是我的浮士德经纪人:

app = faust.App('ml-exporter', broker=dx_broker, value_serializer='json')

detection_topic = app.topic(dx_topic)
graph_topic = app.topic(gwh_topic)

@app.agent(detection_topic, sink=[graph_topic])
async def processDetections(detections):
    detection_count = 0
    async for detection in detections:
        detection_count += 1
        # r.set("detection_count", detection_count)
        yield detection
Run Code Online (Sandbox Code Playgroud)

这是我当前的测试代码:

import ml_exporter …
Run Code Online (Sandbox Code Playgroud)

python pytest python-3.x faust

6
推荐指数
1
解决办法
2349
查看次数

Faust 与 Kafka-python 之间的区别

我找不到任何答案:Faustkafka-python有什么区别?
选择其中任何一个有什么优点/缺点吗?
据我了解:

  • Kafka是用Java编写的,Kafka-python是一个与“Java流”通信的Python客户端
  • Faust是一个纯粹的“Python 流”

因此,如果我打算只使用 Python,那么 Faust 应该是更好的选择,如果我想要更广泛的兼容性(Go、.NET、C/C#、Java、Python),那么使用 Kafka + Kafka-python?

注意:我是 Kafka 的新手,我正在尝试了解不同解决方案的优缺点。

我将非常感谢任何建议!

python apache-kafka faust

6
推荐指数
1
解决办法
3300
查看次数

将 Flask 与 Faust 集成

我试图让浮士德代理在烧瓶视图/端点内投射消息,我找不到任何例子,我真的很挣扎。

有没有人成功尝试过这个?文档说使用 gevent 或 eventlet 作为 asyncio 的桥梁,但不幸的是无法理解

gevent flask eventlet python-3.6 faust

5
推荐指数
1
解决办法
1217
查看次数

如何在多个代理或faust计时器之间共享faust表?

我试图在一段时间间隔后将 faust 表的数据(计数)发布到 kafka 主题。当我发布一些简单的字符串时,计时器正在工作,但它无法以某种方式访问​​表的数据。下面是定时器的代码:

@app.timer(interval=10.0)
async def publish_to_anomaly_topic():
            await anomaly_topic.send(
            value=str(page_views['total'].value())
          )
@app.agent(page_view_topic)
async def count_page_views(views):
    async for view in views.group_by(PageView.id):
        total=0
        page_views[view.id]+=1
        for everykey in list(page_views.keys()):
            if everykey != 'total':
                total+=page_views[everykey].value()
        page_views['total'] = total
Run Code Online (Sandbox Code Playgroud)

代理工作正常。我能够正确地看到这些值。

python-3.x apache-kafka faust

5
推荐指数
1
解决办法
1855
查看次数

无法连接到 id 为 1 的节点:[Worker]:错误:ConnectionError('没有连接到 id 为节点')

我正在尝试使用 robinhood / faust 但没有成功!

我已经创建了一个生产者,它成功地插入到我的 confluence-kafka localhost 实例中的原始主题中!

但 faust 无法连接到本地主机。

我的应用程序.py:

import faust
import base64
import random
from datetime import datetime


SOURCE_TOPIC="input_msgs"
TARGET_TOPIC="output_msgs"

app = faust.App("messages-stream", 
    broker="kafka://"+'localhost:9092',
    topic_partitions=1,
    store="memory://")

class OriginalMessage(faust.Record):
    msg: str


class TransformedMessage(faust.Record):
    msg_id: int
    msg_data: str
    msg_base64: str
    created_at: float 
    source_topic: str
    target_topic: str
    deleted: bool

topic = app.topic(SOURCE_TOPIC, value_type=OriginalMessage)
out_topic = app.topic(TARGET_TOPIC, partitions=1)

table = app.Table(
    "output_msgs",
    default=TransformedMessage,
    partitions=1,
    changelog_topic=out_topic,
)

print('Initializing Thread Processor...')


@app.agent(topic)
async def transformedmessage(messageevents):
    async for transfmessage in messageevents:
        try: …
Run Code Online (Sandbox Code Playgroud)

python stream-processing faust

5
推荐指数
1
解决办法
5388
查看次数

如何从 faust 应用程序向 Websocket 发送数据

我目前正在使用 Kafka 和 robinhood 的 faust 处理一个用例来处理来自 Kafka 的数据。我已经成功地进行了计算,并且我需要的结果正在打印到我的 faust 工人正在运行的控制台上。

现在我想找到一种方法不仅可以在控制台中而且可以在 HTML 页面中看到我的结果。我查看了 websockets 库,但我无法将其与 faust 结合使用。我得到的错误是Crashed reason=RuntimeError('This event loop is already running')我认为这是因为代码是为正在处理的每条消息执行的。

任何帮助都受到高度赞赏

这是我正在使用的代码:

    import faust, datetime, websockets, asyncio

app = faust.App(
    'UseCase',
    broker='kafka://localhost:29092',
)

usecase_topic = app.topic('usecase',partitions=8)

usecase_table = app.Table('usecase', default=int)

checkfailure = {}

@app.agent(usecase_topic)
async def process_record(records):
    async for record in records:
        #count records for each Sensor
        print(record)
        sensor = record['ext_id']
        usecase_table[sensor] += 1
        #print(f'Records for Sensor {sensor}: {usecase_table[sensor]}')

        #write current timestamp of record …
Run Code Online (Sandbox Code Playgroud)

websocket python-3.x apache-kafka faust

2
推荐指数
1
解决办法
1216
查看次数