小编cri*_*007的帖子

kafka-connect-jdbc 不从源获取连续的时间戳

我使用 kafka-connect-jdbc-4.0.0.jar 和 postgresql-9.4-1206-jdbc41.jar

kafka connect的connector配置

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "timestamp",
  "timestamp.column.name": "updated_at",
  "topic.prefix": "streaming.data.v2",
  "connection.password": "password",
  "connection.user": "user",
  "schema.pattern": "test",
  "query": "select * from view_source",
  "connection.url": "jdbc:postgresql://host:5432/test?currentSchema=test"
}
Run Code Online (Sandbox Code Playgroud)

我已经使用 jdbc 驱动程序配置了两个连接器一个源和另一个接收器,针对 postgresql 数据库(“PostgreSQL 9.6.9”)一切正常

我对连接器如何收集源数据有疑问,查看日志我看到执行查询之间存在 21 秒的时间差

11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG Checking for next block of results from TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
11/1/2019 9:20:18[2019-01-11 08:20:18,985] DEBUG TimestampIncrementingTableQuerier{name='null', query='select * from view_source', topicPrefix='streaming.data.v2', timestampColumn='updated_at', incrementingColumn='null'} prepared SQL query: select * from view_source WHERE "updated_at" > …
Run Code Online (Sandbox Code Playgroud)

postgresql jdbc apache-kafka apache-kafka-connect confluent-platform

1
推荐指数
1
解决办法
1550
查看次数

无法连接 Kafka 架构注册表中的 localhost:8081

我已经使用以下命令启动了 Zookeeper 和 Kafka:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
Run Code Online (Sandbox Code Playgroud)

当我尝试获取架构注册表兼容性设置(向后、向前、无)时,我运行了以下curl命令:

curl -X GET http://localhost:8081/config
Run Code Online (Sandbox Code Playgroud)

预期的:

{"compatibility":"BACKWARD"}
Run Code Online (Sandbox Code Playgroud)

结果:

curl: (7) Failed to connect to localhost port 8081: Connection refused
Run Code Online (Sandbox Code Playgroud)

我怎样才能找到要使用哪个端口?

apache-kafka confluent-schema-registry confluent-platform

1
推荐指数
1
解决办法
4922
查看次数

谓词下推到底是如何工作的?

谁能用例子解释一下谓​​词下推到底是如何工作的?

hadoop parquet orc

1
推荐指数
1
解决办法
1722
查看次数

在 Python 中从 JSON 构建表

我正在尝试使用 Python 将 JSON 文本转换为标准数据表,但是我对此几乎没有经验,当我在线搜索解决方案时,我发现我很难实现任何解决方案。

我试图使用ast.literal_eval但一直收到我无法解决的错误。

raise ValueError('畸形节点或字符串:' + repr(node))

JSON:

{
    "duration": 202.0,
    "session_info": {
        "activation_uuid": "ab90d941-df9d-42c5-af81-069eb4f71515",
        "launch_uuid": "11101c41-2d79-42cc-bf6d-37be46802fc8"
    },
    "timestamp": "2019-01-18T11:11:26.135Z",
    "source_page_view_reference": {
        "page_uuid": "1bede017-7b77-461d-82ef-a6bbcfdae4d7",
        "page_id": "/group/More",
        "page_name": "More",
        "view_uuid": "9580f3c5-1116-432a-83bc-9d0b5337f661",
        "page_type": "Native"
    },
    "analytics_sdk": {
        "component_id": "datasdk",
        "component_version": "1.0.52"
    },
    "treatment_id": "mockTreat",
    "client_event_id": "2b3cd878-6932-410b-b1ad-bc40ae888fdc",
    "campaign_id": "mockCamp"
}
Run Code Online (Sandbox Code Playgroud)

所需的表格格式(修剪值以适合显示目的):

Duration | session_info.activation_uuid | session_info.launch_uuid | timestamp  | etc
   202.0 |  ab90d941-df9d-42c5-af81-069 | 11101c41-2d79-42cc-bf6d- | 2019-01-18 | etc
Run Code Online (Sandbox Code Playgroud)

任何直接的帮助,或者只是学习这方面的良好资源,将不胜感激。我很难找到直接与我想要从一系列类似的 JSON 中创建表格的内容相关的项目。

python json etl

1
推荐指数
1
解决办法
1万
查看次数

如何使用 kafka 扩展 Zookeeper

我正在致力于在 Prod 中扩展 kafka 集群。Confluence 提供了添加 kafka 代理的简单方法。但是,我如何知道如何与 Kafka 一起扩展 Zookeeper。比例应该是多少?现在我们有 5 个 Zookeeper 节点用于 5 个 kafka 代理。如果我有 10 个 kafka 代理,应该有多少个 Zookeeper 节点?

apache-kafka apache-zookeeper

1
推荐指数
1
解决办法
2061
查看次数

Python Producer 可以通过 shell 发送,但不能通过 .py

我有一个正在运行并经过测试的 Kafka 集群,我正在尝试使用 Python 脚本向代理发送消息。这在我使用 Python3 shell 并调用生产者方法时有效,但是当我将这些相同的命令放入 python 文件并执行它时 - 脚本似乎挂起。

我正在为消费者和生产者使用 kafka-python 库。当我使用 Python3 shell 时,我可以看到使用 Kafka GUI 工具 2.0.4 的主题中出现的消息我在 python 代码中尝试了各种循环和语句,但似乎没有任何东西让它“运行”完成。

>>>from kafka import KafkaProducer
>>>producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
>>>producer.send('MyTopic', b'Has this worked?')
>>>>>><kafka.producer.future.FutureRecordMetadata object at 0x7f7af9ece048>
Run Code Online (Sandbox Code Playgroud)

这有效并且字节出现在代理主题数据中。

当我将与上面相同的代码放在 python .py 文件中并使用 Python3 执行时,它会完成,但没有数据发送到 Kafka 代理。也没有显示错误。

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='BOOTSTRAP_SRV:9092')
producer.send('MyTopic', b'Some Data to Check')
Run Code Online (Sandbox Code Playgroud)

python python-3.x apache-kafka kafka-python

1
推荐指数
1
解决办法
1184
查看次数

如何在类似于numpy的kotlin中自动生成数组?

你好,让我用python演示一下我想在kotlin中实现的目标:

np.linspace(start = 0, stop = 100, num = 5)

Run Code Online (Sandbox Code Playgroud)

结果:

-------------------------
|0 | 25 | 50 | 75 | 100 |
-------------------------
Run Code Online (Sandbox Code Playgroud)

现在在科特林我如何能得到相同的结果?有类似的图书馆吗?

kotlin

1
推荐指数
1
解决办法
159
查看次数

将Lambda表达式转换为简单函数

我很好奇在以下情况下是否有可能避免使用lambda表达式。

例如,使用lambda表达式,我可以简单地定义一个返回lambda exp的函数。

def square_fun(a,b,c):
   return lambda x: a*x**2 + b*x + c
Run Code Online (Sandbox Code Playgroud)

之后我们可以使用以下命令调用它:

f = square_fun(1,2,3)

f(1) # here x = 1
Run Code Online (Sandbox Code Playgroud)

如何获得避免lambda表达式的相同行为?

例如,square_fun必须返回另一个函数

def square_fun(a,b,c):
   return f...
Run Code Online (Sandbox Code Playgroud)

python lambda python-3.x

1
推荐指数
1
解决办法
50
查看次数

Apache Spark 中describe() 和summary() 的区别

summary()和 和有describe()什么区别?似乎它们都具有相同的目的,但没有设法找到任何差异(如果有的话)。

apache-spark

1
推荐指数
1
解决办法
2245
查看次数

Confluent Kafka Golang 客户端生产者“代理:没有足够的同步副本”

我正在尝试使用 Golang 客户端测试生产者将消息写入 kafka 集群上的主题。这可以很好地写入本地集群上的主题,我只是从他们的github repo复制并粘贴了示例代码。

package main

import (
    "fmt"
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)


func main() {

    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers":"localhost"})
    if err != nil {
        panic(err)
    }

    defer p.Close()

    // Delivery report handler for produced messages
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                }
            }
        }
    }()

    // Produce messages to topic …
Run Code Online (Sandbox Code Playgroud)

go apache-kafka confluent-platform

1
推荐指数
1
解决办法
4397
查看次数