What is the difference between these entities?
As I understand it, a KTable is a simple Kafka topic with a compact cleanup policy. In addition, if logging is enabled for the KTable, there will also be a changelog, whose cleanup policy is then compact,delete.
A local store is an in-memory key-value cache based on RocksDB. But the local store also has a changelog.
In both cases we get the latest value for a key for some retention period (?). The local store is used for aggregation steps, joins, and so on. But behind it, a new topic with a compaction policy is also created.
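For reference, the cleanup policies described here are ordinary topic-level configs, so they can be set or inspected with the stock Kafka CLI tools; a sketch (broker address and topic names are made up for illustration, not taken from the question):

```shell
# A KTable source topic is typically log-compacted.
kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic my-ktable-topic --partitions 1 --replication-factor 1 \
  --config cleanup.policy=compact

# Changelog topics created by Kafka Streams can combine both policies;
# list-valued configs are bracketed in kafka-configs.sh syntax.
kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name my-app-my-store-changelog \
  --add-config 'cleanup.policy=[compact,delete]'
```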
For example:
KStream<K, V> source = builder.stream(topic1);
KTable<K, V> table = builder.table(topic2); // what will happen here if i read data from topic with deletion policy delete and compaction? Will additional topic be created for store data or just a local store (cache) be used for it?
// or
KTable<K, V> table2 = builder.table(..., Materialized.as("key-value-store-name")) // what will happen here? As I understand it, I just specified a concrete name for the local store and …

If I simply use Option in a for-comprehension, everything works as expected:
val a = Some(1)
val b = None
val c = Some(3)
val r = for {
  aa <- a
  bb <- b
  cc <- c
} yield aa + bb + cc
println(r) // None, because b is None
But how do I achieve the same behavior with cats IO?
import cats.effect.IO
// in reality these will be methods with side effects
val a = Some(1)
val b = None
val c = Some(3)
val r = for {
  _ <- IO{println("a"); a}
  _ <- IO{println("b"); b} …

I am trying to create a web project with Spring Boot + Vaadin, and I want to use spring-data-jpa and Hibernate to fetch data from a PostgreSQL database.
In my Vaadin view I try to autowire my service class, but I always get null, and the error stack trace does not tell me why.
kotlin.UninitializedPropertyAccessException: lateinit property clientService has not been initialized
    at com.apache.vaadin.view.Index.getClientService(Index.kt:24) ~[classes/:na]
    at com.apache.vaadin.view.Index$readButton$1.buttonClick(Index.kt:40) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_131]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_131]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_131]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_131]
    at com.vaadin.event.ListenerMethod.receiveEvent(ListenerMethod.java:510) ~[vaadin-server-8.0.6.jar:8.0.6]
    at com.vaadin.event.EventRouter.fireEvent(EventRouter.java:211) ~[vaadin-server-8.0.6.jar:8.0.6]
    at com.vaadin.event.EventRouter.fireEvent(EventRouter.java:174) ~[vaadin-server-8.0.6.jar:8.0.6]
    at com.vaadin.server.AbstractClientConnector.fireEvent(AbstractClientConnector.java:1029) ~[vaadin-server-8.0.6.jar:8.0.6]
    at com.vaadin.ui.Button.fireClick(Button.java:370) ~[vaadin-server-8.0.6.jar:8.0.6]
    at com.vaadin.ui.Button$1.click(Button.java:57) ~[vaadin-server-8.0.6.jar:8.0.6]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_131]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_131]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_131]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_131]
    at com.vaadin.server.ServerRpcManager.applyInvocation(ServerRpcManager.java:155) ~[vaadin-server-8.0.6.jar:8.0.6]
    at com.vaadin.server.ServerRpcManager.applyInvocation(ServerRpcManager.java:116) ~[vaadin-server-8.0.6.jar:8.0.6]
    at com.vaadin.server.communication.ServerRpcHandler.handleInvocation(ServerRpcHandler.java:445) [vaadin-server-8.0.6.jar:8.0.6]
    at com.vaadin.server.communication.ServerRpcHandler.handleInvocations(ServerRpcHandler.java:410) [vaadin-server-8.0.6.jar:8.0.6]
    at com.vaadin.server.communication.ServerRpcHandler.handleRpc(ServerRpcHandler.java:274) [vaadin-server-8.0.6.jar:8.0.6]
    at com.vaadin.server.communication.UidlRequestHandler.synchronizedHandleRequest(UidlRequestHandler.java:90) [vaadin-server-8.0.6.jar:8.0.6] …
The documentation says the connection pool is not designed for multiprocessing either:
It's critically important that when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process. TCP connections are represented as file descriptors, which usually work across process boundaries, meaning this will cause concurrent access to the file descriptor on behalf of two or more entirely independent Python interpreter states.
As I understand it, if I create a connection pool:
self.engine = create_engine('postgresql://{user}:{password}@{host}:{port}/{db}'.format(
    user=Configuration().get(section='repository', option='user'),
    password=Configuration().get(section='repository', option='password'),
    host=Configuration().get(section='repository', option='host'),
    port=Configuration().get(section='repository', option='port'),
    db=Configuration().get(section='repository', option='database')
), echo=False, pool_size=3)
self.session = sessionmaker(self.engine, expire_on_commit=False)
and then call self.session() from different threads, I will have 3 different connections used across N different threads. Does that mean only 3 concurrent threads will get any work done while the rest wait until one of them calls session.close()? Or is it possible for two or more threads to use the same connection at the same time?
Is NullPool safer (since every new session is a new connection), or not?
self.engine = create_engine('postgresql://{user}:{password}@{host}:{port}/{db}'.format(
    user=Configuration().get(section='repository', option='user'),
    password=Configuration().get(section='repository', option='password'),
    host=Configuration().get(section='repository', option='host'),
    port=Configuration().get(section='repository', option='port'),
    db=Configuration().get(section='repository', option='database')
), echo=False, poolclass=NullPool)
A general question: is it OK to use the same connection pool in a case like this:
engine = create_engine('connection_string', echo=False, pool_size=3)
Session = sessionmaker(engine)
def some_function():
    session = Session()
    ...
pool = Pool(processes=10)
pool.map(some_function)
pool.close()
pool.join()
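To make the checkout accounting concrete for the threading part of the question: each Session that actually does work checks one connection out of the pool, and with pool_size=3 and no overflow a fourth concurrent checkout waits (QueuePool blocks for pool_timeout, then raises TimeoutError) until some session calls close(). A small sketch of that accounting (SQLite in place of PostgreSQL; everything here is illustrative):

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool

# SQLite stands in for PostgreSQL; checkout accounting is identical.
engine = create_engine("sqlite://", poolclass=QueuePool, pool_size=3, max_overflow=0)
Session = sessionmaker(engine)

# Each session lazily checks out its own connection on first use.
sessions = [Session() for _ in range(3)]
for s in sessions:
    s.connection()                    # force the checkout

print(engine.pool.checkedout())       # 3 -- one connection per active session

# A 4th checkout would block now; closing a session returns its
# connection to the pool and lets a waiter proceed.
sessions[0].close()
print(engine.pool.checkedout())       # 2
for s in sessions[1:]:
    s.close()
```

For the multiprocessing variant above this accounting breaks down, because each forked worker inherits the same pooled file descriptors, which is exactly what the quoted documentation warns against.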
I want to select a maximum value with this query (no field in the table is nullable):
dc.run(quote {
  query[SchemaInfo]
    .filter(_.subjectName == lift(subject))
    .map(_.version)
    .max
}).map(_.map(_ + 1).getOrElse(1))
I know the table may be empty, which is why I use map(_.map(_ + 1).getOrElse(1)).
The problem is that this query produces the following error:
SQL NULL read at column 1 (JDBC type null), but mapping is to a non-Option type; use Option here. Note that JDBC column indexing is 1-based.
doobie.util.invariant$NonNullableColumnRead: SQL NULL read at column 1 (JDBC type null), but mapping is to a non-Option type; use Option here. Note that JDBC column indexing is 1-based.
How do I fix it? The same query works fine without Quill (using plain doobie).
I want to publish my project to Maven using maven-publish and signing. The problem is that when I try to publish with gradle publish, I get this error:
* What went wrong: Execution failed for task ':project:signMavenJavaPublication'.
> Unable to retrieve secret key from key ring file '/Users/nick/.gnupg/secring.gpg ' as it does not exist
Before publishing, I generate the keys as follows:
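With GnuPG 2.1 and later, secring.gpg is no longer created or maintained by default, which is the usual cause of this exact "key ring file … does not exist" failure: the secret keyring has to be exported explicitly so the Gradle signing plugin can read it. A typical sequence might look like this (illustrative commands, not the exact ones from the original post):

```shell
# Generate a new key pair interactively (GnuPG >= 2.1).
gpg --full-generate-key

# Find the short key ID to use as signing.keyId in gradle.properties.
gpg --list-keys --keyid-format short

# GnuPG 2.1+ stores secret keys elsewhere; export them into the legacy
# secring.gpg file that the Gradle signing plugin expects.
gpg --export-secret-keys -o ~/.gnupg/secring.gpg
```

Note also that the quoted error prints the path as '/Users/nick/.gnupg/secring.gpg ' with a trailing space; a stray space at the end of signing.secretKeyRingFile would likely cause the same failure even when the file exists.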
Then I edit my ~/.gradle/gradle.properties:
signing.keyId=ID
signing.password=PASS
signing.secretKeyRingFile=/Users/nick/.gnupg/secring.gpg
My build.gradle looks like this:
subprojects {
    if (it.name != 'exclusion') {
        apply plugin: 'java-library'
        apply plugin: 'maven-publish'
        apply plugin: 'signing'
    }
    dependencies {
        ...
    }
    test {
        useJUnitPlatform()
        testLogging {
            events "passed", "skipped", …

I have a problem injecting Spring beans into some Ignite classes. I am trying to build this: client -> Apache Ignite -> Spring Data -> database. Maybe this is wrong; I am not sure.
So at this point my classes look like this. AppConfiguration:
@Configuration
@ComponentScan(basePackages = arrayOf("com.ignite.cache"))
open class AppConfiguration : Serializable {

    private val logger: Logger = Logger.getLogger(AppConfiguration::class.java)

    @Bean
    open fun igniteInstance(): Ignite {
        val cfg = IgniteConfiguration()
        cfg.igniteInstanceName = "springDataNode"
        cfg.isPeerClassLoadingEnabled = true
        val clientCache: CacheConfiguration<Long, Client> = CacheConfiguration("ClientCache")
        clientCache.apply {
            setIndexedTypes(Long::class.java, Client::class.java)
            setCacheStoreFactory(FactoryBuilder.factoryOf(ClientStore::class.java))
            isReadThrough = true
            isWriteThrough = true
        }
        cfg.setCacheConfiguration(clientCache)
        return Ignition.start(cfg)
    }
}
DataSourceConfiguration:
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(basePackages = arrayOf("com.ignite.cache.model.repositories.springdatarepository"))
@EnableIgniteRepositories(basePackages = arrayOf("com.ignite.cache.model.repositories.igniterepository"))
@ComponentScan(basePackages = arrayOf("com.ignite.cache.model"))
open …

I have a Spark cluster with 4 nodes and I am trying to launch my application:
spark-submit --deploy-mode cluster --class "Init" --master spark://host /home/cassandra/spark2Cassandra-assembly-0.1.jar

This does not work, even when I add: --jars /home/cassandra/jsr166e-1.1.0.jar
I also tried assembling all the dependencies into my application jar (the class that cannot be found is com/twitter/jsr166e/LongAdder):
libraryDependencies += "com.twitter" % "jsr166e" % "1.1.0"
I also tried adding this jar to the spark/jars directory on all nodes; that did not work for me either.
How can I distribute this jar across the cluster?
The full error (the same message appears multiple times):
java.lang.NoClassDefFoundError: com/twitter/jsr166e/LongAdder
    at org.apache.spark.metrics.OutputMetricsUpdater$TaskMetricsSupport$class.$init$(OutputMetricsUpdater.scala:107)
    at org.apache.spark.metrics.OutputMetricsUpdater$TaskMetricsUpdater.<init>(OutputMetricsUpdater.scala:153)
    at org.apache.spark.metrics.OutputMetricsUpdater$.apply(OutputMetricsUpdater.scala:75)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:174)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Update
If I use local mode and launch it like this:
spark-shell --jars /data/tmp/spark-cassandra-connector-2.0.5-s_2.11.jar,/data/tmp/jsr166e-1.1.0.jar
it works, but it does not work in cluster mode.
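A commonly suggested way to get such a dependency onto every node in cluster mode is --packages with the artifact's Maven coordinates (matching the sbt line above), since --jars pointing at a path that exists only on the submitting host never reaches the executors. A sketch (the master port and flag layout are illustrative; class name and jar path are taken from the question):

```shell
# --packages resolves the artifact from Maven Central and ships it to
# the driver and all executors together with the application jar.
spark-submit \
  --deploy-mode cluster \
  --master spark://host:7077 \
  --class Init \
  --packages com.twitter:jsr166e:1.1.0 \
  /home/cassandra/spark2Cassandra-assembly-0.1.jar
```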
I want to use Apache NiFi to produce some generic data into a Kafka topic, and I want this data to be in Avro format. What I did:
{
  "type": "record",
  "name": "my_schema",
  "namespace": "my_namespace",
  "doc": "",
  "fields": [
    { "name": "key", "type": "int" },
    { "name": "value", "type": [ "null", "int" ] },
    { "name": "event_time", "type": "long" }
  ]
}
ConvertAvroSchema settings:
PublishKafkaRecord settings:
AvroReader settings:
AvroRecordSetWriter settings:

Then I try to read it with Kafka Streams:
public class Test {

    private final static Logger logger = Logger.getLogger(KafkaFilterUsingCacheAvro.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, …