我有一个主题,一个分区和两个构成一个消费者组的消费者进程.
这样,消息始终传递给单个消费者.StickyAssignor用于优先选择已经分配给重新平衡分区的消费者.
我一直在玩这个设置,并发现在某些情况下,消息被传递给两个消费者,这打破了消费者群体的目的.
方案如下:
使用RangeAssignor/RoundRobinAssignor时,不会发生这种情况.
我错过了什么,或者这是卡夫卡的一个错误?
这是我的消费者代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", consumerId);
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("partition.assignment.strategy", StickyAssignor.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("my-events"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Run Code Online (Sandbox Code Playgroud) 我已经在线程T1中启动了一个Pulsar Java消费者,我正在将消息传递给线程T2进行处理.
确认来自T2的消息是否安全?具体来说,是调用consumer.acknowledge(messageId)
两个线程之间共享的实例吗?
我有一个肥胖的罐子,我正试图得到Kotlin的实例ScriptEngine
.
出于调试目的,我正在迭代可用的脚本引擎工厂并获取引擎.
val scriptEngineManager = ScriptEngineManager()
for (factory in scriptEngineManager.engineFactories) {
val scriptEngine = factory.scriptEngine
}
Run Code Online (Sandbox Code Playgroud)
当它击中Kotlin的引擎时,它失败并出现以下异常:
Exception in thread "main" java.io.FileNotFoundException: Cannot find kotlin compiler jar, set kotlin.compiler.jar property to proper location
at org.jetbrains.kotlin.script.jsr223.KotlinJsr223ScriptEngineFactoryExamplesKt$kotlinCompilerJar$2.invoke(KotlinJsr223ScriptEngineFactoryExamples.kt:100)
at org.jetbrains.kotlin.script.jsr223.KotlinJsr223ScriptEngineFactoryExamplesKt$kotlinCompilerJar$2.invoke(KotlinJsr223ScriptEngineFactoryExamples.kt)
at kotlin.SynchronizedLazyImpl.getValue(Lazy.kt:130)
at org.jetbrains.kotlin.script.jsr223.KotlinJsr223ScriptEngineFactoryExamplesKt.getKotlinCompilerJar(KotlinJsr223ScriptEngineFactoryExamples.kt)
at org.jetbrains.kotlin.script.jsr223.KotlinJsr223ScriptEngineFactoryExamplesKt.access$getKotlinCompilerJar$p(KotlinJsr223ScriptEngineFactoryExamples.kt:1)
at org.jetbrains.kotlin.script.jsr223.KotlinJsr223JvmDaemonLocalEvalScriptEngineFactory.getScriptEngine(KotlinJsr223ScriptEngineFactoryExamples.kt:56)
at davidsiro.invoices.InvoiceGeneratorKt.generateInvoice(invoiceGenerator.kt:16)
at davidsiro.invoices.MainKt.main(main.kt:11)
Run Code Online (Sandbox Code Playgroud)
我的胖jar包含所有依赖项(虽然解压缩),包括Kotlin Compiler.我正在使用Maven Assembly Plugin来构建它,它配置如下:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<archive>
<manifest>
<mainClass>${main.class}</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
Run Code Online (Sandbox Code Playgroud)
有任何想法吗? …
我正在尝试使用 Slick 3.0 进行数据访问。在咨询了各种 github 示例后,我得出了以下设计。
注入 DataSource 和 Driver 实例的单例 Slick 对象
class Slick(dataSource: DataSource, val driver: JdbcDriver) {
val db = driver.api.Database.forDataSource(dataSource)
}
Run Code Online (Sandbox Code Playgroud)
每个数据库表的特征,其中定义了映射
特征混合在构建查询的上层。
trait RecipeTable {
protected val slick: Slick
// the ugly import that have to be added when Slick API is used
import slick.driver.api._
type RecipeRow = (Option[Long], String)
class RecipeTable(tag: Tag) extends Table[RecipeRow](tag, "recipe") {
def id = column[Option[Long]]("id", O.PrimaryKey, O.AutoInc)
def name = column[String]("name")
def * = (id, name)
}
protected val …
Run Code Online (Sandbox Code Playgroud) 我正在努力实现非常简单的事情。
说,我有一个REST API。当我打电话
/api/recipe/1
Run Code Online (Sandbox Code Playgroud)
我想将一种资源作为json返回。
当我打
/api/recipe/2
Run Code Online (Sandbox Code Playgroud)
404 Not Found HTTP响应应返回。就那么简单。
显然,我缺少关于路由指令如何工作的信息,因为我无法将它们组成以遵守上述逻辑。
不幸的是,我找不到任何具体示例,官方文档也没有特别帮助。
我正在尝试类似的操作,但是代码给出了编译错误:
class RecipeResource(recipeService: RecipeService)(implicit executionContext: ExecutionContext) extends DefaultJsonProtocol {
implicit val recipeFormat = jsonFormat1(Recipe.apply)
val routes = pathPrefix("recipe") {
(get & path(LongNumber)) { id =>
complete {
recipeService.getRecipeById(id).map {
case Some(recipe) => ToResponseMarshallable(recipe)
// type mismatch here, akka.http.scaladsl.marshalling.ToResponseMarshallable
// is required
case None => HttpResponse(StatusCodes.NotFound)
}
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
以下是recipeService
更清晰的代码:
class RecipeService(implicit executionContext: ExecutionContext) {
def getRecipeById(id: Long): Future[Option[Recipe]] = {
id match …
Run Code Online (Sandbox Code Playgroud)