小编Vic*_*tor的帖子

如何在执行Flux.map()时处理错误

我试图找出在Flux中映射元素时如何处理错误.

例如,我将CSV字符串解析为我的一个业务POJO:

myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));
Run Code Online (Sandbox Code Playgroud)

其中一些行可能包含错误,因此我在日志中得到的是:

 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
 reactor.core.publisher.FluxLog:  onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
 reactor.core.publisher.FluxLog:  java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo
Run Code Online (Sandbox Code Playgroud)

我在API中读到了一些错误处理方法,但大多数都提到返回"错误值"或使用回退Flux,如下所示:

Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);
Run Code Online (Sandbox Code Playgroud)

然而,用我的myflux意思是再次处理整个通量.

那么,有没有办法在处理特定元素时处理错误(即忽略它们/记录它们)并继续处理其余的通量?

使用@akarnokd解决方法进行更新

public Flux<StockQuotation> getQuotes(List<String> tickers)
{
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
    // Get each set of quotes in a separate thread
    .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
    // Convert each list …
Run Code Online (Sandbox Code Playgroud)

java reactive-programming project-reactor

22
推荐指数
3
解决办法
1万
查看次数

如何有效地更新文件经常被修改的 Impala 表

我们有一个基于 Hadoop 的解决方案 (CDH 5.15),我们在 HDFS 中的某些目录中获取新文件。在 os 这些目录上,我们有 4-5 个 Impala (2.1) 表。在 HDFS 中写入这些文件的过程是 Spark Structured Streaming (2.3.1)

现在,一旦我们将文件写入 HDFS,我们就会运行一些 DDL 查询:

  • ALTER TABLE table1 RECOVER PARTITONS 检测添加到表中的新分区(及其 HDFS 目录和文件)。

  • REFRESH table1 PARTITIONS (partition1=X, partition2=Y),使用每个分区的所有键。

现在,这个 DDL 花费的时间有点长,而且它们在我们的系统中排队,破坏了系统的数据可用性。

所以,我的问题是:有没有办法更有效地进行这种数据整合?

我们考虑过:

  • 使用ALTER TABLE .. RECOVER PARTITONS但根据文档,它只会刷新新分区。

  • 尝试REFRESH .. PARTITON ...一次与多个分区一起使用,但语句语法不允许这样做。

  • 尝试批处理查询,但 Hive JDBC 驱动器不支持批处理查询。

  • 鉴于系统已经很忙,我们是否应该尝试并行执行这些更新?

  • 你知道的其他方式吗?

谢谢!

胜利者

注意:我们知道哪些分区需要刷新的方式是使用 HDFS 事件,就像 Spark Structured Streaming 我们不知道文件何时被写入一样。

注意#2:另外,用 HDFS 编写的文件有时很小,所以如果可以同时合并这些文件就太好了。

hadoop impala cloudera-cdh spark-structured-streaming

9
推荐指数
1
解决办法
676
查看次数

JdbcSQLException 在 H2 中使用“MATCH simple”执行 PostgreSQL 查询

我试图在开发模式(使用 H2)下运行当前在生产中使用 PostgreSQL 数据库的设置,但出现错误。如果我可以重用生产 SQL 而无需对其进行任何更改,那就最好了。

使用此设置:

# H2 Database
spring.datasource.datasource-class-name=org.h2.jdbcx.JdbcDataSource
spring.datasource.url=jdbc:h2:mem:userregistry;DB_CLOSE_DELAY=-1;MODE=PostgreSQL
Run Code Online (Sandbox Code Playgroud)

这个查询:

CREATE TABLE IF NOT EXISTS users.user_userrole (
    user_signum VARCHAR(20) NOT NULL,
    role VARCHAR(255) NOT NULL,
    CONSTRAINT user_userrole_pk PRIMARY KEY (user_signum, role),
    CONSTRAINT user_fk FOREIGN KEY (user_signum) REFERENCES users.user (signum) MATCH SIMPLE,
    CONSTRAINT role_fk FOREIGN KEY (role) REFERENCES users.userrole   
  (role_name) MATCH SIMPLE
);
Run Code Online (Sandbox Code Playgroud)

引发此异常:

org.h2.jdbc.JdbcSQLException: Syntax error in SQL statement "<<SQL OMITTED FOR BREVITY>>"; 
expected "INDEX, ON, NOT, DEFERRABLE, NOCHECK, CHECK, ,, )";  [42001-185]
Run Code Online (Sandbox Code Playgroud)

请注意,我已经在使用Mode=PostgreSQL. 有任何想法吗? …

postgresql spring h2

5
推荐指数
1
解决办法
1958
查看次数

在 Spark Structured Streaming 中找不到“窗口”函数

我正在编写一个小示例,Spark Structured Streaming其中我试图处理netstat命令的输出,但无法弄清楚如何调用该window函数。

这些是我的 build.sbt 的相关行:

scalaVersion := "2.11.4"
scalacOptions += "-target:jvm-1.8"

libraryDependencies ++= {

  val sparkVer = "2.3.0"
  Seq(
    "org.apache.spark" %% "spark-streaming" % sparkVer % "provided",
    "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVer % "provided",
    "org.apache.spark" %% "spark-core" % sparkVer % "provided" withSources(),
    "org.apache.spark" %% "spark-hive" % sparkVer % "provided",
  )
}
Run Code Online (Sandbox Code Playgroud)

和代码:

case class NetEntry(val timeStamp: java.sql.Timestamp, val sourceHost: String, val targetHost: String, val status: String)

def convertToNetEntry(x: String): NetEntry = {
    // tcp …
Run Code Online (Sandbox Code Playgroud)

spark-streaming spark-structured-streaming

1
推荐指数
1
解决办法
2295
查看次数