Used: Logstash v6.1.1, Elasticsearch v5.6.4, Kibana v5.6.4
Storing data with city names in Elasticsearch offers the capability to display the geographical distribution of that data on a map in Kibana. To use this feature, you have to declare a geo_point type in your index mapping; I named the field location. To translate the city names to their respective geo points I use the Logstash translate filter. With a small dictionary, Logstash simply looks up the value for your input city. You could also use zip codes, but that would require a more detailed data source. For a demonstration of the translate plugin, city names are sufficient.
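Such a mapping could look like the following sketch (the index and type names are placeholders; only the location field matters here):

PUT payments
{
  "mappings": {
    "payment": {
      "properties": {
        "location": { "type": "geo_point" }
      }
    }
  }
}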
Internal Dictionary
Use an internal dictionary for a small hash. The json filter then parses the looked-up string into the structured location field you can see in the output below.
input {
  stdin { codec => "json" }
}
filter {
  translate {
    field => "merchantCity"
    destination => "geo_point"
    dictionary => [ "WIEN",   '{"lat": 48.2, "lon": 16.366667}',
                    "VIENNA", '{"lat": 48.2, "lon": 16.366667}' ]
  }
  # parse the looked-up JSON string into a structured location object
  json {
    source => "geo_point"
    target => "location"
  }
}
output {
  stdout { codec => "rubydebug" }
}
If you put WIEN or VIENNA into stdin, you see it getting translated.
The stdin plugin is now waiting for input:
[2018-02-06T09:42:28,614][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
{ "merchantCity":"WIEN" }
{
"merchantCity" => "WIEN",
"@version" => "1",
"host" => "omega",
"geo_point" => "{\"lon\": 16.366667,\"lat\": 48.2}",
"location" => {
"lon" => 16.366667,
"lat" => 48.2
},
"@timestamp" => 2018-01-26T08:43:59.949Z
}
^C[2018-02-06T09:44:03,948][WARN ][logstash.runner ] SIGINT received. Shutting down the agent.
[2018-02-06T09:44:04,228][INFO ][logstash.pipeline ] Pipeline terminated {"pipeline.id"=>"main"}
External Dictionary
Use the dictionary file cities.yaml (cities in Austria); for the sake of the example I keep it short. An external dictionary is recommended if you have all the cities of a country.
---
WIEN: '{"lon": 16.366667,"lat": 48.2}'
VIENNA: '{"lon": 16.366667,"lat": 48.2}'
The Logstash config:
input {
  stdin { codec => "json" }
}
filter {
  translate {
    field => "merchantCity"
    destination => "geo_point"
    dictionary_path => "/home/tan/cities.yaml"
  }
  json {
    source => "geo_point"
    target => "location"
  }
}
output {
  stdout { codec => "rubydebug" }
}
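Because the dictionary lives in an external file, the translate filter re-reads it periodically while the pipeline runs; the refresh_interval setting controls how often. A minimal sketch of just the filter block, assuming the same file path:

translate {
  field => "merchantCity"
  destination => "geo_point"
  dictionary_path => "/home/tan/cities.yaml"
  refresh_interval => 300  # seconds between dictionary reloads
}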
Updating Elasticsearch
Logstash can also post-process your Elasticsearch indices by querying, enriching, and updating them. With docinfo => true, the input keeps each document's index, type, and id in @metadata, so the output can write the enriched document back to the same place.
input {
  elasticsearch {
    hosts => "my-elasticsearch"
    index => "payments-2017.12.*"
    query => '{"query":{"bool":{"must":[{"term":{"issuer":{"value":"BA"}}},{"exists":{"field":"merchantCity"}}],"must_not":[{"exists":{"field":"location"}}]}}}'
    user => "lemapper"
    password => "MapperKing"
    docinfo => true
  }
}
filter {
  translate {
    field => "merchantCity"
    destination => "geo_point"
    dictionary_path => "/home/tan/cities.yaml"
  }
  json {
    source => "geo_point"
    target => "location"
    remove_field => [ "geo_point" ]
  }
}
output {
  stdout { codec => "dots" }
  if [location] {
    elasticsearch {
      hosts => "my-elasticsearch"
      index => "%{[@metadata][_index]}"
      document_type => "%{[@metadata][_type]}"
      document_id => "%{[@metadata][_id]}"
      action => "update"  # doc_as_upsert only takes effect in update mode
      doc_as_upsert => true
      user => "lemerger"
      password => "MergeMe"
    }
  }
}
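For readability, here is the query string from the input pretty-printed; it selects documents issued by BA that have a merchantCity but no location yet:

{
  "query": {
    "bool": {
      "must": [
        { "term":   { "issuer": { "value": "BA" } } },
        { "exists": { "field": "merchantCity" } }
      ],
      "must_not": [
        { "exists": { "field": "location" } }
      ]
    }
  }
}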
Conclusion
The Logstash translate filter plugin is a powerful tool for enriching data with information like geo points, either at ingestion time or during post-processing.