所以我的雇主让 Avro 消息通过 Kafka。我们希望将所有消息存档到 Amazon S3。理想情况下,它们将在白天存储在类似 S3 的目录中,并使用类似于以下内容的路径结构:
s3://my-bucket/data/day=2016-03-04/data.avro
是否有关于如何做好这件事的参考或最佳实践?
我的一个问题是幂等性:我如何提供写入幂等性,在这种情况下,记录可能会多次发送到我的接收器写入器,但只能存储在 S3 上一次。
我是否正确,我需要幂等性?如果我实现一个简单的附加(非幂等),Kafka Connect 可能会发送相同的记录两次,并且它们可能会被冗余存储?
如何powerList在Haskell中编写函数,如下所示?我希望它使用n乘法运算来构建这样的列表,其中每个元素是前一个元素的简单倍数,而不是n指数运算.
理想情况下,实现是干净的,惯用的Haskell,并且相当有效.
-- powerList x n -> [1, x, x^2, ..., x^n]
-- For example:
-- powerList 2 0 -> [1]
-- powerList 2 1 -> [1, 2]
-- powerList 2 2 -> [1, 2, 4]
-- powerList 2 3 -> [1, 2, 4, 8]
-- powerList 2 4 -> [1, 2, 4, 8, 16]
powerList :: forall a. Integral a => a -> a -> [a]
powerList _ 0 = [1]
powerList x …Run Code Online (Sandbox Code Playgroud) 使用 Gradle 4.6,我做了一个超级简单的测试项目来测试 JUnit 5 的兼容性。这似乎-Dtest.single不起作用。我已经搜索了文档,我想运行一个测试类和/或一个测试方法。在这里,我期待
gradle clean -Dtest.single=junittest.SampleTests test
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for task ':test'.
> Could not find matching test for pattern: junittest.SampleTests
Run Code Online (Sandbox Code Playgroud)
我有一个故意失败的测试用例,名为failingTest():
gradle clean test
> Task :test FAILED
junittest.SampleTests > failingTest() FAILED
org.opentest4j.AssertionFailedError at SampleTests.java:16
Run Code Online (Sandbox Code Playgroud)
我的 build.gradle:
apply plugin: 'java'
sourceCompatibility = 8
repositories {
mavenCentral()
maven { url "http://packages.confluent.io/maven/" }
}
test {
useJUnitPlatform()
}
tasks.withType(JavaCompile) {
options.compilerArgs << "-Xlint:unchecked"
} …Run Code Online (Sandbox Code Playgroud) 如何将项目的依赖项复制到构建目录?
这是一个很常见的问题。我搜索并发现许多线程可以回答这个确切的问题,但没有一个解决方案有效。这里有三个线程(有些已经很旧了),它们提供了我无法工作的解决方案。
https://discuss.gradle.org/t/how-can-i-gather-all-my-projects-dependency-into-a-folder/7146
仅供参考,我已经尝试过:
task copyDependencies(type: Copy) {
from configurations.compile
into 'dependencies'
}
task copyDependencies2(type: Copy) {
from project.configurations.compile
into project.buildDir
}
project.copy {
from project.configurations.compile
into project.buildDir
}
Run Code Online (Sandbox Code Playgroud)
如果可能的话,我更喜欢当前推荐的最佳实践方法,而不是旧的已弃用的方法。我仍然使用当前的 Gradle,在撰写本文时当前版本为 4.7。
我在使用官方 Confluent Kafka Python API 时遇到错误:
我订阅:
kafka_consumer.subscribe(topics=["my-avro-topic"], on_assign=on_assign_callback, on_revoke=on_revoke_callback)
Run Code Online (Sandbox Code Playgroud)
使用回调:
def on_assign_callback(consumer, topic_partitions):
for topic_partition in topic_partitions:
print("without position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
topic_partition.offset, topic_partition.error))
topic_partitions_with_offsets = consumer.position(topic_partitions)
print("assigned to {}->{} partitions".format(len(topic_partitions), len(topic_partitions_with_offsets)))
for topic_partition in topic_partitions_with_offsets:
print("with position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
topic_partition.offset, topic_partition.error))
Run Code Online (Sandbox Code Playgroud)
产生控制台输出:
without position. topic=my-avro-topic. partition=0. offset=-1001. error=None
assigned to 1->1 partitions
with position. topic=my-avro-topic. partition=0. offset=-1001. error=KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="(null)"}
Run Code Online (Sandbox Code Playgroud)
有人可以解释一下吗?为什么我会收到未知分区的回调通知?类似的代码使用 Java API 可以完美运行。