我在一个文件夹中有许多文件.我想删除一个文件后删除它.这两种方法有什么区别?哪种方法适用于我的场景?谢谢!
我有一个需要单元测试的生产者应用程序。我不想为此目的启动 Zookeeper 和 Kafka 服务器。有没有更简单的方法来使用 Mockito 测试它?
我有一个生产者正在为一个主题制作protobuf消息.我有一个消费者应用程序,它反序列化protobuf消息.但是hdfs sink连接器直接从Kafka主题中获取消息.将键和值转换器etc/schema-registry/connect-avro-standalone.properties设置为什么?最好的方法是什么?提前致谢!
我有一个有 40 个分区的主题。设置是这样的:
def on_assign (c,ps):
for p in ps:
p.offset=0
print ps
c.assign(ps)
conf = {'bootstrap.servers': 'localhost:9092'
'enable.auto.commit' : False,
'group.id' : 'confluent_consumer',
'default.topic.config': {'auto.offset.reset': 'earliest'}
}
consumer = Consumer(**conf)
consumer.subscribe(['topic.source'], on_assign=on_assign)
msg = consumer.poll(timeout=100000)
print "Topic is %s: | Partition is %d: | Offset is : %d | key is :%s " % (msg.topic(), msg.partition(), msg.offset(), msg.key())
Run Code Online (Sandbox Code Playgroud)
我想从偏移量 0 读取主题的所有分区topic.source。但我没有看到所有分区都会发生这种情况。对于某些分区,它从特定的偏移量读取,我假设这是提交的偏移量,group.id每次更改也没有帮助。如何从头开始读取该主题的所有分区,而不考虑提交的偏移量?
我打印ps出来on_assign(),它为所有 40 个分区打印了这样的内容:
[TopicPartition{topic=topic.source,partition=0,offset=0,error=None},TopicPartition{topic=topic.source,partition=1,offset=0,error=None}....] and so on
Run Code Online (Sandbox Code Playgroud) 我想将分钟四舍五入到最接近的 15 分钟间隔,即 00、15、30、45。我目前正在执行以下操作:
echo $(date +'%Y/%m/%d/%H/')$((($(($(date +'%M') / 15))-0)*15))
Run Code Online (Sandbox Code Playgroud)
但在一小时开始的 1-14 分钟之间,我得到“/2021/11/03/21/0”而不是 00。
另外,我不确定这是否是最好的方法。还有其他选择吗?
我有一个大小为10240字节的文件.这是我获取文件大小的脚本:
fh = open(name, "r")
data = fh.read(10240)
print sys.getsizeof(data)
Run Code Online (Sandbox Code Playgroud)
输出:10277
我正在从文件中读取10240个字节但是当我打印大小时,我得到10277.如何确保我不读取超过10240个字节?我应该向我的消费者发送10240个字节,但看起来它正在发送更多.