按照春季5:
WebClient是表示执行Web请求的主要入口点的接口.
它已作为Spring Web Reactive模块的一部分创建,并将在这些场景中替换经典的RestTemplate.新客户端是一种通过HTTP/1.1协议工作的反应式非阻塞解决方案
这是否意味着,如果我们想要升级到Spring 5,我们需要使用RestTemplate重新编码旧应用程序?
或者在Spring 5中使用RestTemplate有一些解决方法?
根据文件:
Flux是一个可以发出0..N元素的流:
Flux<String> fl = Flux.just("a", "b", "c");
Run Code Online (Sandbox Code Playgroud)
Mono是一个0..1元素的流:
Mono<String> mn = Mono.just("hello");
Run Code Online (Sandbox Code Playgroud)
因为两者都是反应流中Publisher接口的实现.
我们不能在大多数情况下只使用Flux,因为它也可以发出0..1,从而满足Mono的条件?
或者只有一些特定的条件,只需要使用Mono并且Flux无法处理操作?请建议.
我是Kafka的新手,正在尝试在本地计算机上创建一个新主题。
我正在跟踪此链接。
这是我遵循的步骤:
启动动物园管理员
bin/zookeeper-server-start.sh config/zookeeper.properties
Run Code Online (Sandbox Code Playgroud)启动kafka服务器
bin/kafka-server-start.sh config/server.properties
Run Code Online (Sandbox Code Playgroud)创建一个话题
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Run Code Online (Sandbox Code Playgroud)但是在创建主题时,出现以下错误:
Exception in thread "main" joptsimple.UnrecognizedOptionException: bootstrap-server is not a recognized option
at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
at joptsimple.OptionParser.parse(OptionParser.java:396)
at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:358)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:44)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Run Code Online (Sandbox Code Playgroud)
创建主题是否需要其他配置?我在做什么错
我在相同的通量上使用 publishOn 和 subscribeOn,如下所示:
System.out.println("*********Calling Concurrency************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.map(i -> i * 2)
.log()
.publishOn(Schedulers.elastic())
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
System.out.println("-------------------------------------");
Run Code Online (Sandbox Code Playgroud)
虽然,当我同时使用两者时,日志中没有打印任何内容。但是当我只使用 publishOn 时,我得到了以下信息日志:
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Run Code Online (Sandbox Code Playgroud)
是不是publishOn 比subscribeOn 更值得推荐?或者它比 subscribeOn 有更多的偏好?两者有什么区别以及何时使用哪个?
publisher publish-subscribe reactive-programming project-reactor reactive-streams
我正在学习 Scala 并且知道我们可以使用两个扩展名保存 Scala 文件,即 my.sc 和 my.scala。
这是我创建的示例文件:
我的.scala
object My {
/** Our main function where the action happens */
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR)
val sc = new SparkContext("local[*]", "my")
val lines = sc.textFile("readme.tx")
val results = lines.countByValue()
}
}
Run Code Online (Sandbox Code Playgroud)
我的.sc
object My {
val hello: String = "Hello World!"
println(hello)
}
Run Code Online (Sandbox Code Playgroud)
两者有什么区别?
何时使用 sc 文件扩展名,何时使用 scala 文件扩展名?
我正在使用 aerospike 并使用 vagrant virtual box 安装它。
安装后,当我尝试启动虚拟机时,出现以下错误:
. 执行时出错
VBoxManage,这是 Vagrant 用于控制 VirtualBox 的 CLI。命令和标准错误如下所示。命令:["startvm", "dff6693e-52c8-4c9e-922a-243d18c7f666", "--type", "headless"]
Stderr:VBoxManage:错误:VM 会话在任何尝试打开 VBoxManage 之前已关闭:错误:详细信息:代码 NS_ERROR_FAILURE (0x80004005),组件 SessionMachine,接口 ISession
我正在使用 mac 机器进行此设置。
有什么建议吗?
根据一些条件,我从MongoDB读取数据并创建一个List<Document>带结果集.
List<Document> documentList = new ArrayList<Document>();
Run Code Online (Sandbox Code Playgroud)
样本记录看起来像:
documentList: [
Document{
{ _id=5975ff00a213745b5e1a8ed9,
u_id=,
visblty = 1,
c_id=5975ff00a213745b5e1a8ed8,
batchid=null,
pdate=Tue Jul 11 17:52:25 IST 2017,
locale=en_US,
subject = "Document2"
} },
Document{
{ _id=597608aba213742554f537a6,
u_id=,
visblty = 1,
c_id=597608aba213742554f537a3,
batchid=null,
pdate=Fri Jul 28 01:26:22 IST 2017,
locale=en_US,
subject = "Document2"
} }
]
Run Code Online (Sandbox Code Playgroud)
使用此documentList,我再次使用某些条件进行过滤,然后我需要根据某些条件(我将在请求中获取)对过滤器记录进行排序.
List<Document> outList = documentList.stream()
.filter(d -> d.getInteger("visblty") == 1
&& (!StringUtils.isEmpty(req.pdate())? (d.getDate(CommonConstants.PDATE).after(afterDate)): true)
&& (!StringUtils.isEmpty(req.pdate())? (d.getDate(CommonConstants.PDATE).before(beforeDate)): true)
.sorted().skip(4).limit()
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
不确定如何排序(动态需要根据输入更改排序顺序,它看起来像" pdate by DESC"或" subject …
我正在使用Apache Camel SQL批量插入过程.
我的应用程序是从Active MQ读取票据,其中包含大约2000张票据.
我已将批次更新为100.
我要解雇的查询如下:
sql.subs.insertCdr=
insert into subscription_logs(master_id,request_type,req_desc,msisdn,amount,status,resp_code,resp_desc,channel,transaction_id,se_mode,be_mode,sub_type,sub_timeleft,srv_name,srv_id,start_date,end_date,operator,circle,country,time_offset,retry_count,user_status,previous_state,se_reqrecvtime,se_respsenttime,be_reqsenttime,be_resprecvtime,cp_id,cp_name,sub_srvname,sub_srvid,msg_senderid,msg_text,call_back_url,call_back_resp,client_ip,se_sysIp,language,cp_callbackurlhittime,action,alert,notification_url,notification_resp)
values(:#masterId, :#requestType,:#reqDesc,:#msisdnCdr,:#price,:#status,:#responseCode,:#reason,:#channel,:#transactionId,:#seMode,:#beMode,:#subType,:#subTimeLeft,:#serviceName,:#serviceId,:#subStartDate,:#cdrEndDate,:#operator,:#circle,:#country,:#timeOffset,:#retryCount,:#userStatus,:#previousState,:#seReqRecvTime,:#seRespSentTime,:#beReqSentTime,:#beRespRecvTime,:#cpId,:#cpName,:#subServiceName,:#subServiceId,:#shortCode,:#message,:#callBackUrl,:#callBackResp,:#clientIp,:#seSysIp,:#language,:#cpCallbackUrlHitTime,:#action,:#alert,:#notificationUrl,:#notificationResponse)
SQL批处理路由定义如下:
<pipeline>
<log message="Going to insert in database"></log>
<transform>
<method ref="insertionBean" method="subsBatchInsertion"></method>
</transform>
<choice>
<when>
<simple>${in.header.subsCount} == ${properties:batch.size}</simple>
<to uri="sql:{{sql.subs.insertCdr}}?batch=true"></to>
<log message="Inserted rows ${body}"></log>
</when>
</choice>
</pipeline>
Run Code Online (Sandbox Code Playgroud)下面是我的java代码:
public List<Map<String, Object>> subsBatchInsertion(Exchange exchange) {
if (subsBatchCounter > batchSize) {
subsPayLoad.clear();
subsBatchCounter = 1;
}
subsPayLoad.add(generateInsert(exchange.getIn().getBody(SubscriptionCdr.class)));
exchange.getIn().setHeader("subsCount", subsBatchCounter);
subsBatchCounter++;
return subsPayLoad;
}
public Map<String, Object> generateInsert(Cdr cdr) {
Map<String, Object> insert = new HashMap<String, Object>();
try {
insert …Run Code Online (Sandbox Code Playgroud)Camel中的wireTap和多播的默认行为似乎相似.这两者都有助于以不同的方式处理相同的消息.那么wireTap和组播之间的主要区别是什么?
我正在学习使用java流过滤.但过滤后的流不会打印任何内容.我认为过滤方法没有被执行.我的过滤代码如下:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
s.startsWith("b");
System.out.println("filter: " + s);
return true;
});
Run Code Online (Sandbox Code Playgroud)
没有编译错误,也没有例外.有什么建议吗?
java ×4
apache-camel ×2
java-8 ×2
java-stream ×2
aerospike ×1
apache-kafka ×1
apache-spark ×1
caching ×1
camel-jdbc ×1
camel-sql ×1
filter ×1
jms ×1
kafka-topic ×1
macos ×1
multicast ×1
publisher ×1
resttemplate ×1
scala ×1
scala-ide ×1
sorting ×1
spring ×1
stream ×1
vagrant ×1
web-client ×1