小编Kay*_*ayV的帖子

WebClient与RestTemplate

按照春季5:

WebClient是表示执行Web请求的主要入口点的接口.

它已作为Spring Web Reactive模块的一部分创建,并将在这些场景中替换经典的RestTemplate.新客户端是一种通过HTTP/1.1协议工作的反应式非阻塞解决方案

这是否意味着,如果我们想要升级到Spring 5,我们需要使用RestTemplate重新编码旧应用程序?

或者在Spring 5中使用RestTemplate有一些解决方法?

spring reactive-programming web-client resttemplate

31
推荐指数
5
解决办法
2万
查看次数

Mono vs Flux in Reactive Stream

根据文件:

Flux是一个可以发出0..N元素的流:

Flux<String> fl = Flux.just("a", "b", "c");
Run Code Online (Sandbox Code Playgroud)

Mono是一个0..1元素的流:

Mono<String> mn = Mono.just("hello");
Run Code Online (Sandbox Code Playgroud)

因为两者都是反应流中Publisher接口的实现.

我们不能在大多数情况下只使用Flux,因为它也可以发出0..1,从而满足Mono的条件?

或者只有一些特定的条件,只需要使用Mono并且Flux无法处理操作?请建议.

reactive-programming project-reactor reactive-streams

30
推荐指数
3
解决办法
2万
查看次数

kafka为什么不创建主题?bootstrap-server不是公认的选项

我是Kafka的新手,正在尝试在本地计算机上创建一个新主题。

我正在跟踪此链接

这是我遵循的步骤:

  1. 启动动物园管理员

    bin/zookeeper-server-start.sh config/zookeeper.properties
    
    Run Code Online (Sandbox Code Playgroud)
  2. 启动kafka服务器

    bin/kafka-server-start.sh config/server.properties
    
    Run Code Online (Sandbox Code Playgroud)
  3. 创建一个话题

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    
    Run Code Online (Sandbox Code Playgroud)

但是在创建主题时,出现以下错误:

Exception in thread "main" joptsimple.UnrecognizedOptionException: bootstrap-server is not a recognized option
    at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
    at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
    at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
    at joptsimple.OptionParser.parse(OptionParser.java:396)
    at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:358)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:44)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
Run Code Online (Sandbox Code Playgroud)

创建主题是否需要其他配置?我在做什么错

apache-kafka apache-zookeeper kafka-topic

16
推荐指数
1
解决办法
5212
查看次数

Project Reactor 3 中的 publishOn 与 subscribeOn

我在相同的通量上使用 publishOn 和 subscribeOn,如下所示:

    System.out.println("*********Calling Concurrency************");
    List<Integer> elements = new ArrayList<>();
    Flux.just(1, 2, 3, 4)
      .map(i -> i * 2)
      .log()
      .publishOn(Schedulers.elastic())
      .subscribeOn(Schedulers.parallel())
      .subscribe(elements::add);
    System.out.println("-------------------------------------");
Run Code Online (Sandbox Code Playgroud)

虽然,当我同时使用两者时,日志中没有打印任何内容。但是当我只使用 publishOn 时,我得到了以下信息日志:

*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Run Code Online (Sandbox Code Playgroud)

是不是publishOn 比subscribeOn 更值得推荐?或者它比 subscribeOn 有更多的偏好?两者有什么区别以及何时使用哪个?

publisher publish-subscribe reactive-programming project-reactor reactive-streams

13
推荐指数
2
解决办法
1万
查看次数

.sc 和 .scala 文件有什么区别?

我正在学习 Scala 并且知道我们可以使用两个扩展名保存 Scala 文件,即 my.sc 和 my.scala。

这是我创建的示例文件:

我的.scala

object My {

  /** Our main function where the action happens */
  def main(args: Array[String]) {


    Logger.getLogger("org").setLevel(Level.ERROR)
    val sc = new SparkContext("local[*]", "my")
    val lines = sc.textFile("readme.tx")
    val results = lines.countByValue()

   }
}
Run Code Online (Sandbox Code Playgroud)

我的.sc

object My {

  val hello: String = "Hello World!"
  println(hello)
}
Run Code Online (Sandbox Code Playgroud)

两者有什么区别?

何时使用 sc 文件扩展名,何时使用 scala 文件扩展名?

scala intellij-idea scala-ide apache-spark

11
推荐指数
1
解决办法
5484
查看次数

Mac:执行 VBoxManage(Vagrant 使用的 CLI)时出错

我正在使用 aerospike 并使用 vagrant virtual box 安装它。

安装后,当我尝试启动虚拟机时,出现以下错误:

. 执行时出错VBoxManage,这是 Vagrant 用于控制 VirtualBox 的 CLI。命令和标准错误如下所示。

命令:["startvm", "dff6693e-52c8-4c9e-922a-243d18c7f666", "--type", "headless"]

Stderr:VBoxManage:错误:VM 会话在任何尝试打开 VBoxManage 之前已关闭:错误:详细信息:代码 NS_ERROR_FAILURE (0x80004005),组件 SessionMachine,接口 ISession

我正在使用 mac 机器进行此设置。

有什么建议吗?

macos caching virtual-machine vagrant aerospike

11
推荐指数
3
解决办法
1万
查看次数

Java 8使用2个字段排序

根据一些条件,我从MongoDB读取数据并创建一个List<Document>带结果集.

List<Document> documentList = new ArrayList<Document>();
Run Code Online (Sandbox Code Playgroud)

样本记录看起来像:

documentList: [
    Document{
        { _id=5975ff00a213745b5e1a8ed9,
            u_id=,
            visblty = 1,
            c_id=5975ff00a213745b5e1a8ed8,                
            batchid=null,
            pdate=Tue Jul 11 17:52:25 IST 2017, 
            locale=en_US,
            subject = "Document2"
        }     },
    Document{
        { _id=597608aba213742554f537a6,
            u_id=,
            visblty = 1,
            c_id=597608aba213742554f537a3, 
            batchid=null,
            pdate=Fri Jul 28 01:26:22 IST 2017,
            locale=en_US,
            subject = "Document2"
        }    } 
]
Run Code Online (Sandbox Code Playgroud)

使用此documentList,我再次使用某些条件进行过滤,然后我需要根据某些条件(我将在请求中获取)对过滤器记录进行排序.

List<Document> outList = documentList.stream()
                .filter(d -> d.getInteger("visblty") == 1
                && (!StringUtils.isEmpty(req.pdate())? (d.getDate(CommonConstants.PDATE).after(afterDate)): true) 
                && (!StringUtils.isEmpty(req.pdate())? (d.getDate(CommonConstants.PDATE).before(beforeDate)): true)
                .sorted().skip(4).limit()
                .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

不确定如何排序(动态需要根据输入更改排序顺序,它看起来像" pdate by DESC"或" subject …

java sorting java-8 java-stream

10
推荐指数
3
解决办法
1万
查看次数

Apache Camel SQL批量插入需要很长时间

我正在使用Apache Camel SQL批量插入过程.

  1. 我的应用程序是从Active MQ读取票据,其中包含大约2000张票据.

  2. 我已将批次更新为100.

  3. 我要解雇的查询如下:

    sql.subs.insertCdr= insert into subscription_logs(master_id,request_type,req_desc,msisdn,amount,status,resp_code,resp_desc,channel,transaction_id,se_mode,be_mode,sub_type,sub_timeleft,srv_name,srv_id,start_date,end_date,operator,circle,country,time_offset,retry_count,user_status,previous_state,se_reqrecvtime,se_respsenttime,be_reqsenttime,be_resprecvtime,cp_id,cp_name,sub_srvname,sub_srvid,msg_senderid,msg_text,call_back_url,call_back_resp,client_ip,se_sysIp,language,cp_callbackurlhittime,action,alert,notification_url,notification_resp) values(:#masterId, :#requestType,:#reqDesc,:#msisdnCdr,:#price,:#status,:#responseCode,:#reason,:#channel,:#transactionId,:#seMode,:#beMode,:#subType,:#subTimeLeft,:#serviceName,:#serviceId,:#subStartDate,:#cdrEndDate,:#operator,:#circle,:#country,:#timeOffset,:#retryCount,:#userStatus,:#previousState,:#seReqRecvTime,:#seRespSentTime,:#beReqSentTime,:#beRespRecvTime,:#cpId,:#cpName,:#subServiceName,:#subServiceId,:#shortCode,:#message,:#callBackUrl,:#callBackResp,:#clientIp,:#seSysIp,:#language,:#cpCallbackUrlHitTime,:#action,:#alert,:#notificationUrl,:#notificationResponse)

  4. SQL批处理路由定义如下:

    <pipeline>
       <log message="Going to insert in database"></log>
       <transform>
          <method ref="insertionBean" method="subsBatchInsertion"></method>
       </transform>
       <choice>
           <when>
               <simple>${in.header.subsCount} == ${properties:batch.size}</simple>
               <to uri="sql:{{sql.subs.insertCdr}}?batch=true"></to>
               <log message="Inserted rows ${body}"></log>
           </when>
       </choice>
    </pipeline>
    
    Run Code Online (Sandbox Code Playgroud)
  5. 下面是我的java代码:

    public List<Map<String, Object>> subsBatchInsertion(Exchange exchange) {
    if (subsBatchCounter > batchSize) {
        subsPayLoad.clear();
        subsBatchCounter = 1;
    }
    subsPayLoad.add(generateInsert(exchange.getIn().getBody(SubscriptionCdr.class)));
    exchange.getIn().setHeader("subsCount", subsBatchCounter);
    subsBatchCounter++;
    return subsPayLoad;
    }
    
    public Map<String, Object> generateInsert(Cdr cdr) {
    Map<String, Object> insert = new HashMap<String, Object>();
    try {
        insert …
    Run Code Online (Sandbox Code Playgroud)

java apache-camel batch-processing camel-jdbc camel-sql

9
推荐指数
1
解决办法
876
查看次数

Apache Camel中窃听和多播有什么区别

Camel中的wireTap和多播的默认行为似乎相似.这两者都有助于以不同的方式处理相同的消息.那么wireTap和组播之间的主要区别是什么?

java jms multicast message-queue apache-camel

8
推荐指数
1
解决办法
2728
查看次数

Java 8 Stream - 为什么filter方法没有执行?

我正在学习使用java流过滤.但过滤后的流不会打印任何内容.我认为过滤方法没有被执行.我的过滤代码如下:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        s.startsWith("b");
        System.out.println("filter: " + s);
        return true;
    });
Run Code Online (Sandbox Code Playgroud)

没有编译错误,也没有例外.有什么建议吗?

java stream filter java-8 java-stream

6
推荐指数
1
解决办法
847
查看次数