我正在尝试将 Faust 与 Django 集成以将消息发布到 Kafka。这是 Faust 仓库中的示例:https : //github.com/robinhood/faust/tree/master/examples/django
我稍微修改了一下,并创建了视图以通过 Faust 将数据推送到 Kafka。
from django.shortcuts import render
from asgiref.sync import async_to_sync
from accounts.agents import AccountRecord, add_account
async def send_data() -> None:
print("sending..data")
print(await add_account.ask(AccountRecord(name="tesst", score=10.9, active=False)))
def index(request):
async_to_sync(send_data)()
return render(request, "accounts/index.html")
Run Code Online (Sandbox Code Playgroud)
但是,我现在收到此错误:
RuntimeError at / Task <Task pending name='Task-1' coro=<AsyncToSync.main_wrap() running at /Users/mysuer/.pyenv/versions/3.8.3/envs/faustdjango/lib/python3.8/site-包/asgiref/sync.py:204> cb=[_run_until_complete_cb() at /Users/mysuer/.pyenv/versions/3.8.3/lib/python3.8/asyncio/base_events.py:184]> 将 Future 附加到不同的循环
我正在使用开发服务器运行这个 Django 应用程序。我究竟做错了什么?任何人?:)
我很好奇您应该如何表达您希望将消息快速传递到 Kafka 主题。他们自述文件中的示例似乎没有写入主题:
import faust
class Greeting(faust.Record):
from_name: str
to_name: str
app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
@app.agent(topic)
async def hello(greetings):
async for greeting in greetings:
print(f'Hello from {greeting.from_name} to {greeting.to_name}')
@app.timer(interval=1.0)
async def example_sender(app):
await hello.send(
value=Greeting(from_name='Faust', to_name='you'),
)
if __name__ == '__main__':
app.main()
Run Code Online (Sandbox Code Playgroud)
我希望hello.send在上面的代码中向该主题发布一条消息,但似乎没有。
有很多从主题中读取的示例,以及许多使用 cli 推送 ad-hoc 消息的示例。梳理文档后,我没有看到任何明确的代码发布主题示例。我只是疯了,上面的代码应该可以工作吗?
我有一个简单的应用程序,有两个功能,一个用于收听主题,另一个用于 Web 端点。我想创建服务器端事件流 (SSE),即文本/事件流,以便在客户端我可以使用 EventSource 收听它。
我现在有以下代码,其中每个函数都在执行其特定的工作:
import faust
from faust.web import Response
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
test_topic = app.topic("test")
@app.agent(test_topic)
async def test_topic_agent(stream):
async for value in stream:
print(f"test_topic_agent RECEIVED -- {value!r}")
yield value
@app.page("/")
async def index(self, request):
return self.text("yey")
Run Code Online (Sandbox Code Playgroud)
现在,我想要在索引中,像这样的代码,但使用 faust:
import asyncio
from aiohttp import web
from aiohttp.web import Response
from aiohttp_sse import sse_response
from datetime import datetime
async def hello(request):
loop = request.app.loop
async with sse_response(request) as resp:
while True:
data = 'Server Time …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 pytest 为我的 Faust 应用程序编写单元测试。我已经参考了此处的文档,但它没有提到当我的 Faust 代理将数据发送到接收器时要做什么。
如果没有水槽,我的测试工作正常,但是当我使用水槽时,我收到此错误:
RuntimeError: Task <Task pending name='Task-2' coro=<Agent._execute_actor() running at /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/faust/agents/agent.py:647> cb=[<TaskWakeupMethWrapper object at 0x7fc28967c5b0>()]> got Future <Future pending> attached to a different loop
INFO faust.agents.agent:logging.py:265 [^-AgentTestWrapper: ml_exporter.processDetections]: Stopping...
Run Code Online (Sandbox Code Playgroud)
我尝试了各种方法:例如修补我的 Faust 应用程序中将数据发送到接收器的装饰器,尝试在没有装饰器的情况下测试我的函数(通过尝试绕过它),修补我的 Faust 应用程序中的接收器参数以有 None 值(因此它不会将我的数据发送到接收器)等。我对这些都没有运气。
这是我的浮士德经纪人:
app = faust.App('ml-exporter', broker=dx_broker, value_serializer='json')
detection_topic = app.topic(dx_topic)
graph_topic = app.topic(gwh_topic)
@app.agent(detection_topic, sink=[graph_topic])
async def processDetections(detections):
detection_count = 0
async for detection in detections:
detection_count += 1
# r.set("detection_count", detection_count)
yield detection
Run Code Online (Sandbox Code Playgroud)
这是我当前的测试代码:
import ml_exporter …Run Code Online (Sandbox Code Playgroud) 我找不到任何答案:Faust和kafka-python有什么区别?
选择其中任何一个有什么优点/缺点吗?
据我了解:
因此,如果我打算只使用 Python,那么 Faust 应该是更好的选择,如果我想要更广泛的兼容性(Go、.NET、C/C#、Java、Python),那么使用 Kafka + Kafka-python?
注意:我是 Kafka 的新手,我正在尝试了解不同解决方案的优缺点。
我将非常感谢任何建议!
我试图让浮士德代理在烧瓶视图/端点内投射消息,我找不到任何例子,我真的很挣扎。
有没有人成功尝试过这个?文档说使用 gevent 或 eventlet 作为 asyncio 的桥梁,但不幸的是无法理解
我试图在一段时间间隔后将 faust 表的数据(计数)发布到 kafka 主题。当我发布一些简单的字符串时,计时器正在工作,但它无法以某种方式访问表的数据。下面是定时器的代码:
@app.timer(interval=10.0)
async def publish_to_anomaly_topic():
await anomaly_topic.send(
value=str(page_views['total'].value())
)
@app.agent(page_view_topic)
async def count_page_views(views):
async for view in views.group_by(PageView.id):
total=0
page_views[view.id]+=1
for everykey in list(page_views.keys()):
if everykey != 'total':
total+=page_views[everykey].value()
page_views['total'] = total
Run Code Online (Sandbox Code Playgroud)
代理工作正常。我能够正确地看到这些值。
我正在尝试使用 robinhood / faust 但没有成功!
我已经创建了一个生产者,它成功地插入到我的 confluence-kafka localhost 实例中的原始主题中!
但 faust 无法连接到本地主机。
我的应用程序.py:
import faust
import base64
import random
from datetime import datetime
SOURCE_TOPIC="input_msgs"
TARGET_TOPIC="output_msgs"
app = faust.App("messages-stream",
broker="kafka://"+'localhost:9092',
topic_partitions=1,
store="memory://")
class OriginalMessage(faust.Record):
msg: str
class TransformedMessage(faust.Record):
msg_id: int
msg_data: str
msg_base64: str
created_at: float
source_topic: str
target_topic: str
deleted: bool
topic = app.topic(SOURCE_TOPIC, value_type=OriginalMessage)
out_topic = app.topic(TARGET_TOPIC, partitions=1)
table = app.Table(
"output_msgs",
default=TransformedMessage,
partitions=1,
changelog_topic=out_topic,
)
print('Initializing Thread Processor...')
@app.agent(topic)
async def transformedmessage(messageevents):
async for transfmessage in messageevents:
try: …Run Code Online (Sandbox Code Playgroud) 我目前正在使用 Kafka 和 robinhood 的 faust 处理一个用例来处理来自 Kafka 的数据。我已经成功地进行了计算,并且我需要的结果正在打印到我的 faust 工人正在运行的控制台上。
现在我想找到一种方法不仅可以在控制台中而且可以在 HTML 页面中看到我的结果。我查看了 websockets 库,但我无法将其与 faust 结合使用。我得到的错误是Crashed reason=RuntimeError('This event loop is already running')我认为这是因为代码是为正在处理的每条消息执行的。
任何帮助都受到高度赞赏
这是我正在使用的代码:
import faust, datetime, websockets, asyncio
app = faust.App(
'UseCase',
broker='kafka://localhost:29092',
)
usecase_topic = app.topic('usecase',partitions=8)
usecase_table = app.Table('usecase', default=int)
checkfailure = {}
@app.agent(usecase_topic)
async def process_record(records):
async for record in records:
#count records for each Sensor
print(record)
sensor = record['ext_id']
usecase_table[sensor] += 1
#print(f'Records for Sensor {sensor}: {usecase_table[sensor]}')
#write current timestamp of record …Run Code Online (Sandbox Code Playgroud) faust ×9
python ×6
apache-kafka ×5
python-3.x ×4
aiohttp ×1
django ×1
django-3.0 ×1
django-3.1 ×1
eventlet ×1
flask ×1
gevent ×1
pytest ×1
python-3.6 ×1
websocket ×1