标签: dataflow

如何在Java代码中监视/检查数据/属性流

当我需要捕获从一个API到另一个API的数据流时,我有一个用例.例如,我的代码使用hibernate从数据库读取数据,在数据处理期间,我将一个转换POJO为另一个并执行更多处理,然后最终转换为最终结果hibernate对象.简而言之喜欢的东西POJO1POJO2POJO3.

在Java中有一种方法可以推断POJO3的属性是从POJO1的这个属性中产生/转换的.我想看看我可以捕获从一个模型到另一个模型的数据流的东西.这个工具可以是编译时也可以是运行时,我对两者都没问题.

我正在寻找一种可以与代码并行运行的工具,并在每次运行的基础上提供数据沿袭细节.

java dataflow data-lineage

8
推荐指数
1
解决办法
274
查看次数

Apache Pig Latin参考手册

Pig是一个用于处理非常大的文件的数据流编程环境.猪的语言叫猪拉丁语.

有没有人知道PigLatin的好参考手册?我正在寻找包含该语言的所有语法和命令描述的内容.不幸的是,Pig wiki中的wiki页面被破坏了.

dataflow manual apache-pig

7
推荐指数
2
解决办法
1187
查看次数

MQ以异步方式处理,聚合和发布数据

一些背景,在得到真正的问题之前:

我正在开发一个由几个不同模块组成的后端应用程序.目前,每个模块都是一个命令行java应用程序,它是"按需"运行的(稍后会详细介绍).

每个模块都是一个"步骤",是一个更大的过程的一部分,您可以将其视为数据流; 第一步从外部源收集数据文件并将其推送/加载到某些SQL数据库表中; 然后根据不同的条件和事件(时间,数据库中存在数据,通过Web服务/ Web界面完成的消息和详细说明),从(1个或多个)DB表中获取数据,处理它们,并将它们写在不同的表格上.步骤在三个不同的服务器上运行,并从三个不同的DB读取数据,但只能在一个DB中写入.目的是汇总数据,计算指标和统计数据.

目前,每个模块都是定期执行的(从第一个模块的几分钟/小时,到链中最后一个模块的几天,需要聚合更多数据,因此等待"更长时间"从它们可用),使用的cronjob.运行一个模块(当前是一个java控制台应用程序),它会检查数据库中给定日期时间窗口中新的未处理信息,并完成其工作.

问题:它有效,但是......我需要扩展和维护它,这种方法开始显示其局限性.

  1. 我不喜欢依靠"民意调查"; 这是浪费,考虑到以前模块的信息足以在他们需要的信息可用时"告诉"链中的其他模块,并且他们可以继续.
  2. 它"缓慢":链条上的模块延迟了几天,因为我们必须确保数据是由前面的模块到达和处理的.所以我们"停止"这些模块,直到我们确定我们拥有所有数据.新增功能需要实时(不是很难,但"尽快")计算某些指标.一个很好的例子就是在这里发生的事情,在SO上,带有徽章!:)我需要获得一些非常相似的东西.

为了解决第二个问题,我将介绍"部分"或"增量"计算:只要我有一组相关信息,我就会处理它.然后,当一些其他链接信息到达时,我计算差异并相应地更新数据,但我还需要通知其他(从属)模块.

问题

- 1)哪种方法最好? - 2)相关:哪些是"通知"其他模块(在我的情况下是java可执行文件)相关数据可用的最佳方式?

我可以看到三种方式:

  • 将其他"非数据"表添加到数据库中,每个模块写入"嘿,我已经完成了这个并且它可用".当cronjob启动另一个模块时,它会读取表格,确定他可以计算子集xxx,然后执行.等等
  • 使用Message Queues,如ZeroMQ(或Apache Camel,如@mjn建议)而不是DB表
  • 使用像Redis这样的键值存储,而不是数据库表

编辑:我确信基于队列的方法是要走的路,我为完整性添加了"table + polling"选项,但现在我明白这只是一种分心(显然,每个人都会回答"是的,使用队列,民意调查是邪恶的" - 这是正确的!".因此,让我重新解释一下这个问题: 使用像Redis这样的pub/sub的键值存储使用MQ的优点/缺点是什么?

  • 3)有没有任何解决方案可以帮我完全摆脱cronjobs?

编辑:特别是,在可能的情况下,它意味着:在某些MQ和/或键值存储中是否有一种机制可以让我发布带有"时间"的消息?比如"在1天内交付"?显然,持久性和"几乎一次"交付保证

  • 4)我应该将此消息(基于事件?)的解决方案构建为集中式服务,并将其作为其中一个服务器上的守护程序/服务运行吗?
  • 5)我是否应该放弃按需启动订阅者的想法,让每个模块作为守护进程/服务连续运行?
  • 6)哪些是赞成和缺点(可靠性,单点故障与资源使用和复杂性......)?

编辑:这是我最关心的一点:我想"排队"自己根据队列中的消息激活"模块",类似于MSMQ激活.这是个好主意吗?Java世界中有什么东西可以实现它,我应该自己实现它(通过MQ还是通过Redis),还是应该将每个模块作为守护进程运行?(即使某些计算通常是在突发中发生,两小时处理然后是两天的空转?)

注意:我不能使用重型容器/ EJB(No Glassfish或类似产品)

编辑:骆驼对我来说似乎有点太重了.无论是在资源还是开发的复杂性方面,我都在寻找一些非常轻松的东西

java notifications dataflow message-queue redis

7
推荐指数
1
解决办法
753
查看次数

Angular 2数据流与Flux之间的关键区别是什么?

嗨,我现在正在研究Angular 2和React + Redux,我对这两个选择的数据流差异的区别有疑问.

  1. Angular 2默认使用单向数据流.Redux是一个Flux实现,它(也)使用单向数据流.这些之间的关键区别是什么?(可能是部件的组成?)
  2. 如果这两者在数据流方面没有太大差异,为什么有人会使用Flux或Redux而不是默认选择Angular 2框架?
  3. 如果这两者完全不同,是否有一个名称我可以调用Angular 2的数据流以进一步参考来比较这两者?

非常感谢提前!

dataflow flux reactjs redux angular

7
推荐指数
1
解决办法
1087
查看次数

Dataflow/Apache Beam 在哪个阶段确认发布/订阅消息?

我有一个数据流流作业,将 Pub/Sub 订阅作为无限源。我想知道数据流在哪个阶段确认传入的发布/订阅消息。在我看来,如果在数据流管道的任何阶段抛出异常,消息就会丢失。

此外,我想知道如何使用发布/订阅无界源编写数据流管道以在失败时进行消息检索的最佳实践。谢谢!

dataflow google-cloud-platform google-cloud-pubsub apache-beam

7
推荐指数
1
解决办法
1937
查看次数

如何在Python 3.x上获取用于数据流GCP的Apache Beam

我对GCP和数据流非常陌生。但是,我想开始测试和部署一些利用GCP上的数据流的流。根据文档,有关数据流的所有内容都必须使用Apache项目BEAM。因此,请按照此处的官方文档进行操作的情况下,受支持的python版本是2.7

坦白地说,由于Python 2.x版本将由于没有官方支持而消失,并且每个人都在使用3.x版本,因此这确实令人失望。尽管如此,我想知道是否有人知道如何准备在python版本中运行的beam和GCP数据流。

我看了这部影片并了这个牧师如何完成这个美好的里程碑,并且显然可以在Python 3.5上运行。

更新资料

自从我努力处理数据流以来,我想要的伙计们引起了我的思考。我对使用Java或Python版本的工具开始具有挑战性感到非常失望。从python开始,存在关于版本3的限制,该版本几乎是当前的标准。另一方面,java在版本11上运行时会遇到问题,我必须进行一些调整才能在代码的版本8上运行,然后我开始在代码上遇到许多不兼容问题。简而言之,如果GCP真正想前进并成为第一名,那么还有很多地方需要改进。:失望

解决方法

我将Java版本降级为jdk 8,安装了maven,现在eclipse版本适用于Apache Beam。

我终于解决了,但是,GCP确实请考虑增强并扩展对Java / Python最新版本的支持。

非常感谢

python dataflow google-cloud-platform google-cloud-dataflow apache-beam

7
推荐指数
2
解决办法
3349
查看次数

DD异常,并清理数据库资源:有一个干净的解决方案吗?

这是我们写的一段代码:

    public CustomerTO getCustomerByCustDel(final String cust, final int del)
            throws SQLException {
        final PreparedStatement query = getFetchByCustDel();
        ResultSet records = null;
        try {
            query.setString(1, cust);
            query.setInt(2, del);
            records = query.executeQuery();

            return this.getCustomer(records);
        } finally {
            if (records != null) {
                records.close();
            }
            query.close();
        }
    }

如果省略'finally'块,则会使数据库资源悬空,这显然是一个潜在的问题.但是,如果您执行我在此处所做的操作 - 在**try**块之外将ResultSet设置为null,然后将其设置为块内所需的值 - PMD报告'DD异常'.在文档中,DD异常描述如下:

DataflowAnomalyAnalysis:数据流分析跟踪数据流上不同路径上的变量的本地定义,未定义和引用.从这些信息中可以发现各种问题.[...] DD - 异常:重新定义了最近定义的变量.这是不祥的,但不一定是一个错误.

如果在块之外声明ResultSet而不设置值,则在执行if(记录!= null)测试时,正确地得到"变量可能尚未初始化"错误.

现在,在我看来,我在这里使用不是一个错误.但有没有一种干净的重写方式,不会触发PMD警告?我并不特别想要禁用PMD的DataFlowAnomalyAnalysis规则,因为识别UR和DU异常实际上是有用的; 但是这些DD异常让我怀疑自己可以做得更好 - 而且,如果没有更好的方法可以做到这一点,那么它们就会变得混乱(我应该看看是否可以重写PMD规则)

findbugs dataflow jdbc pmd

6
推荐指数
1
解决办法
3803
查看次数

如何跳过SSIS数据流中的最后一行

我在我的数据流中使用FlatFile Source Manager- > Script COmponent as Trans- > OLEDB destination.

Source从平面文件中读取所有行,我想跳过更新数据库的最后一行(Trailer record).

由于它包含NULL值,因此数据库会引发错误.

请帮我解决这个问题.

问候,VHK

sql-server ssis etl dataflow ssis-2012

6
推荐指数
2
解决办法
5286
查看次数

将 Java 对象转换为 BigQuery TableRow

我正在探索 Google Cloud Dataflow。

我想知道是否可以在java对象或JSON到TableRow之间自动转换。

就像我们可以自动将JSON解析为POJO类一样。

我找不到相关信息。希望不要重复问题。

如有任何信息,将不胜感激!

问候

dataflow

6
推荐指数
1
解决办法
3791
查看次数

限制光束应用的一步

我在 google 数据流上使用 python beam,我的管道如下所示:

从文件中读取图像 url >> 下载图像 >> 处理图像

问题是我不能让下载图像按需要进行缩放,因为我的应用程序可能会被图像服务器阻止。

这是一种可以节流步骤的方法吗?每分钟输入或输出。

谢谢你。

python dataflow google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
1773
查看次数