This post is more than a year old. Some of the information may no longer be accurate.
The Elasticsearch Query DSL offers powerful aggregations that can be used to reduce the number of stored documents and save disk space. After a certain period of time, the full level of detail is no longer needed. For instance, I collect statistical data about fraud prevention services on a daily basis.
GET _cat/indices/fraud?v&s=index:asc
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open fraud-2017.08.17 OlwgIRenQ_2dyiKm-Aapkw 2 1 1680 0 1.2mb 634kb
green open fraud-2017.08.18 0aJcUMbFQSa3DpGtg1l5iw 2 1 20160 0 12.8mb 6.4mb
green open fraud-2017.08.19 pQCsW7NpSZe5UuJT5vcIvQ 2 1 20160 0 12.6mb 6.3mb
green open fraud-2017.08.20 G4qG8L5HRKGHddx9jyYbvQ 2 1 26160 0 15mb 7.5mb
green open fraud-2017.08.21 UNkfZaXISj-p2fOBUxor0Q 2 1 92789 0 45.3mb 22.7mb
green open fraud-2017.08.22 bz8vqtC2RUW1YjnN2L2oQw 2 1 88361 0 44.2mb 22mb
green open fraud-2017.08.23 8AtPnSy-TVu0fMEQ3lbcsw 2 1 80999 0 40.6mb 20.3mb
green open fraud-2017.08.24 3GIiEC7aRCOvFFzM0_Fi2w 2 1 194570 0 70.4mb 35.1mb
green open fraud-2017.08.25 og1Xx9XITJa1gKBxdQMYLQ 2 1 234934 0 83.8mb 41.8mb
green open fraud-2017.08.26 AUwvDE0JR0aKIHkKGTseSg 2 1 235553 0 84.4mb 42.2mb
green open fraud-2017.08.27 MdLO0ULoQn6al0MSdPu8IA 2 1 275991 0 93.1mb 46.5mb
green open fraud-2017.08.28 93nZmpkOSWOcUbm1CxHtCQ 2 1 324153 0 106mb 53mb
green open fraud-2017.08.29 Nm021E6sTFi-9DmlGqQEkA 2 1 315797 0 103.4mb 51.7mb
green open fraud-2017.08.30 NCJbV-uLSzGbjjRQMvRqug 2 1 283340 0 96.1mb 48mb
green open fraud-2017.08.31 AQAWdCqDT3ehvjyn4FbLwg 2 1 332613 0 115.4mb 57.6mb
green open fraud-2017.09.01 fNdnrAzIRbOhwuHMsBH-KA 2 1 305892 0 109.7mb 55mb
green open fraud-2017.09.02 Z9ynOZfhQgeIi8EH9VUzTg 2 1 276176 0 103.6mb 52mb
green open fraud-2017.09.03 IUE7xIf0RFyqfxqeTmDtTQ 2 1 231013 0 91mb 45.6mb
green open fraud-2017.09.04 bwyZp5eMTa-9tZa2dGIw6g 2 1 268054 0 100.9mb 50.2mb
green open fraud-2017.09.05 tuGZS68IT6aQV5fQbwkL1A 2 1 235889 0 92.9mb 46.4mb
green open fraud-2017.09.06 DS2syWlHSSKzzwCqm2ImAA 2 1 227299 0 89.8mb 45mb
green open fraud-2017.09.07 PwZ39BHVRDekpe3Eklapgg 2 1 251881 0 92.9mb 46.4mb
green open fraud-2017.09.08 tkLcydSoT9KIBTjjEFXH1A 2 1 175374 0 68.5mb 34.3mb
green open fraud-2017.09.09 IaRhV8MHTaO8WHCDYqyf7g 2 1 184333 0 79.3mb 39.6mb
green open fraud-2017.09.10 5Kc-F3omQHiA1YzU4U5Y4Q 2 1 161799 0 70.6mb 35.4mb
green open fraud-2017.09.11 Ajbw9XnNTga66bN-U7IgTA 2 1 205447 0 83.1mb 41.5mb
green open fraud-2017.09.12 8DrE-dZ_TQmr1Boor09BKw 2 1 187816 0 70.2mb 35mb
green open fraud-2017.09.13 hkQ3WQ49SM-rggxqQonfiw 2 1 234633 0 88.5mb 44.3mb
green open fraud-2017.09.14 Q39tR7sKSHqgbEJpJTQiZQ 2 1 230865 0 87.9mb 43.9mb
green open fraud-2017.09.15 ebpcLvWnSkSLen7OP7w14Q 2 1 188488 0 78.3mb 39.2mb
green open fraud-2017.09.16 FBFbgbadQg-oTMDfNoob0g 2 1 224340 0 96mb 48mb
green open fraud-2017.09.17 6z6TpLq7TXuM64K1hYlaDQ 2 1 239607 0 98.6mb 49.1mb
green open fraud-2017.09.18 Zq-KBD2sTd2Wnj00Eqv2qQ 2 1 207967 0 90.4mb 45.2mb
green open fraud-2017.09.19 VaZmTvtqRY6UmF779jb3TA 2 1 209122 0 77.5mb 38.8mb
green open fraud-2017.09.20 avVtOhkqSZuccPkYuOmU5g 2 1 203056 0 74.6mb 37.3mb
green open fraud-2017.09.21 gakY_3jHSUq1maHR-4wLXA 2 1 127662 0 55mb 27.4mb
green open fraud-2017.09.22 zMDUizWVREq9590wUsJdqQ 2 1 127546 0 55.5mb 27.7mb
green open fraud-2017.09.23 ptPaU1WZSKa57jflLpdvNA 2 1 91948 0 44.9mb 22.4mb
green open fraud-2017.09.24 uOi464xxTBeoUGDGMQAckQ 2 1 104120 0 47.3mb 23.6mb
green open fraud-2017.09.25 tHMnRTu9R3W_woxsKS2qAA 2 1 98119 0 46.3mb 23mb
green open fraud-2017.09.26 XgHv3j9ASwq0q_U4otsSFw 2 1 118299 0 52mb 26mb
green open fraud-2017.09.27 CeY_Qw1eQ1yEi7WalM_Zlg 2 1 135067 0 61.1mb 30.5mb
green open fraud-2017.09.28 5SRhQvB1RdeLPy8WiDQjGA 2 1 121341 0 56.8mb 28.7mb
green open fraud-2017.09.29 L2zfdZZCR9e-pQnQ9e5I1A 2 1 136221 0 63.1mb 31.2mb
green open fraud-2017.09.30 oncWQAIcSzmBPRt2wlHvTA 2 1 165502 0 80.9mb 40.5mb
green open fraud-2017.10.01 OOg1SZH1Qjmo85NSxbDfjg 2 1 162648 0 77mb 38.6mb
green open fraud-2017.10.02 wc6l_5WDRVCHMPiUx9BBRQ 2 1 177023 0 82.5mb 41.4mb
green open fraud-2017.10.03 6GYS6z8hSqynFFI9GxyWTA 2 1 186684 0 72mb 36mb
green open fraud-2017.10.04 _ZkXUpbbRO-euZv_8Vatlw 2 1 177498 0 69.5mb 34.7mb
green open fraud-2017.10.05 6G1OZobKTbKQ9MdncVhtPQ 2 1 180769 0 70.3mb 35.1mb
green open fraud-2017.10.06 leXb6SkhQASzcZ164hSksg 2 1 194112 0 74.3mb 37.2mb
green open fraud-2017.10.07 4rvy0nWWRZGf42eyPBECPg 2 1 181823 0 70.3mb 35.1mb
green open fraud-2017.10.08 9rTk6wO_ThWAOntg98qIYg 2 1 125629 0 54.4mb 27.2mb
green open fraud-2017.10.09 opTUqawdTzqeFDIs5ouAfw 2 1 144947 0 59.3mb 29.6mb
green open fraud-2017.10.10 xvQlqnhlSSiwmJU345_tNA 2 1 141745 0 58.4mb 29.1mb
green open fraud-2017.10.11 NODK8l5WS06iYsZ94Ui-AA 2 1 132986 0 56.5mb 28.2mb
green open fraud-2017.10.12 aEwb4ihqQ7iKeLRm_ALB6w 2 1 135184 0 57.2mb 28.6mb
green open fraud-2017.10.13 WJHzV1RzR2SazRfN5P_8Bw 2 1 143217 0 59.5mb 29.7mb
green open fraud-2017.10.14 qQmNX0sySxG7ow21vn3Bnw 2 1 133659 0 57mb 28.5mb
green open fraud-2017.10.15 xch4F_E_Rhi8eX1CQlLyCQ 2 1 121647 0 53.1mb 26.5mb
green open fraud-2017.10.16 5bk-GdujRyOpK2JbSvjdOA 2 1 141811 0 58.5mb 29.2mb
green open fraud-2017.10.17 wU1uLfkETgaTTyuAGQjxhw 2 1 173206 0 66.1mb 33mb
green open fraud-2017.10.18 1wwFZwfjSLmVNxhzo3-3Rw 2 1 142172 0 59.5mb 29.7mb
green open fraud-2017.10.19 ZzITeyNaSfWAvv9AiKNr9A 2 1 126300 0 55.8mb 27.8mb
green open fraud-2017.10.20 BbwIASphRXe3jbE-hgammg 2 1 46324 0 20.8mb 10.4mb
Statistical values are logged every minute. Looking at a single logger over a whole day:
Get count
GET fraud-2017.08.19/_search
{
"size": 0,
"query": {
"bool": {
"must": [
{
"match": {
"channel.keyword": "Issuing"
}
},
{
"match": {
"logger.keyword": "STA9101"
}
}
]
}
}
}
{
"took": 14,
"timed_out": false,
"_shards": {
"total": 2,
"successful": 2,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1440,
"max_score": 0,
"hits": []
}
}
1 document per minute * 60 minutes * 24 hours = 1440 metric documents per day
Aggregate
The 1440 metric documents can be reduced to 24 documents, one per hour.
Chain two aggregations: a date histogram with a nested metric (sum) aggregation
GET fraud-2017.08.19/_search
{
"size": 0,
"query": {
"bool": {
"must": [
{
"match": {
"channel.keyword": "Issuing"
}
},
{
"match": {
"logger.keyword": "STA9101"
}
}
]
}
},
"aggs": {
"trx_over_time": {
"date_histogram": {
"field": "@timestamp",
"interval": "1h"
},
"aggs": {
"sum_trx": {
"sum": {
"field": "value"
}
}
}
}
}
}
Response with 24 hourly buckets
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 2,
"successful": 2,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1440,
"max_score": 0,
"hits": []
},
"aggregations": {
"trx_over_time": {
"buckets": [
{
"key_as_string": "2017-08-19T00:00:00.000Z",
"key": 1503100800000,
"doc_count": 60,
"sum_trx": {
"value": 6742
}
},
{
"key_as_string": "2017-08-19T01:00:00.000Z",
"key": 1503104400000,
"doc_count": 60,
"sum_trx": {
"value": 4734
}
},
{
"key_as_string": "2017-08-19T02:00:00.000Z",
"key": 1503108000000,
"doc_count": 60,
"sum_trx": {
"value": 3752
}
},
{
"key_as_string": "2017-08-19T03:00:00.000Z",
"key": 1503111600000,
"doc_count": 60,
"sum_trx": {
"value": 5408
}
},
{
"key_as_string": "2017-08-19T04:00:00.000Z",
"key": 1503115200000,
"doc_count": 60,
"sum_trx": {
"value": 13376
}
},
{
"key_as_string": "2017-08-19T05:00:00.000Z",
"key": 1503118800000,
"doc_count": 60,
"sum_trx": {
"value": 34932
}
},
{
"key_as_string": "2017-08-19T06:00:00.000Z",
"key": 1503122400000,
"doc_count": 60,
"sum_trx": {
"value": 93086
}
},
{
"key_as_string": "2017-08-19T07:00:00.000Z",
"key": 1503126000000,
"doc_count": 60,
"sum_trx": {
"value": 163467
}
},
{
"key_as_string": "2017-08-19T08:00:00.000Z",
"key": 1503129600000,
"doc_count": 60,
"sum_trx": {
"value": 230601
}
},
{
"key_as_string": "2017-08-19T09:00:00.000Z",
"key": 1503133200000,
"doc_count": 60,
"sum_trx": {
"value": 264623
}
},
{
"key_as_string": "2017-08-19T10:00:00.000Z",
"key": 1503136800000,
"doc_count": 60,
"sum_trx": {
"value": 248176
}
},
{
"key_as_string": "2017-08-19T11:00:00.000Z",
"key": 1503140400000,
"doc_count": 60,
"sum_trx": {
"value": 238703
}
},
{
"key_as_string": "2017-08-19T12:00:00.000Z",
"key": 1503144000000,
"doc_count": 60,
"sum_trx": {
"value": 248056
}
},
{
"key_as_string": "2017-08-19T13:00:00.000Z",
"key": 1503147600000,
"doc_count": 60,
"sum_trx": {
"value": 247916
}
},
{
"key_as_string": "2017-08-19T14:00:00.000Z",
"key": 1503151200000,
"doc_count": 60,
"sum_trx": {
"value": 216478
}
},
{
"key_as_string": "2017-08-19T15:00:00.000Z",
"key": 1503154800000,
"doc_count": 60,
"sum_trx": {
"value": 160784
}
},
{
"key_as_string": "2017-08-19T16:00:00.000Z",
"key": 1503158400000,
"doc_count": 60,
"sum_trx": {
"value": 107450
}
},
{
"key_as_string": "2017-08-19T17:00:00.000Z",
"key": 1503162000000,
"doc_count": 60,
"sum_trx": {
"value": 86520
}
},
{
"key_as_string": "2017-08-19T18:00:00.000Z",
"key": 1503165600000,
"doc_count": 60,
"sum_trx": {
"value": 68501
}
},
{
"key_as_string": "2017-08-19T19:00:00.000Z",
"key": 1503169200000,
"doc_count": 60,
"sum_trx": {
"value": 55975
}
},
{
"key_as_string": "2017-08-19T20:00:00.000Z",
"key": 1503172800000,
"doc_count": 60,
"sum_trx": {
"value": 40971
}
},
{
"key_as_string": "2017-08-19T21:00:00.000Z",
"key": 1503176400000,
"doc_count": 60,
"sum_trx": {
"value": 27974
}
},
{
"key_as_string": "2017-08-19T22:00:00.000Z",
"key": 1503180000000,
"doc_count": 60,
"sum_trx": {
"value": 18237
}
},
{
"key_as_string": "2017-08-19T23:00:00.000Z",
"key": 1503183600000,
"doc_count": 60,
"sum_trx": {
"value": 13241
}
}
]
}
}
}
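To make the reduction concrete, here is a minimal Python sketch of what the date histogram plus sum aggregation computes, run locally on synthetic data. The function name `hourly_sums` and the input format (pairs of epoch milliseconds and value) are assumptions for illustration only; the real work is done by Elasticsearch.

```python
from collections import defaultdict

MS_PER_HOUR = 3_600_000


def hourly_sums(docs):
    """Sum 'value' per hour. docs is an iterable of (epoch_millis, value) pairs."""
    sums = defaultdict(int)
    counts = defaultdict(int)
    for ts_ms, value in docs:
        bucket = ts_ms - ts_ms % MS_PER_HOUR  # floor to the start of the hour (UTC)
        sums[bucket] += value
        counts[bucket] += 1
    # Same shape as the buckets in the aggregation response
    return [
        {"key": key, "doc_count": counts[key], "sum_trx": {"value": sums[key]}}
        for key in sorted(sums)
    ]


# Synthetic data (not taken from the real index): one document per minute for a day
start = 1503100800000  # 2017-08-19T00:00:00Z
docs = [(start + minute * 60_000, 1) for minute in range(24 * 60)]
buckets = hourly_sums(docs)
print(len(docs), "documents ->", len(buckets), "buckets")
```

Each bucket stands for 60 source documents, so keeping only the buckets cuts the document count for this logger by a factor of 60.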
Source document
Looking at a source document, which is parsed by a pipeline, we find a lot of information that isn’t related to the statistics. So we can reduce not only the number of documents, but also the number of fields.
{
"_index": "fraud-2017.10.20",
"_type": "stats",
"_id": "AV84tDR8IIOyJsb0pJ3m",
"_score": 1,
"_source": {
"instance": 0,
"offset": 1050043,
"level": "I",
"logger": "STA9101",
"channel": "Issuing",
"input_type": "log",
"logmessage": "2290 transactions since 2017-10-20 09:33:10, next statistical log at: 2017-10-20 09:35:10",
"index": "fraud",
"source": "/var/log/RiskShield/iss/prd/cur/2017-10-20_DecisionServer_Stats.log",
"type": "stats",
"tags": [
"beats_input_codec_plain_applied"
],
"environment": "prd",
"@timestamp": "2017-10-20T07:34:10.000Z",
"application": "RiskShield",
"@version": "1",
"beat": {
"hostname": "fraud-detect",
"name": "fraud-detect",
"version": "5.5.2"
},
"host": "fraud-detect",
"value": 2290
},
"fields": {
"@timestamp": [
1508484850000
]
}
}
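As a rough illustration of the field cleanup, the following Python sketch keeps only the fields that carry the statistic. The list in `KEEP` is my assumption of what is worth keeping (it mirrors the demonstration mapping used further down, minus the ingest fields), and `prune` is a hypothetical helper, not part of any Elasticsearch client.

```python
# Fields assumed to be relevant for the statistic; everything else is dropped
KEEP = ("@timestamp", "channel", "logger", "value")


def prune(source, keep=KEEP):
    """Return a copy of a _source dict that contains only the fields in keep."""
    return {field: source[field] for field in keep if field in source}


# Abbreviated copy of the source document shown above
source = {
    "instance": 0,
    "offset": 1050043,
    "level": "I",
    "logger": "STA9101",
    "channel": "Issuing",
    "input_type": "log",
    "environment": "prd",
    "@timestamp": "2017-10-20T07:34:10.000Z",
    "application": "RiskShield",
    "host": "fraud-detect",
    "value": 2290,
}
slim = prune(source)
print(slim)
```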
Using Watcher
One way to automate this is Elasticsearch Watcher. If you don’t have a commercial license, you can also accomplish this task with Spring Batch and a custom Tasklet implementation using the official Elasticsearch Java REST client libraries.
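For the route without Watcher, the client has to turn the aggregation buckets into index requests itself. The sketch below is not the Spring Batch implementation; it only shows, in Python, how a newline-delimited body for the Elasticsearch `_bulk` endpoint could be assembled from the buckets. The target index name `fraud-hourly` and the helper `bulk_body` are hypothetical, and the `_type` entry assumes a 5.x or 6.x cluster like the one in this post.

```python
import json


def bulk_body(buckets, index, doc_type, logger, channel):
    """Build an NDJSON _bulk body: one action line plus one source line per bucket."""
    lines = []
    for bucket in buckets:
        action = {"index": {"_index": index, "_type": doc_type, "_id": str(bucket["key"])}}
        doc = {
            "@timestamp": bucket["key"],
            "value": bucket["sum_trx"]["value"],
            "logger": logger,
            "channel": channel,
        }
        lines.append(json.dumps(action))
        lines.append(json.dumps(doc))
    # The bulk API expects every line, including the last one, to end with a newline
    return "".join(line + "\n" for line in lines)


# First two buckets from the aggregation response above
buckets = [
    {"key": 1503100800000, "doc_count": 60, "sum_trx": {"value": 6742}},
    {"key": 1503104400000, "doc_count": 60, "sum_trx": {"value": 4734}},
]
body = bulk_body(buckets, "fraud-hourly", "stats", "STA9101", "Issuing")
print(body, end="")
```

Using the bucket key as the document id makes a repeated run overwrite the same hourly documents instead of duplicating them.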
For demonstration purposes, the following index and mapping are used:
DELETE test
PUT test
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"stats": {
"_all": {
"enabled": false
},
"properties": {
"@timestamp": {
"type": "date"
},
"channel": {
"type": "keyword"
},
"logger": {
"type": "keyword"
},
"value": {
"type": "integer"
},
"ingest.agent": {
"type": "keyword"
},
"ingest.time": {
"type": "date"
}
}
}
}
}
Now the tricky part. As the input, I chose one specific index for testing; use an alias in production instead. The action part is more interesting: the search aggregation results are used as the index payload, so that one new document is written per bucket.
Define a watch for testing purposes on a specific index
PUT /_xpack/watcher/watch/fraud-issuing-aggregations
{
"input": {
"search": {
"request": {
"indices": [
"fraud-2017.08.20"
],
"types": [
"stats"
],
"body": {
"size": 0,
"query": {
"bool": {
"must": [
{
"match": {
"channel.keyword": "Issuing"
}
},
{
"match": {
"logger.keyword": "STA9101"
}
}
]
}
},
"aggs": {
"trx_over_time": {
"date_histogram": {
"field": "@timestamp",
"interval": "1h"
},
"aggs": {
"sum_trx": {
"sum": {
"field": "value"
}
}
}
}
}
}
}
}
},
"trigger": {
"schedule": {
"interval": "1d"
}
},
"actions": {
"index_payload": {
"transform": {
"script": {
"lang": "painless",
"source": """
// Build one document per hourly bucket of the aggregation
def docs = [];
for (item in ctx.payload.aggregations.trx_over_time.buckets) {
  def document = [
    '_id': item.key,
    // Re-read the UTC wall-clock time of the bucket as Europe/Zurich local time
    '@timestamp': LocalDateTime.ofInstant(Instant.ofEpochMilli(item.key), ZoneOffset.UTC).atZone(ZoneId.of("Europe/Zurich")).toInstant().toEpochMilli(),
    'value': item.sum_trx.value,
    'logger': 'STA9101',
    'channel': 'Issuing',
    'ingest.time': ctx.execution_time,
    'ingest.agent': 'watcher'
  ];
  docs.add(document);
}
// A '_doc' array tells the index action to index every entry as its own document
return ['_doc' : docs];
"""
}
},
"index": {
"index": "test",
"doc_type": "stats"
}
}
}
}
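The transform script can be hard to read inside the JSON, so here is a small Python sketch that mirrors its logic outside of Watcher. It is only an approximation under stated assumptions: the names `reinterpret_as_local` and `to_documents` are made up for this example, and a fixed +02:00 offset stands in for the real Europe/Zurich zone rules (which switch to +01:00 in winter). The timestamp expression takes the UTC wall-clock time of the bucket key and reads it again as local time, so in summer the stored instant ends up two hours earlier than the bucket key.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ZURICH_SUMMER = timezone(timedelta(hours=2))  # assumption: fixed CEST offset, no DST rules


def reinterpret_as_local(epoch_ms, tz=ZURICH_SUMMER):
    """Keep the UTC wall-clock time of epoch_ms, but treat it as local time in tz."""
    utc = EPOCH + timedelta(milliseconds=epoch_ms)
    local = utc.replace(tzinfo=tz)  # same wall clock, different zone
    return (local - EPOCH) // timedelta(milliseconds=1)


def to_documents(buckets, execution_time):
    """Mirror of the watch transform: one document per bucket, wrapped in '_doc'."""
    docs = []
    for item in buckets:
        docs.append({
            "_id": item["key"],
            "@timestamp": reinterpret_as_local(item["key"]),
            "value": item["sum_trx"]["value"],
            "logger": "STA9101",
            "channel": "Issuing",
            "ingest.time": execution_time,
            "ingest.agent": "watcher",
        })
    return {"_doc": docs}


# First bucket from the aggregation response shown earlier
buckets = [{"key": 1503100800000, "doc_count": 60, "sum_trx": {"value": 6742}}]
payload = to_documents(buckets, "2017-10-20T10:00:00.000Z")
print(payload["_doc"][0]["@timestamp"])
```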
Execute the watch manually
POST _xpack/watcher/watch/fraud-issuing-aggregations/_execute
Query the aggregated documents
GET test/_search
{
"query": {"match_all": {}}
}
If everything works as expected, we can adjust the watch to run against indices older than two weeks and then remove those old indices. This is where Elasticsearch Curator comes in handy: it can assign indices older than two weeks to a dedicated alias.
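The selection rule itself is simple. The following Python sketch is not Curator configuration; it only illustrates how daily indices named like `fraud-2017.10.01` could be picked when they are older than two weeks. The function `older_than` is hypothetical, and treating exactly 14 days as "not yet older" is my assumption about the boundary.

```python
from datetime import date, datetime, timedelta


def older_than(index_names, today, days=14, prefix="fraud-"):
    """Return the daily indices whose date suffix is more than `days` before today."""
    cutoff = today - timedelta(days=days)
    selected = []
    for name in index_names:
        if not name.startswith(prefix):
            continue  # not one of the daily fraud indices
        try:
            day = datetime.strptime(name[len(prefix):], "%Y.%m.%d").date()
        except ValueError:
            continue  # suffix is not a date, skip it
        if day < cutoff:
            selected.append(name)
    return selected


names = ["fraud-2017.10.01", "fraud-2017.10.06", "fraud-2017.10.07", "fraud-2017.10.20", "test"]
print(older_than(names, today=date(2017, 10, 20)))
```

The returned names are the candidates for the dedicated alias that the watch reads from, and later for deletion.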