我试图找出在Flux中映射元素时如何处理错误.
例如,我将CSV字符串解析为我的一个业务POJO:
myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));
Run Code Online (Sandbox Code Playgroud)
其中一些行可能包含错误,因此我在日志中得到的是:
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
reactor.core.publisher.FluxLog: onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
reactor.core.publisher.FluxLog: java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo
Run Code Online (Sandbox Code Playgroud)
我在API中读到了一些错误处理方法,但大多数都提到返回"错误值"或使用回退Flux,如下所示:
Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);
Run Code Online (Sandbox Code Playgroud)
然而,用我的myflux意思是再次处理整个通量.
那么,有没有办法在处理特定元素时处理错误(即忽略它们/记录它们)并继续处理其余的通量?
使用@akarnokd解决方法进行更新
public Flux<StockQuotation> getQuotes(List<String> tickers)
{
Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
// Get each set of quotes in a separate thread
.flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
// Convert each list …Run Code Online (Sandbox Code Playgroud) 我们有一个基于 Hadoop 的解决方案 (CDH 5.15),我们在 HDFS 中的某些目录中获取新文件。在 os 这些目录上,我们有 4-5 个 Impala (2.1) 表。在 HDFS 中写入这些文件的过程是 Spark Structured Streaming (2.3.1)
现在,一旦我们将文件写入 HDFS,我们就会运行一些 DDL 查询:
ALTER TABLE table1 RECOVER PARTITONS 检测添加到表中的新分区(及其 HDFS 目录和文件)。
REFRESH table1 PARTITIONS (partition1=X, partition2=Y),使用每个分区的所有键。
现在,这个 DDL 花费的时间有点长,而且它们在我们的系统中排队,破坏了系统的数据可用性。
所以,我的问题是:有没有办法更有效地进行这种数据整合?
我们考虑过:
使用ALTER TABLE .. RECOVER PARTITONS但根据文档,它只会刷新新分区。
尝试REFRESH .. PARTITON ...一次与多个分区一起使用,但语句语法不允许这样做。
尝试批处理查询,但 Hive JDBC 驱动器不支持批处理查询。
鉴于系统已经很忙,我们是否应该尝试并行执行这些更新?
谢谢!
胜利者
注意:我们知道哪些分区需要刷新的方式是使用 HDFS 事件,就像 Spark Structured Streaming 我们不知道文件何时被写入一样。
注意#2:另外,用 HDFS 编写的文件有时很小,所以如果可以同时合并这些文件就太好了。
我试图在开发模式(使用 H2)下运行当前在生产中使用 PostgreSQL 数据库的设置,但出现错误。如果我可以重用生产 SQL 而无需对其进行任何更改,那就最好了。
使用此设置:
# H2 Database
spring.datasource.datasource-class-name=org.h2.jdbcx.JdbcDataSource
spring.datasource.url=jdbc:h2:mem:userregistry;DB_CLOSE_DELAY=-1;MODE=PostgreSQL
Run Code Online (Sandbox Code Playgroud)
这个查询:
CREATE TABLE IF NOT EXISTS users.user_userrole (
user_signum VARCHAR(20) NOT NULL,
role VARCHAR(255) NOT NULL,
CONSTRAINT user_userrole_pk PRIMARY KEY (user_signum, role),
CONSTRAINT user_fk FOREIGN KEY (user_signum) REFERENCES users.user (signum) MATCH SIMPLE,
CONSTRAINT role_fk FOREIGN KEY (role) REFERENCES users.userrole
(role_name) MATCH SIMPLE
);
Run Code Online (Sandbox Code Playgroud)
引发此异常:
org.h2.jdbc.JdbcSQLException: Syntax error in SQL statement "<<SQL OMITTED FOR BREVITY>>";
expected "INDEX, ON, NOT, DEFERRABLE, NOCHECK, CHECK, ,, )"; [42001-185]
Run Code Online (Sandbox Code Playgroud)
请注意,我已经在使用Mode=PostgreSQL. 有任何想法吗? …
我正在编写一个小示例,Spark Structured Streaming其中我试图处理netstat命令的输出,但无法弄清楚如何调用该window函数。
这些是我的 build.sbt 的相关行:
scalaVersion := "2.11.4"
scalacOptions += "-target:jvm-1.8"
libraryDependencies ++= {
val sparkVer = "2.3.0"
Seq(
"org.apache.spark" %% "spark-streaming" % sparkVer % "provided",
"org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVer % "provided",
"org.apache.spark" %% "spark-core" % sparkVer % "provided" withSources(),
"org.apache.spark" %% "spark-hive" % sparkVer % "provided",
)
}
Run Code Online (Sandbox Code Playgroud)
和代码:
case class NetEntry(val timeStamp: java.sql.Timestamp, val sourceHost: String, val targetHost: String, val status: String)
def convertToNetEntry(x: String): NetEntry = {
// tcp …Run Code Online (Sandbox Code Playgroud)