Asked by PKJ (6) · Tags: jdbc, elasticsearch, logstash
Let's assume an Oracle schema with the following tables and columns:
Country
  country_id (primary key)
  country_name
Department
  department_id (primary key)
  department_name
  country_id (foreign key to Country.country_id)
Employee
  employee_id (primary key)
  employee_name
  department_id (foreign key to Department.department_id)
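My Elasticsearch document has the country as its root element. It contains all the departments in that country, and each department in turn contains all the employees of that department.
So the document structure looks like this: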
{
  "mappings": {
    "country": {
      "properties": {
        "country_id": { "type": "string" },
        "country_name": { "type": "string" },
        "department": {
          "type": "nested",
          "properties": {
            "department_id": { "type": "string" },
            "department_name": { "type": "string" },
            "employee": {
              "type": "nested",
              "properties": {
                "employee_id": { "type": "string" },
                "employee_name": { "type": "string" }
              }
            }
          }
        }
      }
    }
  }
}
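For illustration, one country document that fits this mapping might look like the following Ruby hash. The values are invented (hypothetical) and only show how the three levels nest inside each other.

```ruby
require 'json'

# Hypothetical sample: one country with one department holding two employees.
# Field names follow the mapping above; all values are made up.
country_doc = {
  'country_id'   => '1',
  'country_name' => 'Norway',
  'department'   => [
    {
      'department_id'   => '10',
      'department_name' => 'Sales',
      'employee'        => [
        { 'employee_id' => '100', 'employee_name' => 'Alice' },
        { 'employee_id' => '101', 'employee_name' => 'Bob' }
      ]
    }
  ]
}

# Print the document as it would be sent to Elasticsearch
puts JSON.pretty_generate(country_doc)
```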
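I want to run a separate jdbc input query on each table, and whenever rows are added, updated or deleted in the base tables, the corresponding data in the Elasticsearch document should be created, updated or deleted.
This is an example problem; the actual tables and data structure are more complex, so I'm not looking for a solution limited to this case.
Is there a way to achieve this?
Thanks.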
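For the first level, you can use the aggregate filter directly. The rows need a common id (here `id`) that ties them to the same document. Note that the aggregate filter only works reliably with a single filter worker (`pipeline.workers: 1` or `-w 1`), and `push_previous_map_as_event` assumes that rows with the same id arrive one after another, for example via an `ORDER BY` on that id in the jdbc statement.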
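filter {
  aggregate {
    # rows that share the same id are collected into one map, i.e. one document
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['department'] ||= []
      # append the current row (without Logstash's @timestamp/@version) as one department
      map['department'] << event.to_hash.reject { |key, _| key.start_with?('@') }
    "
    # emit the finished map as a new event when a row with a different id arrives
    push_previous_map_as_event => true
    timeout => 150000
    timeout_tags => ['aggregated']
  }
  # drop the original per-row events and keep only the aggregated ones
  if "aggregated" not in [tags] {
    drop {}
  }
}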
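To see what the filter produces, the grouping can be simulated in plain Ruby. This sketch only mimics `push_previous_map_as_event => true` and assumes the rows are already plain hashes; the helper `aggregate_rows` and the sample rows are hypothetical, and the real plugin additionally handles timeouts, tags and Logstash metadata.

```ruby
# Plain-Ruby sketch of the grouping done by the aggregate filter with
# push_previous_map_as_event => true. Not the plugin's actual code.
def aggregate_rows(rows)
  results = []
  map = nil
  rows.each do |row|
    # A row with a new id closes the previous map, which becomes one event
    if map.nil? || map['id'] != row['id']
      results << map if map
      map = {}
    end
    map['id'] = row['id']
    map['department'] ||= []
    map['department'] << row
  end
  # The real filter emits the last map only later (for example on timeout)
  results << map if map
  results
end

# Hypothetical rows, sorted by id as the jdbc statement should return them
rows = [
  { 'id' => '1', 'department_id' => '10', 'department_name' => 'Sales' },
  { 'id' => '1', 'department_id' => '11', 'department_name' => 'IT' },
  { 'id' => '2', 'department_id' => '20', 'department_name' => 'HR' }
]

docs = aggregate_rows(rows)
docs.each { |doc| p doc }
```

With the rows sorted by `id` this yields two documents, the first holding two departments. Feeding the same rows in the order 1, 2, 1 yields three maps instead, two of them for id 1, which is why the input should be ordered by the common id.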
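Important: the output action must be `update`, so that the aggregated fields are written into the existing document instead of replacing it: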
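output {
  elasticsearch {
    action => "update"
    # an update needs the id of the target document
    document_id => "%{id}"
    # create the document if it does not exist yet
    doc_as_upsert => true
    ...
  }
}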
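The `update` action sends the aggregated map as a partial document. As I understand Elasticsearch's partial-update behavior, objects are merged recursively while any other value, arrays included, replaces the stored one. The hypothetical helper `merge_partial` below approximates that rule in plain Ruby; it is a sketch for reasoning about the result, not Elasticsearch's implementation.

```ruby
# Hypothetical approximation of a partial "doc" update:
# hashes merge recursively, every other value (including arrays) is replaced.
def merge_partial(existing, partial)
  existing.merge(partial) do |_key, old_val, new_val|
    if old_val.is_a?(Hash) && new_val.is_a?(Hash)
      merge_partial(old_val, new_val)
    else
      new_val
    end
  end
end

existing = { 'id' => '1', 'country_name' => 'Norway',
             'department' => [{ 'department_id' => '10' }] }
partial  = { 'id' => '1', 'department' => [{ 'department_id' => '11' }] }

merged = merge_partial(existing, partial)
p merged
```

`country_name` survives because the partial document does not mention it, but the `department` array is replaced as a whole, so each aggregated event has to carry every department of its country.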
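One way to handle level 2 is to query the already-indexed document and update it with the nested records. The aggregate filter is used again; the document needs a common ID so that you can look it up and insert the records into the right document.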
filter {
  # get the document from Elasticsearch by id and copy its "employee" field into "emp"
  elasticsearch {
    hosts => ["${ELASTICSEARCH_HOST}"]
    index => "${INDEX_NAME}"
    query => "id:%{id}"
    fields => { "employee" => "emp" }
  }
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      # keep what earlier rows with the same id already collected
      map['employee'] ||= []
      # the current row, without Logstash metadata and the looked-up 'emp' field
      temp_emp = event.to_hash.reject { |key, _| key.start_with?('@') || key == 'emp' }
      # push the objects into an array
      employeeArr = [temp_emp]
      # 'emp' is nil when the lookup found nothing, so default to an empty array
      empArr = event.get('emp') || []
      empArr.each do |emp|
        emp['employee'] = employeeArr
        map['employee'].push(emp)
      end
    "
    push_previous_map_as_event => true
    timeout => 150000
    timeout_tags => ['aggregated']
  }
  if "aggregated" not in [tags] {
    drop {}
  }
}
output {
  elasticsearch {
    action => "update" # important
    # update the document that was looked up above
    document_id => "%{id}"
    ...
  }
}
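The level-2 step can be reasoned about the same way. The sketch below is an adaptation rather than a transcription of the configuration above: the hypothetical helper `attach_employees` takes a country document as the elasticsearch filter might return it and nests each employee row under the department with the same `department_id`, returning only the `department` field to send with `action => "update"`. The field names come from the question's mapping; matching by `department_id` is my assumption.

```ruby
# Hypothetical helper: nest employee rows under the matching department of an
# already-fetched country document. Existing employee lists are replaced.
def attach_employees(country_doc, employee_rows)
  # shallow copies so the fetched document itself is not modified
  departments = (country_doc['department'] || []).map(&:dup)
  departments.each do |dept|
    dept['employee'] = employee_rows
      .select { |row| row['department_id'] == dept['department_id'] }
      .map { |row| row.slice('employee_id', 'employee_name') }
  end
  # only the field that the partial update should overwrite
  { 'department' => departments }
end

fetched = { 'country_id' => '1', 'department' => [
  { 'department_id' => '10', 'department_name' => 'Sales' },
  { 'department_id' => '11', 'department_name' => 'IT' }
] }
employee_rows = [
  { 'department_id' => '10', 'employee_id' => '100', 'employee_name' => 'Alice' },
  { 'department_id' => '11', 'employee_id' => '101', 'employee_name' => 'Bob' },
  { 'department_id' => '10', 'employee_id' => '102', 'employee_name' => 'Carol' }
]

update_doc = attach_employees(fetched, employee_rows)
p update_doc
```

Because the whole `department` array is sent back, all employee rows of one country should be aggregated into a single event; otherwise a later update would replace the employees attached by an earlier one.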
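Also, to debug the Ruby code, print the events to stdout. The `rubydebug` codec shows the full content of each event, whereas `dots` only prints one dot per event:
output {
  stdout { codec => rubydebug }
}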