小编Dav*_*iro的帖子

Kafka StickyAssignor打破了集团中单一消费者的交付

我有一个主题,一个分区和两个构成一个消费者组的消费者进程.

这样,消息始终传递给单个消费者.StickyAssignor用于优先选择已经分配给重新平衡分区的消费者.

我一直在玩这个设置,并发现在某些情况下,消息被传递给两个消费者,这打破了消费者群体的目的.

方案如下:

  1. 启动消费者C1
  2. C1开始接收消息
  3. 启动消费者C2
  4. C2没有收到任何消息 - 感谢StickyAssignor策略更喜欢C1
  5. 冻结C1进程 - (使用Java调试器 - 停止所有线程)
  6. C2接管 - 开始接收消息
  7. 解冻C1流程
  8. 现在C1和C2都接收消息,尽管它们在同一组中

使用RangeAssignor/RoundRobinAssignor时,不会发生这种情况.

我错过了什么,或者这是卡夫卡的一个错误?

这是我的消费者代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("client.id", consumerId);
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("partition.assignment.strategy", StickyAssignor.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Collections.singleton("my-events"));



while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api

5
推荐指数
0
解决办法
483
查看次数

Apache Pulsar - 从不同的线程承认

我已经在线程T1中启动了一个Pulsar Java消费者,我正在将消息传递给线程T2进行处理.

确认来自T2的消息是否安全?具体来说,是调用consumer.acknowledge(messageId)两个线程之间共享的实例吗?

java multithreading apache-pulsar

5
推荐指数
1
解决办法
163
查看次数

Kotlin JSR-223 ScriptEngineFactory里面的胖罐子 - 找不到kotlin编译jar

我有一个肥胖的罐子,我正试图得到Kotlin的实例ScriptEngine.

出于调试目的,我正在迭代可用的脚本引擎工厂并获取引擎.

val scriptEngineManager = ScriptEngineManager()
for (factory in scriptEngineManager.engineFactories) {
    val scriptEngine = factory.scriptEngine
}
Run Code Online (Sandbox Code Playgroud)

当它击中Kotlin的引擎时,它失败并出现以下异常:

Exception in thread "main" java.io.FileNotFoundException: Cannot find kotlin compiler jar, set kotlin.compiler.jar property to proper location
        at org.jetbrains.kotlin.script.jsr223.KotlinJsr223ScriptEngineFactoryExamplesKt$kotlinCompilerJar$2.invoke(KotlinJsr223ScriptEngineFactoryExamples.kt:100)
        at org.jetbrains.kotlin.script.jsr223.KotlinJsr223ScriptEngineFactoryExamplesKt$kotlinCompilerJar$2.invoke(KotlinJsr223ScriptEngineFactoryExamples.kt)
        at kotlin.SynchronizedLazyImpl.getValue(Lazy.kt:130)
        at org.jetbrains.kotlin.script.jsr223.KotlinJsr223ScriptEngineFactoryExamplesKt.getKotlinCompilerJar(KotlinJsr223ScriptEngineFactoryExamples.kt)
        at org.jetbrains.kotlin.script.jsr223.KotlinJsr223ScriptEngineFactoryExamplesKt.access$getKotlinCompilerJar$p(KotlinJsr223ScriptEngineFactoryExamples.kt:1)
        at org.jetbrains.kotlin.script.jsr223.KotlinJsr223JvmDaemonLocalEvalScriptEngineFactory.getScriptEngine(KotlinJsr223ScriptEngineFactoryExamples.kt:56)
        at davidsiro.invoices.InvoiceGeneratorKt.generateInvoice(invoiceGenerator.kt:16)
        at davidsiro.invoices.MainKt.main(main.kt:11)
Run Code Online (Sandbox Code Playgroud)

我的胖jar包含所有依赖项(虽然解压缩),包括Kotlin Compiler.我正在使用Maven Assembly Plugin来构建它,它配置如下:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>2.6</version>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>${main.class}</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </execution>
    </executions>
</plugin>
Run Code Online (Sandbox Code Playgroud)

有任何想法吗? …

executable-jar maven jsr223 kotlin

3
推荐指数
1
解决办法
2361
查看次数

在数据访问层复用Slick的DB驱动代码

我正在尝试使用 Slick 3.0 进行数据访问。在咨询了各种 github 示例后,我得出了以下设计。

注入 DataSource 和 Driver 实例的单例 Slick 对象

class Slick(dataSource: DataSource, val driver: JdbcDriver)  {

  val db = driver.api.Database.forDataSource(dataSource)     

}
Run Code Online (Sandbox Code Playgroud)

每个数据库表的特征,其中定义了映射

特征混合在构建查询的上层。

trait RecipeTable {

  protected val slick: Slick

  // the ugly import that have to be added when Slick API is used
  import slick.driver.api._

  type RecipeRow = (Option[Long], String)

  class RecipeTable(tag: Tag) extends Table[RecipeRow](tag, "recipe") {

    def id = column[Option[Long]]("id", O.PrimaryKey, O.AutoInc)
    def name = column[String]("name")

    def * = (id, name)    
  }

  protected val …
Run Code Online (Sandbox Code Playgroud)

data-access scala slick

2
推荐指数
1
解决办法
346
查看次数

找不到Akka Http返回404

我正在努力实现非常简单的事情。

说,我有一个REST API。当我打电话

/api/recipe/1
Run Code Online (Sandbox Code Playgroud)

我想将一种资源作为json返回。

当我打

/api/recipe/2
Run Code Online (Sandbox Code Playgroud)

404 Not Found HTTP响应应返回。就那么简单。

显然,我缺少关于路由指令如何工作的信息,因为我无法将它们组成以遵守上述逻辑。

不幸的是,我找不到任何具体示例,官方文档也没有特别帮助。

我正在尝试类似的操作,但是代码给出了编译错误:

class RecipeResource(recipeService: RecipeService)(implicit executionContext: ExecutionContext) extends DefaultJsonProtocol {

  implicit val recipeFormat = jsonFormat1(Recipe.apply)

  val routes = pathPrefix("recipe") {
    (get & path(LongNumber)) { id =>
      complete {
        recipeService.getRecipeById(id).map {
          case Some(recipe) => ToResponseMarshallable(recipe)
          // type mismatch here, akka.http.scaladsl.marshalling.ToResponseMarshallable 
          // is required
          case None => HttpResponse(StatusCodes.NotFound)
        }
      }
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

更新资料

以下是recipeService更清晰的代码:

class RecipeService(implicit executionContext: ExecutionContext) {

  def getRecipeById(id: Long): Future[Option[Recipe]] = {
    id match …
Run Code Online (Sandbox Code Playgroud)

scala http akka http-status-code-404 akka-http

1
推荐指数
1
解决办法
3909
查看次数