小编Nic*_*yan的帖子

KTable和本地商店之间的区别

这些实体之间有什么区别?

我认为,KTable-具有compaction删除策略的简单kafka主题。另外,如果为KTable启用了日志记录,那么还将有changelog,然后删除策略为compaction,delete

本地存储-基于RockDB的内存中键值缓存。但是本地商店也有一个变更日志。

在这两种情况下,我们都将在特定时间段(?)中获得密钥的最后一个值。本地存储用于聚合步骤,联接等。但是,紧随其后的还有创建具有压缩策略的新主题。

例如:

KStream<K, V> source = builder.stream(topic1);
KTable<K, V> table = builder.table(topic2); // what will happen here if i read data from topic with deletion policy delete and compaction? Will additional topic be created for store data or just a local store (cache) be used for it?

// or
KTable<K, V> table2 = builder.table(..., Materialized.as("key-value-store-name")) // what will happen here? As i think, i just specified a concrete name for local store and …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

10
推荐指数
1
解决办法
3372
查看次数

如果选项为无使用猫IO,如何在for-comprehension中停止执行?

如果我只是使用选项进行理解,一切都按预期进行:

val a = Some(1)
val b = None
val c = Some(3)

val r = for {
  aa <- a
  bb <- b
  cc <- c
} yield aa + bb + cc

println(r) // None, because b is None
Run Code Online (Sandbox Code Playgroud)

但如何使用猫IO实现相同的行为?

import cats.effect.IO
// in reality this will be a methods with side effect
val a = Some(1)
val b = None
val c = Some(3)

val r = for {
  _ <- IO{println("a"); a}
  _ <- IO{println("b"); b} …
Run Code Online (Sandbox Code Playgroud)

monads functional-programming scala scala-cats

7
推荐指数
1
解决办法
322
查看次数

@Autowired lateinit property''尚未初始化

我正在尝试使用Spring-boot + Vaadin创建一个Web项目,我想使用spring-data-jpa和hibernate来从PostgreSQL数据库中获取数据.

在我的Vaadin的视图中,我尝试自动装配我的服务类,但我总是得到null和错误堆栈跟踪不告诉我为什么.

kotlin.UninitializedPropertyAccessException: lateinit property clientService has not been initialized at com.apache.vaadin.view.Index.getClientService(Index.kt:24) ~[classes/:na] at com.apache.vaadin.view.Index$readButton$1.buttonClick(Index.kt:40) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_131] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_131] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_131] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_131] at com.vaadin.event.ListenerMethod.receiveEvent(ListenerMethod.java:510) ~[vaadin-server-8.0.6.jar:8.0.6] at com.vaadin.event.EventRouter.fireEvent(EventRouter.java:211) ~[vaadin-server-8.0.6.jar:8.0.6] at com.vaadin.event.EventRouter.fireEvent(EventRouter.java:174) ~[vaadin-server-8.0.6.jar:8.0.6] at com.vaadin.server.AbstractClientConnector.fireEvent(AbstractClientConnector.java:1029) ~[vaadin-server-8.0.6.jar:8.0.6] at com.vaadin.ui.Button.fireClick(Button.java:370) ~[vaadin-server-8.0.6.jar:8.0.6] at com.vaadin.ui.Button$1.click(Button.java:57) ~[vaadin-server-8.0.6.jar:8.0.6] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_131] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_131] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_131] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_131] at com.vaadin.server.ServerRpcManager.applyInvocation(ServerRpcManager.java:155) ~[vaadin-server-8.0.6.jar:8.0.6] at com.vaadin.server.ServerRpcManager.applyInvocation(ServerRpcManager.java:116) ~[vaadin-server-8.0.6.jar:8.0.6] at com.vaadin.server.communication.ServerRpcHandler.handleInvocation(ServerRpcHandler.java:445) [vaadin-server-8.0.6.jar:8.0.6] at com.vaadin.server.communication.ServerRpcHandler.handleInvocations(ServerRpcHandler.java:410) [vaadin-server-8.0.6.jar:8.0.6] at com.vaadin.server.communication.ServerRpcHandler.handleRpc(ServerRpcHandler.java:274) [vaadin-server-8.0.6.jar:8.0.6] at com.vaadin.server.communication.UidlRequestHandler.synchronizedHandleRequest(UidlRequestHandler.java:90) [vaadin-server-8.0.6.jar:8.0.6] …

hibernate vaadin spring-data-jpa kotlin spring-boot

5
推荐指数
1
解决办法
5428
查看次数

sqlalchemy中的连接池是线程安全的吗?

文档说连接池也不是为多线程设计的:

至关重要的是,在使用连接池时以及在通过create_engine()创建的Engine使用扩展池时,池化的连接不得共享给分叉的进程。TCP连接表示为文件描述符,它们通常跨进程边界工作,这意味着它将代表两个或多个完全独立的Python解释器状态并发访问文件描述符。

据我了解,如果我创建连接池:

self.engine = create_engine('postgresql://{user}:{password}@{host}:{port}/{db}'.format(
    user=Configuration().get(section='repository', option='user'),
    password=Configuration().get(section='repository', option='password'),
    host=Configuration().get(section='repository', option='host'),
    port=Configuration().get(section='repository', option='port'),
    db=Configuration().get(section='repository', option='database')
), echo=False, pool_size=3)

self.session = sessionmaker(self.engine, expire_on_commit=False)
Run Code Online (Sandbox Code Playgroud)

然后self.session()在不同的线程中调用,我将拥有3个不同的连接,分别在N个不同的线程中使用。这是否意味着只有3个并发线程会执行某些工作,而其他线程会等到一个或多个线程将被调用session.close()?还是有可能> 2个线程同时使用同一连接?

NullPool是否更安全(因为每个新会话都是一个新连接)还是没有?

self.engine = create_engine('postgresql://{user}:{password}@{host}:{port}/{db}'.format(
            user=Configuration().get(section='repository', option='user'),
            password=Configuration().get(section='repository', option='password'),
            host=Configuration().get(section='repository', option='host'),
            port=Configuration().get(section='repository', option='port'),
            db=Configuration().get(section='repository', option='database')
        ), echo=False, poolclass=NullPool)
Run Code Online (Sandbox Code Playgroud)

一般问题:在这种情况下可以使用相同的连接池:

engine = create_engine('connection_string', echo=False, pool_size=3)
Session = sessionmaker(engine)

def some_function():
    session = Session()
    ...

pool = Pool(processes=10)
pool.map(some_function)
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

python multithreading sqlalchemy

4
推荐指数
1
解决办法
2249
查看次数

SQL `NULL` 在第 1 列读取(JDBC 类型 null),但映射到非选项类型

我想使用此查询选择最大值(表中的所有字段都不为空):

dc.run(quote {
    query[SchemaInfo]
      .filter(_.subjectName == lift(subject))
      .map(_.version)
      .max
  }).map(_.map(_ + 1).getOrElse(1))
Run Code Online (Sandbox Code Playgroud)

我知道,该表可能是空的,所以我使用这个:map(_.map(_ + 1).getOrElse(1))

问题是此查询会产生以下错误:

SQLNULL在第 1 列读取(JDBC 类型为 null),但映射到非选项类型;在这里使用选项。请注意,JDBC 列索引是从 1 开始的。doobie.util.invariant$NonNullableColumnRead:在第 1 列读取 SQL NULL(JDBC 类型为 null),但映射到非选项类型;在这里使用选项。请注意,JDBC 列索引是从 1 开始的。

如何修复它?没有鹅毛笔(使用纯 doobie)相同的查询可以正常工作

scala quill quill.io doobie

4
推荐指数
1
解决办法
1492
查看次数

无法从密钥环文件 secring.gpg 中检索密钥,因为它不存在

我想使用maven-publishand将我的项目发布到 maven signing。问题是当我尝试发布时:gradle publish我收到此错误:

 * What went wrong: Execution failed for task ':project:signMavenJavaPublication'.
    > Unable to retrieve secret key from key ring file '/Users/nick/.gnupg/secring.gpg ' as it does not exist
Run Code Online (Sandbox Code Playgroud)

在发布之前,我生成如下密钥:

  1. gpg --gen-key
  2. gpg --export-secret-keys > ~/.gnupg/secring.gpg

然后我改变我的~/.gradle.gradle.properties

signing.keyId=ID
signing.password=PASS
signing.secretKeyRingFile=/Users/nick/.gnupg/secring.gpg 
Run Code Online (Sandbox Code Playgroud)

我的 gradle.build 如下所示:

subprojects {
    if (it.name != 'exclusion') {
        apply plugin: 'java-library'
        apply plugin: 'maven-publish'
        apply plugin: 'signing'
    }

    dependencies {
...
    }

    test {
        useJUnitPlatform()

        testLogging {
            events "passed", "skipped", …
Run Code Online (Sandbox Code Playgroud)

publish gnupg gradle maven

3
推荐指数
1
解决办法
947
查看次数

如何将Spring资源注入Apache Ignite类?

我有注射弹簧豆进入一些点燃类的问题.我正在尝试创建这个:客户端 - > Apache Ignite - > Spring-Data - > DataBase也许这是错误的,我不确定.

所以,此时我的课程看起来像:AppConfiguration

@Configuration
@ComponentScan(basePackages = arrayOf("com.ignite.cache"))
open class AppConfiguration : Serializable {
    private val logger: Logger = Logger.getLogger(AppConfiguration::class.java)

    @Bean
    open fun igniteInstance(): Ignite {
        val cfg = IgniteConfiguration()

        cfg.igniteInstanceName = "springDataNode"

        cfg.isPeerClassLoadingEnabled = true


        var clientCache: CacheConfiguration<Long, Client> = CacheConfiguration("ClientCache")
        clientCache.apply {
            setIndexedTypes(Long::class.java, Client::class.java)
            setCacheStoreFactory(FactoryBuilder.factoryOf(ClientStore::class.java))
            isReadThrough = true
            isWriteThrough = true
        }

        cfg.setCacheConfiguration(clientCache)

        return Ignition.start(cfg)
    }
}
Run Code Online (Sandbox Code Playgroud)

DataSourceConfiguration:

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(basePackages = arrayOf("com.ignite.cache.model.repositories.springdatarepository"))
@EnableIgniteRepositories(basePackages = arrayOf("com.ignite.cache.model.repositories.igniterepository"))
@ComponentScan(basePackages = arrayOf("com.ignite.cache.model"))
open …
Run Code Online (Sandbox Code Playgroud)

spring spring-data spring-data-jpa kotlin ignite

2
推荐指数
1
解决办法
1366
查看次数

java.lang.NoClassDefFoundError:com / twitter / jsr166e / LongAdder

我有一个包含4个节点的Spark集群,并且尝试启动应用程序: spark-submit --deploy-mode cluster --class "Init" --master spark://host /home/cassandra/spark2Cassandra-assembly-0.1.jar-即使添加此功能,这也无法正常工作:--jars /home/cassandra/jsr166e-1.1.0.jar

此外,我尝试将所有依赖项都组装到我的应用程序中 com/twitter/jsr166e/LongAdder

libraryDependencies += "com.twitter" % "jsr166e" % "1.1.0"
Run Code Online (Sandbox Code Playgroud)

另外,我试图将此jar添加到所有节点的spark \ jars目录中-这对我不起作用

如何将这个jar分发到群集中?

完全错误(多次出现同一条消息):

java.lang.NoClassDefFoundError:org.apache.spark.metrics.OutputMetricsUpdater $ TaskMetricsSupport $ class。$ init $(OutputMetricsUpdater.scala:107)上的com / twitter / jsr166e / LongAdder在org.apache.spark.metrics.OutputMetricsUpdater $ TaskMetricsUpdater 。(OutputMetricsUpdater.scala:153)位于org.apache.spark.metrics.OutputMetricsUpdater $ .apply(OutputMetricsUpdater.scala:75)位于com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:174) com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)处的com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162) RDDFunctions $$ anonfun $ saveToCassandra $ 1.apply(RDDFunctions.scala:36)位于com.datastax.spark.connector.RDDFunctions $$ anonfun $ saveToCassandra $ 1.apply(RDDFunctions.scala:36)位于org.apache.spark.executor.Executor的org.apache.spark.scheduler.Task.run(Task.scala:108)的org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) Java上的java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)的$ TaskRunner.run(Executor.scala:335)在Java的java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617) lang.Thread.run(Thread.java:748)java:748)java:748)

更新

如果我使用本地模式并通过以下方式启动它:

spark-shell --jars /data/tmp/spark-cassandra-connector-2.0.5-s_2.11.jar,/data/tmp/jsr166e-1.1.0.jar 
Run Code Online (Sandbox Code Playgroud)

它可以工作,但是对于集群模式却不起作用

scala jar apache-spark

2
推荐指数
1
解决办法
1837
查看次数

如何从 apache nifi 在 kafka 主题中生成 Avro 消息,然后使用 kafka 流读取它?

我想使用 apache nifi 将一些通用数据生成到 kafka 主题中,并且我希望这些数据采用 avro 格式。我为它做了什么:

  1. 在架构注册表中创建新架构:

{ "type": "record", "name": "my_schema", "namespace": "my_namespace", "doc": "", "fields": [ { "name": "key", "type": "int" }, { "name": "value", "type": [ "null", "int" ] }, { "name": "event_time", "type": "long" } ] }

  1. 创建简单的 nifi 管道: 在此处输入图片说明 ConvertAvroSchema 设置: 在此处输入图片说明 PublishKafkaRecord 设置: 在此处输入图片说明 AvroReader 设置: 在此处输入图片说明 AvroRecordSetWriter 设置: 在此处输入图片说明
  2. 然后我尝试使用 kafka 流读取它:

    public class Test { private final static Logger logger = Logger.getLogger(KafkaFilterUsingCacheAvro.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
    
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, …
    Run Code Online (Sandbox Code Playgroud)

avro apache-kafka apache-nifi

1
推荐指数
1
解决办法
2209
查看次数