查询引擎之二

这一部分承接上一部分,主要介绍Eleasticsearch Stack
本篇是本系列第四篇,也是最后一篇

Elasticsearch

前一段时间,在接触spring-boot时有过接触,当时的博客
这里先上一张图:
elasticsearch数据结构.png

再看看ES的术语:

关系型数据库 Elasticsearch
Table Index(Type)
Row Document
Column Filed
Schema Mapping
SQL DSL

Elasticsearch有3个标签:Store, Search, and Analyze,前边的博客介绍了Store与Search,这里主要介绍Analyze。
官网参考

ES前期一个Index可以有多个type,现在已经移除,一个Index只能有一个type了
ES默认是动态创建索引和索引类型的mapping的。这就相当于无需定义Solr中的Schema,无需指定各个field的索引规则就可以索引文件,很方便。但有时方便就代表着不灵活。比如,ES默认一个field是要做分词的,但我们有时要搜索匹配整个field却不行。如有统计工作要记录每个城市出现的次数。对于NAMEfield,若记录「new york」文本,ES可能会把它拆分成「new」和「york」这两个词,分别计算这个两个单词的次数,而不是我们期望的「new york 」。这时,就需要我们在创建索引时定义mapping。mapping文件如下:出处

1
2
3
4
5
6
7
8
9
10
{ "index_type":
{ "properties":
{
"ID":{
"type":"string", "index":"not_analyzed" },
"NAME":{ "
type":"string", "fields":{ "NAME":{ "type":"string" }, "raw":{ "type":"string", "index":"not_analyzed" } } }
}
}
}

聚合简介

ES提供了4种类型的聚合,包括:Bucekting、Metric、Matrix、Pipeline,主要是前2个
Metrix: 在一组document之上的计算指标,max、min、sum、avg等
Bucketing: 生成Bucketing的一组聚合,其中每个Bucketing都与一个键和一个Document的条件相关联。执行汇总时,将对上下文中的每个document评估所有bucketing条件,并且当条件匹配时,该文档将被视为“落入”相关buckeing内。对应SQL的groupBy
Matrix:一类聚合,可在多个字段上进行操作,并根据从请求的文档字段中提取的值生成矩阵结果。ES对它的支持还不完善,还不支持脚本
Pipeline: 汇总其他汇总及其相关指标的输出的汇总

聚合的结构:

1
2
3
4
5
6
7
8
9
10
"aggregations" : {
"<aggregation_name>" : {
"<aggregation_type>" : {
<aggregation_body>
}
[,"meta" : { [<meta_data_body>] } ]?
[,"aggregations" : { [<sub_aggregation>]+ } ]?
}
[,"<aggregation_name_2>" : { ... } ]*
}

说明:aggregations可以写成aggs

Metrix

Avg

比如一个学生分数的document,计算平均分:

1
2
3
4
5
6
POST /exams/_search?size=0
{
"aggs" : {
"avg_grade" : { "avg" : { "field" : "grade" } }
}
}

这里的aggregation_name是”avg_grade”,自定义的。aggregation_type是”avg”,类似于算子,ES提供的,filed指明avg在哪个field上执行,算子的参数。结果:

1
2
3
4
5
6
7
8
{
...
"aggregations": {
"avg_grade": {
"value": 75.0
}
}
}

avg可以基于脚本来编写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
POST /exams/_search?size=0
{
"aggs" : {
"avg_grade" : {
"avg" : {
"script" : {
"id": "my_script",
"params": {
"field": "grade"
}
}
}
}
}
}

这会将脚本参数解释为具有painless语言且没有脚本参数的嵌入式脚本。更进一步可以写为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
POST /exams/_search?size=0
{
"aggs" : {
"avg_corrected_grade" : {
"avg" : {
"field" : "grade",
"script" : {
"lang": "painless",
"source": "_value * params.correction",
"params" : {
"correction" : 1.2
},
"missing": 0
}
}
}
}
}

在原有的分数上做一个修正。missing参数指定缺省值。
与Avg类似的还有个加权平均:Weighted Avg,比Avg增加了weight参数,这个不多介绍了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
POST /exams/_search
{
"size": 0,
"aggs" : {
"weighted_grade": {
"weighted_avg": {
"value": {
"field": "grade"
},
"weight": {
"field": "weight"
}
}
}
}
}

extended_stats

cardinality等价于distinct,查询某个field不同值的个数,max、min、sum都类似于sql,不多介绍,简单介绍一个stats与extended_stats。stats是将某个field的count、min、max、avg、sum都计算出来,extended_stats比stats增加了平方和、方差、标准差等值

1
2
3
4
5
6
7
GET /exams/_search
{
"size": 0,
"aggs" : {
"grades_stats" : { "extended_stats" : { "field" : "grade" } }
}
}

结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
...

"aggregations": {
"grades_stats": {
"count": 2,
"min": 50.0,
"max": 100.0,
"avg": 75.0,
"sum": 150.0,
"sum_of_squares": 12500.0,
"variance": 625.0,
"std_deviation": 25.0,
"std_deviation_bounds": {
"upper": 125.0,
"lower": 25.0
}
}
}
}

这一块用的是Get,有点疑惑,另外std_deviation_bounds这个值是:与测量值的正负两个标准差的间隔,正态分布的话,应该是95.4%的区间

percentiles

还有一个比较有趣的是percentiles,这个给出的是相应概率发生的位置,比如对加响应时间的分析:

1
2
3
4
5
6
7
8
9
10
11
GET latency/_search
{
"size": 0,
"aggs" : {
"load_time_outlier" : {
"percentiles" : {
"field" : "load_time"
}
}
}
}

结果是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
...

"aggregations": {
"load_time_outlier": {
"values" : {
"1.0": 5.0,
"5.0": 25.0,
"25.0": 165.0,
"50.0": 445.0,
"75.0": 725.0,
"95.0": 945.0,
"99.0": 985.0
}
}
}
}

Bucket

filter

介绍一些filter,类似于where的用法,比如计算t-shirt的平均价格:

1
2
3
4
5
6
7
8
9
10
11
POST /sales/_search?size=0
{
"aggs" : {
"t_shirts" : {
"filter" : { "term": { "type": "t-shirt" } },
"aggs" : {
"avg_price" : { "avg" : { "field" : "price" } }
}
}
}
}

结果:

1
2
3
4
5
6
7
8
9
{
...
"aggregations" : {
"t_shirts" : {
"doc_count" : 3,
"avg_price" : { "value" : 128.33333333333334 }
}
}
}

还有一种写法

1
2
3
4
5
6
7
8
9
10
11
12
13
POST /sales/_search?size=0
{
"query" : {
"constant_score" : {
"filter" : {
"match" : { "type" : "hat" }
}
}
},
"aggs" : {
"hat_prices" : { "sum" : { "field" : "price" } }
}
}

term

term有点类似与group by,分组统计个数,例如,汇总某种类型(genre)的document数量

1
2
3
4
5
6
7
8
GET /_search
{
"aggs" : {
"genres" : {
"terms" : { "field" : "genre" }
}
}
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
...
"aggregations" : {
"genres" : {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets" : [
{
"key" : "electronic",
"doc_count" : 6
},
{
"key" : "rock",
"doc_count" : 3
},
{
"key" : "jazz",
"doc_count" : 2
}
]
}
}
}

官方对这里有个说明:terms聚合应该是关键字类型的字段或适用于bucket聚合的任何其他数据类型。为了与text一起使用,您将需要启用fielddata。

默认情况下,terms汇总将返回doc_count排序的前十的 数据,可以通过设置size参数来更改此默认行为。另外还有一个
shard_size来对size过大时出现的性能问题进行调优。如果要分页来获取,需要使用Composite,这个后边介绍。

查询时可以通过order来对结果进行排序,如下:

1
2
3
4
5
6
7
8
9
10
11
GET /_search
{
"aggs" : {
"genres" : {
"terms" : {
"field" : "genre",
"order" : { "_count" : "asc" }
}
}
}
}

分组最大并排序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
GET /_search
{
"aggs" : {
"genres" : {
"terms" : {
"field" : "genre",
"order" : { "max_play_count" : "desc" }
},
"aggs" : {
"max_play_count" : { "max" : { "field" : "play_count" } }
}
}
}
}

在嵌套的情况下,有时会应用聚合的结果,这时通过”>”来指明层级,而通过”.”来引用某个metric

histogram

1
2
3
4
5
6
7
8
9
10
11
12
POST /sales/_search?size=0
{
"aggs" : {
"prices" : {
"histogram" : {
"field" : "price",
"interval" : 50,
"min_doc_count" : 1
}
}
}
}

interval来指明间隔、min_doc_count指明最少的个数,如果小于该值,则过滤掉该区间
结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
{
...
"aggregations": {
"prices" : {
"buckets": [
{
"key": 0.0,
"doc_count": 1
},
{
"key": 50.0,
"doc_count": 1
},
{
"key": 150.0,
"doc_count": 2
},
{
"key": 200.0,
"doc_count": 3
}
]
}
}
}

date_histogram

最常用的按时间汇总,比如按月统计

1
2
3
4
5
6
7
8
9
10
11
12
POST /sales/_search?size=0
{
"aggs" : {
"sales_over_time" : {
"date_histogram" : {
"field" : "date",
"calendar_interval" : "month",
"format" : "yyyy-MM-dd"
}
}
}
}

这里的calendar_interval只能是单位时间,如:1d、1M等,key是时间戳,如果要显示文本,可以增加format,结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{
...
"aggregations": {
"sales_over_time": {
"buckets": [
{
"key_as_string": "2015-01-01",
"key": 1420070400000,
"doc_count": 3
},
{
"key_as_string": "2015-02-01",
"key": 1422748800000,
"doc_count": 2
},
{
"key_as_string": "2015-03-01",
"key": 1425168000000,
"doc_count": 2
}
]
}
}
}

如果是想自定义时间区间,可以用date_range来查询,这里不多说了。

composite

  • source
    source用来指明构建composite bucket的源,source的顺序很重要,它控制返回key的顺序。
    composite有3种source: term、histogram、Date histogram。可以但source,但感觉单source跟原意差别不大。下例组合date_histogram与 term,感觉像是两种source的笛卡尔积,这个得试一下,官网没给出

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    GET /_search
    {
    "size": 0,
    "aggs" : {
    "my_buckets": {
    "composite" : {
    "sources" : [
    { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } },
    { "product": { "terms": {"field": "product" } } }
    ]
    }
    }
    }
    }
  • pagination
    通过size与after来制定分页

    第一次查询:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    GET /_search
    {
    "size": 0,
    "aggs" : {
    "my_buckets": {
    "composite" : {
    "size": 2,
    "sources" : [
    { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d" } } },
    { "product": { "terms": {"field": "product" } } }
    ]
    }
    }
    }
    }

返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{
...
"aggregations": {
"my_buckets": {
"after_key": {
"date": 1494288000000,
"product": "mad max"
},
"buckets": [
{
"key": {
"date": 1494201600000,
"product": "rocky"
},
"doc_count": 1
},
{
"key": {
"date": 1494288000000,
"product": "mad max"
},
"doc_count": 2
}
]
}
}
}

第二次查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
GET /_search
{
"size": 0,
"aggs" : {
"my_buckets": {
"composite" : {
"size": 2,
"sources" : [
{ "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } },
{ "product": { "terms": {"field": "product", "order": "asc" } } }
],
"after": { "date": 1494288000000, "product": "mad max" }
}
}
}
}

top_hits

在官网中,top_hits当成metric aggregator,但我觉得它主要与bucket连用。它追踪之前命中(hit)的document,这样可以在子聚合上对这些document进行某些操作。
top_hits可以有效的通过bucket aggregator来对某些指定field进行分组、排序等

它有几个参数:

  • from: 偏移量
  • size: 与其他size一样,指定个数
  • sort: 对命中的document进行排序
  • _source: 指定字段,如果不指定,则全部返回

比如通过某个字段来分桶,然后再通过metics进行聚合,并且想返回多个字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
POST /my_index/_search
{
"size": 0,
"aggs" : {
"metaQunatity" : {
"terms" : {
"field" : "metaQuantityId"
},
"aggs" : {
"quantitySumPrice" : { "sum" : { "field" : "quantityTotalPrice" } },
"materialSumPrice" : { "sum" : { "field" : "materiaTotalPrice" } },
"aggs": {
"top_hits": {
"size": 1,
"_source":{
"includes": ["metaQuantityId", "metaQuantityName"]
}

}
}

}
}
}
}

Pipeline

这个只介绍一个:sum_bucket,可以对兄弟聚合的结果进行再次聚合

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
POST /sales/_search
{
"size": 0,
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"calendar_interval" : "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
}
}
},
"sum_monthly_sales": {
"sum_bucket": {
"buckets_path": "sales_per_month>sales"
}
}
}
}

sum_bucket与buckets_path是语法,sales_per_month>sales是致命对兄弟聚合的哪个field进行聚合,结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"sales": {
"value": 550.0
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"sales": {
"value": 60.0
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"sales": {
"value": 375.0
}
}
]
},
"sum_monthly_sales": {
"value": 985.0
}
}
}

这个例子也给出一般的写法,用bucket来分组,用metrix来计算,最后可以通过pipeline对计算结果再次汇总。

Mapping

追加一节的Mapping

  • 简介
    首先Mapping是什么?前边说过Mapping等价于关系型数据库的Schema,它保存着一个document有哪些field,以及field的类型信息。用mapping可以定义:
    哪些string field应该用于全文检索
    哪些field包含number、date或者地理坐标
    date值的formate
    自定义规则来控制mapping动态增加field

    每个field都有一个数据type,可以是:
    一个简单的type,如:text、keyword、date、long、double、boolean 或者ip
    json类型,用object或者nested
    特殊类型: geo_point、geo_shape、completion

    一个string field可以通过fields参数来指定即是text又是keyword

    es在创建index可以动态的创建mapping(Dynamic mapping),当然也支持用户显示的定义mapping( Explicit mapping).显示创建index,如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    PUT http://localhost:9200/user
    {
    "mappings": {
    "properties": {
    "age": { "type": "integer" },
    "email": { "type": "keyword" },
    "name": { "type": "text" }
    }
    }
    }

    在已有index上增加一个已经存在的mapping:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    PUT http://localhost:9200/user/_mapping
    {
    "properties": {
    "employee-id": {
    "type": "keyword",
    "index": false
    }
    }
    }

    注意:除了修改field的参数,不能改变一个index的mapping或者field type。如果需要修改,需要重新创建一个index,然后通过reindex数据。 这个reindex的说明是:Copies documents from one index to another。

    1
    2
    3
    4
    5
    6
    7
    8
    {
    "source": {
    "index": "old_index"
    },
    "dest": {
    "index": "new_index"
    }
    }

    另外也不能修改一个field的名,但可以通过alias来起个别名。

    ps: 这一部分像极了create table

    下边介绍几个类型,主要是:text、keyword、nested、object以及reindex

  • text
    这个字段用来创建全文索引,如一封email的内容或者一个产品的描述。这个字段的内容会传递个analyzer进行分词,然后创建索引,这样就可以通过某个关键字进行搜索。Text字段不能用于sort以及极少极少用于aggregation。 例如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    PUT my_index
    {
    "mappings": {
    "properties": {
    "full_name": {
    "type": "text"
    }
    }
    }
    }

    前边说了极少极少,也就说其实是可以的,如果想进行聚合,那就需要用fielddata参数设成true。这个参数默认是false的,因为在第一使用它是,它是通过从磁盘读取每个段的整个反向索引而构建的,通过反转document与term的关系来构建通过document查询term的功能,并将结果存储在内存中。这个绝对是个无底洞,所以也就是极少极少啦。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    PUT my_index/_mapping
    {
    "properties": {
    "my_field": {
    "type": "text",
    "fielddata": true
    }
    }
    }

    text可以通过fields字段来指定兼容keyword。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    {
    "my_index": {
    "mappings": {
    "properties": {
    "group": {
    "type": "text",
    "fields": {
    "keyword": {
    "type": "keyword",
    "ignore_above": 256
    }
    }
    }
    }
    }
    }
    }
  • keyword
    这个字段用来索引结构化的数据,如id、email的地址、hostname、status code。这与text是两个问题域。text是通过分词,来用term进行搜索document。keyword则是通过document来查询term进行filter、sorting、aggregation等。例如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    PUT my_index
    {
    "mappings": {
    "properties": {
    "tags": {
    "type": "keyword"
    }
    }
    }
    }

    keyword有个默认开启的属性:doc_value,这个属性在很多的field上都有。它就是。doc_values是在document index时建立的磁盘上数据结构,这使通过document查询term这种数据访问模式成为可能。默认开启就能通过手动关闭,这里不做介绍了。默认的就是最好的。

  • object与nested

    json文档天热带有层级结构,如此就有object。如图:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
      PUT my_index/_doc/1
    {
    "region": "US",
    "manager": {
    "age": 30,
    "name": {
    "first": "John",
    "last": "Smith"
    }
    }
    }

    manager本身是一个对象,它下边还有一个name对象,在内部document会将manager对象拍平来存储,比如:

    1
    2
    3
    4
    5
    6
    {
    "region": "US",
    "manager.age": 30,
    "manager.name.first": "John",
    "manager.name.last": "Smith"
    }

    它的mapping如下。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    PUT my_index
    {
    "mappings": {
    "properties": {
    "region": {
    "type": "keyword"
    },
    "manager": {
    "properties": {
    "age": { "type": "integer" },
    "name": {
    "properties": {
    "first": { "type": "text" },
    "last": { "type": "text" }
    }
    }
    }
    }
    }
    }
    }

    这里要多说一句,ES中并没有专门的array类型,任何field都可以包括多个值,当然这些值必须是相同的数据类型。但在存储 Array时,会有一些变化,如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    PUT my_index/_doc/1
    {
    "group" : "fans",
    "user" : [
    {
    "first" : "John",
    "last" : "Smith"
    },
    {
    "first" : "Alice",
    "last" : "White"
    }
    ]
    }

    它在es中将会转换成如下结构:

    1
    2
    3
    4
    5
    {
    "group" : "fans",
    "user.first" : [ "alice", "john" ],
    "user.last" : [ "smith", "white" ]
    }

    这种存储,使得Alice与White的关系不存在了,如果进行查询:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    GET my_index/_search
    {
    "query": {
    "bool": {
    "must": [
    { "match": { "user.first": "Alice" }},
    { "match": { "user.last": "Smith" }}
    ]
    }
    }
    }

    数据就会查找出来
    如何解决这个问题呢,就需要nested,通过

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    PUT my_index
    {
    "mappings": {
    "properties": {
    "user": {
    "type": "nested"
    }
    }
    }
    }

    将user属性增加nested,来指明,这样它们的关系就存在了,查询就无法查询出来了

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    GET my_index/_search
    {
    "query": {
    "nested": {
    "path": "user",
    "query": {
    "bool": {
    "must": [
    { "match": { "user.first": "Alice" }},
    { "match": { "user.last": "White" }}
    ]
    }
    }
    }
    }
    }

    这时候,从mapping中看,user的类型就是nested.

Logstash

Logstash只做简单的介绍
Logstash主要用于收集数据,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。

官网简介参考

logstash结构示意图.png
Logstash事件处理管道包括3个阶段:inputs 、filters、outputs。

  • inputs
    input产生事件,源可以包括:file、syslog、redis、beats

  • filters
    filter转换数据,包括:grok结构化数据;对数据进行修改、替换、删除数据,比如修改日期格式;geoip对ip增加地理信息等

  • outputs
    输出,一般就输出到es中

kibana

官网
kibana不介绍了,官网就比较清楚啦,从初步了解上看,它做了2个事情:将页面的配置转换成ES的查询;将查询的结果用图形显示出来。