我使用 kafka-connect-jdbc-4.0.0.jar 和 postgresql-9.4-1206-jdbc41.jar
kafka connect的connector配置
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "timestamp",
"timestamp.column.name": "updated_at",
"topic.prefix": "streaming.data.v2",
"connection.password": "password",
"connection.user": "user",
"schema.pattern": "test",
"query": "select * from view_source",
"connection.url": "jdbc:postgresql://host:5432/test?currentSchema=test"
}
Run Code Online (Sandbox Code Playgroud)
我已经使用 jdbc 驱动程序配置了两个连接器一个源和另一个接收器,针对 postgresql 数据库(“PostgreSQL 9.6.9”)一切正常
我对连接器如何收集源数据有疑问,查看日志我看到执行查询之间存在 21 秒的时间差
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > …
Run Code Online (Sandbox Code Playgroud) postgresql jdbc apache-kafka apache-kafka-connect confluent-platform
我已经使用以下命令启动了 Zookeeper 和 Kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
Run Code Online (Sandbox Code Playgroud)
当我尝试获取架构注册表兼容性设置(向后、向前、无)时,我运行了以下curl命令:
curl -X GET http://localhost:8081/config
Run Code Online (Sandbox Code Playgroud)
预期的:
{"compatibility":"BACKWARD"}
Run Code Online (Sandbox Code Playgroud)
结果:
curl: (7) Failed to connect to localhost port 8081: Connection refused
Run Code Online (Sandbox Code Playgroud)
我怎样才能找到要使用哪个端口?
我正在尝试使用 Python 将 JSON 文本转换为标准数据表,但是我对此几乎没有经验,当我在线搜索解决方案时,我发现我很难实现任何解决方案。
我试图使用ast.literal_eval
但一直收到我无法解决的错误。
raise ValueError('畸形节点或字符串:' + repr(node))
JSON:
{
"duration": 202.0,
"session_info": {
"activation_uuid": "ab90d941-df9d-42c5-af81-069eb4f71515",
"launch_uuid": "11101c41-2d79-42cc-bf6d-37be46802fc8"
},
"timestamp": "2019-01-18T11:11:26.135Z",
"source_page_view_reference": {
"page_uuid": "1bede017-7b77-461d-82ef-a6bbcfdae4d7",
"page_id": "/group/More",
"page_name": "More",
"view_uuid": "9580f3c5-1116-432a-83bc-9d0b5337f661",
"page_type": "Native"
},
"analytics_sdk": {
"component_id": "datasdk",
"component_version": "1.0.52"
},
"treatment_id": "mockTreat",
"client_event_id": "2b3cd878-6932-410b-b1ad-bc40ae888fdc",
"campaign_id": "mockCamp"
}
Run Code Online (Sandbox Code Playgroud)
所需的表格格式(修剪值以适合显示目的):
Duration | session_info.activation_uuid | session_info.launch_uuid | timestamp | etc
202.0 | ab90d941-df9d-42c5-af81-069 | 11101c41-2d79-42cc-bf6d- | 2019-01-18 | etc
Run Code Online (Sandbox Code Playgroud)
任何直接的帮助,或者只是学习这方面的良好资源,将不胜感激。我很难找到直接与我想要从一系列类似的 JSON 中创建表格的内容相关的项目。
我正在致力于在 Prod 中扩展 kafka 集群。Confluence 提供了添加 kafka 代理的简单方法。但是,我如何知道如何与 Kafka 一起扩展 Zookeeper。比例应该是多少?现在我们有 5 个 Zookeeper 节点用于 5 个 kafka 代理。如果我有 10 个 kafka 代理,应该有多少个 Zookeeper 节点?
我有一个正在运行并经过测试的 Kafka 集群,我正在尝试使用 Python 脚本向代理发送消息。这在我使用 Python3 shell 并调用生产者方法时有效,但是当我将这些相同的命令放入 python 文件并执行它时 - 脚本似乎挂起。
我正在为消费者和生产者使用 kafka-python 库。当我使用 Python3 shell 时,我可以看到使用 Kafka GUI 工具 2.0.4 的主题中出现的消息我在 python 代码中尝试了各种循环和语句,但似乎没有任何东西让它“运行”完成。
>>>from kafka import KafkaProducer
>>>producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
>>>producer.send('MyTopic', b'Has this worked?')
>>>>>><kafka.producer.future.FutureRecordMetadata object at 0x7f7af9ece048>
Run Code Online (Sandbox Code Playgroud)
这有效并且字节出现在代理主题数据中。
当我将与上面相同的代码放在 python .py 文件中并使用 Python3 执行时,它会完成,但没有数据发送到 Kafka 代理。也没有显示错误。
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
producer.send('MyTopic', b'Some Data to Check')
Run Code Online (Sandbox Code Playgroud) 你好,让我用python演示一下我想在kotlin中实现的目标:
np.linspace(start = 0, stop = 100, num = 5)
Run Code Online (Sandbox Code Playgroud)
结果:
-------------------------
|0 | 25 | 50 | 75 | 100 |
-------------------------
Run Code Online (Sandbox Code Playgroud)
现在在科特林我如何能得到相同的结果?有类似的图书馆吗?
我很好奇在以下情况下是否有可能避免使用lambda表达式。
例如,使用lambda表达式,我可以简单地定义一个返回lambda exp的函数。
def square_fun(a,b,c):
return lambda x: a*x**2 + b*x + c
Run Code Online (Sandbox Code Playgroud)
之后我们可以使用以下命令调用它:
f = square_fun(1,2,3)
f(1) # here x = 1
Run Code Online (Sandbox Code Playgroud)
如何获得避免lambda表达式的相同行为?
例如,square_fun必须返回另一个函数
def square_fun(a,b,c):
return f...
Run Code Online (Sandbox Code Playgroud) summary()
和 和有describe()
什么区别?似乎它们都具有相同的目的,但没有设法找到任何差异(如果有的话)。
我正在尝试使用 Golang 客户端测试生产者将消息写入 kafka 集群上的主题。这可以很好地写入本地集群上的主题,我只是从他们的github repo复制并粘贴了示例代码。
package main
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers":"localhost"})
if err != nil {
panic(err)
}
defer p.Close()
// Delivery report handler for produced messages
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic …
Run Code Online (Sandbox Code Playgroud) apache-kafka ×5
python ×3
python-3.x ×2
apache-spark ×1
etl ×1
go ×1
hadoop ×1
jdbc ×1
json ×1
kafka-python ×1
kotlin ×1
lambda ×1
orc ×1
parquet ×1
postgresql ×1