我是 ksql 的新手。我只是将 kafka 主题读到流中,效果很好。
此外,尝试从 kafka 主题创建表并失败。意识到我需要在 kafka 主题中设置一个键,该键被视为 ksql 表中的主键。所以我尝试从流创建表,但也失败了。查询/脚本:
CREATE TABLE DETAILS_TABLE AS SELECT SEQ, Server1, ServerId, NumberUri, SERVERID2, SERVER2 FROM details_stream WINDOW TUMBLING (SIZE 1 MINUTES);
Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.
Run Code Online (Sandbox Code Playgroud)
有人可以解释一下是否可能吗?如果是,我哪里出错了?谢谢。
练习一些代码示例,我发现了这个:我已经在类级别声明了最终变量并尝试从方法中分配值,导致编译时错误(代码-2).但是从构造函数中它确实被赋值(代码-1).
代码-1:
class Immutable {
private final int age;
private final String name;
private Immutable(int age, String name) {
this.age = age;
this.name = name;
}
Run Code Online (Sandbox Code Playgroud)
代码-2:
class Immutable {
private final int age;
private final String name;
private void me() {
this.age = 19;
this.name = "name";
}
Run Code Online (Sandbox Code Playgroud)
当然,它们也可以在类级别分配,但不允许再次在构造函数中执行,因为它们只允许声明一次.但是,为什么最终变量在构造函数中分配而不是从方法中分配?
我有一个列表Objects与java.util.date字段为一体.如何Objects根据日期(每小时)分离/过滤.即例如:Objects日期范围从12.00到13:00应分开或形成新列表,依此类推每小时.帮助我实现这一目标.
我正在学习Kafka Streams并尝试实现以下目标:
创建了2个Kafka主题(比如topic1,topic2),其中null为键,而JSONString为值。来自topic1的数据(无重复项)在topic2中具有多个匹配条目。即,topic1具有一些主流数据,当与topic2结合时可以生成新的多个数据流。
例:
topic1={"name": "abc", "age":2}, {"name": "xyz", "age":3} and so on.
topic2={"name": "abc", "address"="xxxxxx"}, {"name": "abc", "address"="yyyyyy"}, {"name": "xyz", "address"="jjjjjj"}, {"name": "xyz", "address"="xxxkkkkk"}
Run Code Online (Sandbox Code Playgroud)
预期产量: {"name": "abc", "age":2, "address"="xxxxxx"}, {"name": "abc", "age":2, "address"="yyyyyy"}, {"name": "xyz", "age":3, "address"="jjjjjj"}, {"name": "xyz", "age":3, "address"="xxxkkkkk"}
想要持久保存/保留topic1的数据流以供将来参考,而topic2的数据流仅用于实现上述用例,不需要任何持久性/保留。
我有几个问题:1)应该将topic1数据流保存/存储几天(可能吗?),以便可以合并来自topic2的传入数据流。可能吗?2)我应该使用什么实现KStream或KTable?3)这称为背压机制吗?
Kafka Stream是否支持此用例,还是我应该注意其他事项?请建议。
我用5分钟的窗口用KStream尝试了一段代码,但是看起来我无法在流中保存topic1数据。
请以正确的选择帮助我并加入。我正在使用Confluent的Kafka和Docker实例。
public void run() {
final StreamsBuilder builder = new StreamsBuilder();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
// Hold data from this topic to …Run Code Online (Sandbox Code Playgroud) 嗨,我在流文件中有以下 json:{"destination.port":"0000","network.packets":"1","event.end":"1563361839","source.address":"1.2.2.1","message":"OK","server.address":"ip-1-2-2-3.ec2.internal","event.action":"ACCEPT","event.module":"s3bucket","source.port":"478","network.protocol":"6","cloud.account.id":"123456","event.type":"data","organization.id":"Fusion","destination.address":"1.2.2.2","network.bytes":"60","event.start":"1563361837","event.kind":"2","host.id":"eni-06f72","timestamp":"2019-07-17T11:16:39.792Z"}以 event.end 和 event.start 作为纪元。
我正在使用 nifi-1.9.2。帮助我将纪元转换为 UTC 中的日期并替换相同的日期。我曾尝试将updateAttribute处理器与 一起使用/event.start=${event.start:format("yyyy-MM-dd HH:mm:ss.SSS")},但不正确。我已经为处理器做了一些阅读,但不能。谢谢
我是maven的新手,它是一门有趣的学科。使用以下命令成功创建具有依赖项的jar:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</configuration>
</plugin>
Run Code Online (Sandbox Code Playgroud)
现在,我必须包括一些shell脚本和生成的jar到tar中。为此,我尝试了以下方法:已添加
<descriptors>
<descriptor>hadoop-job.xml</descriptor>
</descriptors>
Run Code Online (Sandbox Code Playgroud)
以上脚本。
在hadoop-job.xml中,我将所需文件包含到tar中。
问题是tar首先生成,并说在目标中找不到* .jar。因为这两个配置都位于程序集插件中,所以是否有办法先安排jar创建,然后安排tar。还是有一个命令先执行并生成jar,然后再生成tar命令?
顺便说一句,我正在执行mvn clean assembly:assembly -Dbinary = true。帮我解决。谢谢。
我正在使用MongoDB来存储我的数据.Mongo以UTC格式存储时间戳作为默认值.我们处理不同时区的数据.我很难将UTC时间戳转换为PDT或IST时间戳.
尝试构建一个方法来传递时区(我的时间戳将转换为其中)和时间戳(UTC).返回指定时区的时间戳的方法.
public Date getDateBasedOnZone(Date date, "America/Los_Angeles") {
return dateInAmerica/Los_Angeles;
}
Run Code Online (Sandbox Code Playgroud) java ×4
apache-kafka ×2
apache-nifi ×1
arraylist ×1
constructor ×1
datetime ×1
final ×1
java-8 ×1
json ×1
ksqldb ×1
maven ×1
methods ×1
mongodb ×1