Apache Kafka:分布式消息传递系统
Apache Storm:实时消息处理
我们如何在实时数据管道中使用这两种技术来处理事件数据?
就实时数据管道而言,在我看来,两者都是相同的.我们如何在数据管道上使用这两种技术?
我的四人开发团队已经面临这个问题一段时间了:
有时我们需要处理同一组数据.因此,当我们在本地计算机上进行开发时,dev数据库将远程连接.
但是,有时我们需要在db上运行操作,这些操作将依赖于其他开发人员的数据,即我们破坏关联.为此,本地数据库会很好.
是否有解决这种困境的最佳做法?有没有像"数据SCM"工具?
以一种奇怪的方式,在git repo中保留SQL插入/删除/更新查询的文本文件会很有用,但我认为这可能非常快速地变慢.
你们怎么处理这个?
我正在尝试使用JSON输入步骤处理以下内容:
{"address":[
{"AddressId":"1_1","Street":"A Street"},
{"AddressId":"1_101","Street":"Another Street"},
{"AddressId":"1_102","Street":"One more street", "Locality":"Buenos Aires"},
{"AddressId":"1_102","Locality":"New York"}
]}
Run Code Online (Sandbox Code Playgroud)
然而,这似乎是不可能的:
Json Input.0 - ERROR (version 4.2.1-stable, build 15952 from 2011-10-25 15.27.10 by buildguy) :
The data structure is not the same inside the resource!
We found 1 values for json path [$..Locality], which is different that the number retourned for path [$..Street] (3509 values).
We MUST have the same number of values for all paths.
Run Code Online (Sandbox Code Playgroud)
该步骤提供Ignore Missing Path标志,但只有在所有行都错过相同路径时才有效.在这种情况下,步骤按预期运行,用null填充缺失值.
这限制了这一步骤读取不均匀数据的能力,这实际上是我的优先事项之一.
我的步骤字段定义如下:
我错过了什么吗?这是正确的行为吗?
我正在研究ETL工具,将平面文件导入数据库,然后导出xml文件.
许多工具支持生成在应用程序中使用的代码; 但是,我还没有找到任何支持使用您的应用程序中已有的代码.我们的模型很复杂(关系,验证,多态关联,回调等).
有哪些工具可以重用现有代码?或者我是否在ETL工具中重新创建(并维护)我的模型?
注意:我对ETL的要求(与批量插入或activerecord-import相反)是转换.我们收到来自200多个不同来源的数据,包括各种格式,完整程度和清洁度.此外,对于将要定义转换的技术较少的用户来说,"设计师"最常用的包含更为真实.
我一直在研究数据集成方法“全局作为视图”和“本地作为视图”,但我找不到任何关于如何为这些方法形成查询的示例,任何人都可以给我示例如何使用 GAV 和 来查询这些数据集成方法请 LAV
我在这里专门询问GAV和LAV
我知道 GAV(全局视图)是通过数据源描述的,而 LAV(局部视图)是通过中介模式描述的。但是我不完全确定这些术语的含义,也不完全确定它们如何影响生成的查询。
有一个 GAV 的维基百科页面,没有查询示例,遗憾的是没有 LAV 的维基百科页面
我目前正在评估Talend ETL(Talend Open Studio for Data Integration).
我想知道如何/如果我可以将ETL作业公开为Web服务.
我知道我可以将作业导出为Web服务并通过特定的URL调用它们,但我的目标是能够使用IN/OUT参数公开特定的WSDL.
一个示例用例是:
1)在Talend ETL中调用WS并使用数据传递XML
2)Talend ETL从XML中提取数据,并将它们作为变量插入到要对DB执行的查询中.
3)Talend ETL将结果集从DB转换为XML数据并响应WS客户端.
最后,我还想知道相同的场景是否可以作为REST服务公开.
我想使用 PDI 发送电子邮件。我创建了一个工作并添加了“邮件”元素。有我的参数。
Server smtp.gmail.com
Port: 587
Use Authentication
User : mygmailusername
Pass : mygmailpass
Secure Con Type : TLS
Run Code Online (Sandbox Code Playgroud)
当我运行作业时,出现错误:Problem while sending message : javax.mail.AuthenticationFailedException
当我将端口更改为 465 时,gmail 拒绝了该消息,并向 mygmailusername 发送了一封电子邮件,表明未经授权的登录尝试。
但是我已经将我的 BI 服务器的端口更改为 9090。我不知道这是否也是一个原因。我假设 PDI 以某种方式与 BI 服务器通信,如果它是发送电子邮件的 BI 服务器。我该如何让它工作?
所以有一点背景 - 我们有大量的数据源,从RDBMS到S3文件.我们希望将这些数据与其他各种数据仓库,数据库等同步和集成.
起初,这似乎是卡夫卡的典型模式.我们希望通过Kafka将数据更改流式传输到数据输出源.在我们的测试案例中,我们使用Oracle Golden Gate捕获更改并成功将更改推送到Kafka队列.但是,将这些更改推送到数据输出源已经证明具有挑战性.
我意识到如果我们只是向Kafka主题和队列添加新数据,这将非常有效.我们可以缓存更改并将更改写入各种数据输出源.然而,这种情况并非如此.我们将更新,删除,修改分区等.处理此问题的逻辑似乎要复杂得多.
我们尝试使用登台表和连接来更新/删除数据,但我觉得这会很快变得非常笨拙.
这就是我的问题 - 我们可以采取哪些不同的方法来处理这些操作?或者我们应该完全朝着不同的方向前进?
任何建议/帮助非常感谢.谢谢!
我每天都有多次传入各种CSV文件,存储来自传感器的时间序列数据,传感器是传感器站的一部分.每个CSV都以它所来自的传感器站和传感器ID命名,例如"station1_sensor2.csv".目前,数据存储如下:
> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;
Run Code Online (Sandbox Code Playgroud)
我创建了一个Cassandra表来存储它们,并能够查询它们以查找各种已识别的任务.Cassandra表看起来像这样:
cqlsh > CREATE KEYSPACE data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
CREATE TABLE sensor_data (
station_id text, // id of the station
sensor_id text, // id of the sensor
tps timestamp, // timestamp of the measure
val float, // measured value
PRIMARY KEY ((station_id, sensor_id), tps)
);
Run Code Online (Sandbox Code Playgroud)
我想使用Apache Nifi自动将CSV中的数据存储到此Cassandra表中,但我找不到示例或方案来正确执行.我曾尝试使用"PutCassandraQL"处理器,但我在没有任何明确的例子的情况下苦苦挣扎.所以,任何帮助如何执行Cassandra放置查询与Apache Nifi将数据插入表中将不胜感激!
我想解析MaxMind GeoIP2数据库的两个CSV文件,根据列进行一些连接并将结果合并到一个输出文件中.
我使用标准的CSV ruby库,它很慢.我认为它试图将所有文件加载到内存中.
block_file = File.read(block_path)
block_csv = CSV.parse(block_file, :headers => true)
location_file = File.read(location_path)
location_csv = CSV.parse(location_file, :headers => true)
CSV.open(output_path, "wb",
:write_headers=> true,
:headers => ["geoname_id","Y","Z"] ) do |csv|
block_csv.each do |block_row|
puts "#{block_row['geoname_id']}"
location_csv.each do |location_row|
if (block_row['geoname_id'] === location_row['geoname_id'])
puts " match :"
csv << [block_row['geoname_id'],block_row['Y'],block_row['Z']]
break location_row
end
end
end
Run Code Online (Sandbox Code Playgroud)
是否有另一个ruby库支持chuncks中的处理?
block_csv
是800MB, location_csv
是100MB.
data-integration ×10
apache-kafka ×2
etl ×2
kettle ×2
pentaho ×2
activerecord ×1
apache-nifi ×1
apache-storm ×1
cassandra ×1
cql ×1
csv ×1
database ×1
email ×1
json ×1
model ×1
ruby ×1
talend ×1