我在Scala中创建了一个case对象层次结构,如下所示:
package my.awesome.package
sealed abstract class PresetShapeType(val displayName: String)
case object AccelerationSensor extends PresetShapeType("Acceleration Sensor")
case object DisplacementSensor extends PresetShapeType("Displacement Sensor")
case object ForceSensor extends PresetShapeType("Force Sensor")
case object PressureSensor extends PresetShapeType("Pressure Sensor")
case object StrainSensor extends PresetShapeType("Strain Sensor")
Run Code Online (Sandbox Code Playgroud)
我还有一段我想要访问的Java代码PressureSensor,但以下内容不起作用:
package my.awesome.package.subpackage;
import my.awesome.package.PressureSensor;
// Do some stuff, then...
DVShape newshape = DVShapeFactory.createPresetShape(PressureSensor, new Point3f(0,0,0));
Run Code Online (Sandbox Code Playgroud)
那么,我如何PressureSensor从Java 引用case对象?我反编译了PressureSensor和PressureSensor$类的字节码,产生了以下内容:
Compiled from "DVShapeFactory.scala"
public final class org.nees.rpi.vis.PressureSensor extends java.lang.Object{
public static final java.lang.Object productElement(int);
public …Run Code Online (Sandbox Code Playgroud) 在卡夫卡0.8beta可以使用类似下面的命令提到要创建的主题在这里
bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 2 --partition 3 --topic test
Run Code Online (Sandbox Code Playgroud)
上面的命令将创建一个名为"test"的主题,每个分区有3个分区和2个副本.
我可以使用Java做同样的事情吗?
到目前为止,我发现使用Java我们可以创建一个生产者,如下所示
Producer<String, String> producer = new Producer<String, String>(config);
producer.send(new KeyedMessage<String, String>("mytopic", msg));
Run Code Online (Sandbox Code Playgroud)
这将创建一个名为"mytopic"的主题,其中包含使用"num.partitions"属性指定的分区数并开始生成.
但有没有办法定义分区和复制呢?我找不到任何这样的例子.如果我们不能这样做意味着我们总是需要先创建包含分区和复制的主题(根据我们的要求),然后使用生产者在该主题中生成消息.例如,如果我想以相同的方式创建"mytopic"但是具有不同的分区数(覆盖num.partitions属性)是可能的吗?
我正在尝试使用以下代码Source通过AdminCommand创建一个kafka主题
ZkClient zkClient = new ZkClient(kafkaHost, 10000, 10000, ZKStringSerializer$.MODULE$);
AdminUtils.createTopic(zkClient, "pa_reliancepoc_telecom_usageevent", 10, 2, new Properties());
Run Code Online (Sandbox Code Playgroud)
但得到以下例外
Exception in thread "main" kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:155)
Run Code Online (Sandbox Code Playgroud)
但是,我可以使用shell命令创建主题.
我有类似下面的东西,效果很好,但我更喜欢在不发送任何消息的情况下检查运行状况(不仅检查套接字连接)。我知道 Kafka 有像 KafkaHealthIndicator 这样开箱即用的东西,有人有使用它的经验或例子吗?
\n public class KafkaHealthIndicator implements HealthIndicator {\n private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);\n\n private KafkaTemplate<String, String> kafka;\n\n public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {\n this.kafka = kafka;\n }\n\n @Override\n public Health health() {\n try {\n kafka.send("kafka-health-indicator", "\xe2\x9d\xa5").get(100, TimeUnit.MILLISECONDS);\n } catch (InterruptedException | ExecutionException | TimeoutException e) {\n return Health.down(e).build();\n }\n return Health.up().build();\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n
在为Apache Kafka创建主题时,什么是最佳实践?
每个人都允许自动创建主题,或者您是如何做到的?您是否将主题创建步骤与kafka-instance的开头捆绑在一起?
我有一个基于docker的Kafka安装,它已被多个应用程序使用.如何将每个应用程序的主题创建与Kafka容器的启动分开?看看Confluents音乐演示他们通过旋转一个新的kafka图像来创建主题,调用"create-topic-script"然后让容器死掉.这感觉"abcky",但maby是唯一的方式?
问候
我正在使用 testcontainers.org 和KafkaContainer.
目前,我用来kafka-topics在启动容器后创建一个主题:
kafkaContainer.execInContainer("/bin/sh", "-c", "/usr/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topicName");
Run Code Online (Sandbox Code Playgroud)
由于每个主题大约需要 3-5 秒,我想知道是否有更有效的方法来创建多个主题。或者是否有一个简单的开关来按需自动创建主题?
在搜索如何通过API创建Kafka主题时,我在Scala中找到了这个示例:
import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs,
connectionTimeoutMs, ZKStringSerializer)
// Create a topic with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName,
numPartitions, replicationFactor, topicConfig)
Run Code Online (Sandbox Code Playgroud)
资料来源:https://stackoverflow.com/a/23360100/871012
最后一个arg ZKStringSerializer显然是一个Scala对象.我不清楚如何使这个例子在Java中工作.
这篇文章如何在clojure中创建一个scala对象在Clojure中提出同样的问题,答案是:
ZKStringSerializer$/MODULE$
Run Code Online (Sandbox Code Playgroud)
在Java中我会(我认为)转换为:
ZKStringSerializer$.MODULE$
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试(或任何其他数量的变化)时,他们都没有编译.
编译错误是:
KafkaTopicCreator.java:[16,18] cannot …Run Code Online (Sandbox Code Playgroud) 我正在编写一个客户端来使用kafka 0.9.我想知道如何创建一个主题.答案:如何通过Java在Kafka中创建主题与我的要求类似.除此之外,该解决方案仅适用于Kafka 0.8.2,这与Kafka 0.9的API有很大不同.
我最近升级到了 Kafka 1.1.0。我正在尝试为 kafka 消费者创建单元测试。为此,如果单元测试可以创建用于测试的主题,那将是理想的选择。我发现一些代码看起来应该可以实现我想要的功能。但是,当我运行它时,它会抛出异常: java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.closeQuietly(Ljava/lang/AutoCloseable;Ljava/lang/String;)V
这是我在网上找到的创建主题的代码:
@BeforeClass
public static void createTopic() {
try (final AdminClient adminClient = AdminClient.create(configure())) {
try {
// Define topic
NewTopic newTopic = new NewTopic("test-orders", 1, (short)1);
// Create topic, which is async call.
final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
// Since the call is Async, Lets wait for it to complete.
createTopicsResult.values().get(ordersTopic).get();
} catch (InterruptedException | ExecutionException e) {
if (!(e.getCause() instanceof TopicExistsException)) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
但是当我运行它时它会抛出异常。
java.lang.NoSuchMethodError: …Run Code Online (Sandbox Code Playgroud)