我使用Java中的Flink编写了一个简单程序,该程序将文件或文本作为输入,然后使用flatMap函数打印所有单词。
这是我的代码:
final ParameterTool params = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
// show user defined parameters in the apache flink dashboard
DataStream<String> dataStream;
if(params.has("input"))
{
System.out.println("Executing Words example with file input");
dataStream = env.readTextFile(params.get("input"));
}else if (params.has("host") && params.has("port"))
{
System.out.println("Executing Words example with socket stream");
dataStream = env.socketTextStream(params.get("host"), Integer.parseInt(params.get("port")));
}
else {
System.exit(1);
return;
}
DataStream<String> wordDataStream = dataStream.flatMap(
(String sentence, Collector<String> out) -> {
for(String word: sentence.split(" "))
out.collect(word);
});
wordDataStream.print();
env.execute("Word Split");
Run Code Online (Sandbox Code Playgroud)
但是当我使用以下命令运行它时:
bin/flink run -c …Run Code Online (Sandbox Code Playgroud) 我有一个包含存储过程名称的变量.我想用动态sql执行这个过程,所以我这样做了:
var_procedure是包含我的存储过程名称的变量.firstparameter和secondone是存储过程的varchar参数.
execute immediate var_procedure||'('''||firstparameter||''','''||secondone||''')';
Run Code Online (Sandbox Code Playgroud)
它没有用,我打印出这个基本错误:
ORA-00900: invalid SQL statement
Run Code Online (Sandbox Code Playgroud)
你有解决方案吗 ?
假设我有一个带有两个可选字段的Toto类:
case class Toto(a : Option[Int], b: Option[Int])
Run Code Online (Sandbox Code Playgroud)
还有一个带有一个可选Toto的Titi类:
case class Titi(c : Option[Toto])
Run Code Online (Sandbox Code Playgroud)
我们创建一个Titi类的实例:
val test = Titi(Some(Toto(Some(1),Some(2))))
Run Code Online (Sandbox Code Playgroud)
现在,我想通过假定Titi或b可以等于None来访问Titi变量中Toto的第二个字段,但这条语句是不可能的:
test.c.getOrElse("Something").b.getOrElse(0)
Run Code Online (Sandbox Code Playgroud)
我该怎么做呢?
当我想实例化 KafkaProducer 时,我遇到了一个问题:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
Run Code Online (Sandbox Code Playgroud)
kafkaconsumer 类型不是通用的,它不能用参数参数化
我不明白这个错误,因为我知道我正在关注官方的 kafka javadoc,其中他们做的事情与我的依赖完全相同:
https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
谁能给我解释一下这个笑话?