小编Sne*_*ngh的帖子

使用独立模式 Kafka-connect 将数据捕获从 Postgres SQL 更改为 kafka 主题

我一直在尝试使用以下命令 /bin connect-standalone.properties config/connect-standalone.properties postgres.sproperties 从 postgres sql 到 kafka 主题获取数据,但我面临着几个问题,这里是我的 postgres 的内容。属性文件:

name=cdc_demo
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
plugin.name=decoderbufs
slot.name=debezium
slot.drop_on_stop=false
database.hostname=localhost
database.port=5432
database.user=postgres
database.password=XXXXX
database.dbname=snehildb
time.precision.mode=adaptive
database.sslmode=disable
database.server.name=localhost:5432/snehildb
table.whitelist=public.students
decimal.handling.mode=precise
topic.creation.enable=true`
Run Code Online (Sandbox Code Playgroud)

以下是 connect-standalone.properties 的内容:

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every 
Connect user will
# need to configure these based on the format they want …
Run Code Online (Sandbox Code Playgroud)

postgresql change-data-capture apache-kafka apache-kafka-connect debezium

7
推荐指数
1
解决办法
6631
查看次数

如何从 Google Cloud Composer 调用云函数?

对于一个要求,我想从云作曲家管道内部调用/调用云函数,但我找不到太多相关信息,我尝试使用 SimpleHTTP 气流运算符,但收到此错误:

[2021-09-10 10:35:46,649] {taskinstance.py:1503} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1158, in 
_run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1333, in 
_prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1363, in 
_execute_task
result = task_copy.execute(context=context)
File "/home/airflow/gcs/dags/to_gcf.py", line 51, in execute
if not self.response_check(response):
File "/home/airflow/gcs/dags/to_gcf.py", line 83, in <lambda>
response_check=lambda response: False if len(response.json()) == 0 else True,
File "/opt/python3.8/lib/python3.8/site-packages/requests/models.py", line 900, in json
return complexjson.loads(self.text, **kwargs)
File "/opt/python3.8/lib/python3.8/json/__init__.py", line …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-platform airflow google-cloud-functions google-cloud-composer

5
推荐指数
1
解决办法
5257
查看次数