当前位置:网站首页>日志监控ELK专题(1)

日志监控ELK专题(1)

2020-11-13 13:16:30 osc_mox9hom9

任务背景

运维人员需要对系统和业务日志进行精准把控,便于分析系统和业务状态。日志分布在不同的服务器上,传统的使用
传统的方法依次登录每台服务器查看日志,既繁琐又效率低下。所以我们需要集中化的日志管理工具将位于不同服务
器上的日志收集到一起, 然后进行分析,展示

前面我们学习过rsyslog,它就可以实现集中化的日志管理,可是rsyslog集中后的日志实现统计与检索又成了一个问
题。使用wc, grep,
awk等相关命令可以实现统计与检索,但如果要求更高的场景,这些命令也会力不从心。所以我们需要一套专业的日志收集分析展示系统。

mark

总结:

1、日志是用于记录系统或业务的状态

2、通过日志可以获得系统或业务的状态,并进行分析。

3、早期的日志是分散在各主机上

4、通过rsyslog实现本地日志管理,收集,轮转,集中管理
5、早期的日志分析方法:wc,grep,awk

6、集中式的日志收集、分析、展示系统

任务要求

1, 搭建ELK集群

2, 收集日志信息并展示

任务拆解

1, 认识ELK

2, 部署elasticsearch集群并了解其基本概念

3, 安装elasticsearch-head实现图形化操作

4, 安装logstash收集日志

5, 安装kibana日志展示

6, 安装file beat实现轻量级日志收集

学习目标

能够说出ELK的应用场景

能够区分ELK架构中elasticsearch,logstash,kibina三个软件各自的主要功能能够单机部署elasticsearch

能够部署elasticsearch集群理解ELK中索引的概念

能够部署logstash

能够使用logstash做日志采集

认识ELK

ELK是一套开源的日志分析系统,由elasticsearch+logstash+Kibana组成。官网说明:<https://www.elastic.co/cn/products>;

首先: 先一句话简单了解E,L,K这三个软件

elasticsearch: 分布式搜索引擎

logstash: 日志收集与过滤,输出给elasticsearch Kibana: 图形化展示

mark

elk下载地址:<https://www.elastic.co/cn/downloads>;

mark

环境准备:

mark

四台机器(内存建议大于1G,比如1.5G; filebeat服务器可为1G) :
1,静态IP(要求能上公网,最好用虚拟机的NAT网络类型上网)

2,主机名及主机名绑定

kibana elasticsearch logstash

filebeat

  1. 10.1.1.11 vm1.cluster.com

  2. 10.1.1.12 vm2.cluster.com

  3. 10.1.1.13 vm3.cluster.com

  4. 10.1.1.14 vm4.cluster.com

3, 关闭防火墙和selinux

# systemctl stop firewalld

  1. # systemctl disable firewalld

  2. # iptables -F

  3. # setenforce 0

  4. setenforce: SELinux is disabled

4, 时间同步

# systemctl restart ntpd

  1. # systemctl enable ntpd

5, yum源(centos安装完系统后的默认yum源就OK)

elasticsearch

elasticsearch简介

Elasticsearch(简称ES)是一个开源的分布式搜索引擎,Elasticsearch还是一个分布式文档数据库。所以它提供了大量数
据的存储功能,快速的搜索分析功能。

提到搜索,大家肯定就想到了百度,谷歌,必应等。当然也有如下的搜索场景。

mark

elasticsearch部署

第1步:
在elasticsearch服务器上(我这里为vm2),确认jdk(使用系统自带的openjdk就OK)

1 [root\@vm2 \~]# rpm -qa |grep openjdk

2 java-1.8.0-openjdk-headless-1.8.0.161-2.b14.el7.x86_64

3 java-1.8.0-openjdk-1.8.0.161-2.b14.el7.x86_64

  1. [root\@vm2 \~]# java -version

  2. openjdk version "1.8.0_161"

  3. OpenJDK Runtime Environment (build 1.8.0_161-b14)

  4. OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode)

第2步: es的安装,配置

[root\@vm2 \~]# wget
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-
6.5.2.rpm

  1. [root\@vm2 \~]# rpm -ivh elasticsearch-6.5.2.rpm

第3步: 单机es的配置与服务启动

9200则是数据传输端口

  1. 9300端口是集群通信端口(我们暂时还没有配置集群,现在是单点elasticsearch)

LISTEN

:::*

0 :::9300


0

14

LISTEN

:::*

0 :::9200

0

tcp6 5329/java

tcp6

5329/java

启动有点慢和卡,稍等1分钟左右,查看到以下端口则表示启动OK

[root\@vm2 \~]# netstat -ntlup |grep java

[root\@vm2 \~]# systemctl start elasticsearch

[root\@vm2 \~]# systemctl enable elasticsearch

打开注释,并修改为监听所有打开注释,监听端口9200

可以自定义一个集群名称,不配置的话默认会取名为elasticsearch

cluster.name: elk-cluster path.data: /var/lib/elasticsearch path.logs:
/var/log/elasticsearch network.host: 0.0.0.0

http.port: 9200

[root\@vm2 \~]# cat /etc/elasticsearch/elasticsearch.yml |grep -v "#"

第4步: 查看状态

使用curl命令或浏览器访问http://10.1.1.12:9200/_cluster/health?pretty地址(IP为ES服务器IP)

1 [root\@vm2 \~]# curl http://10.1.1.12:9200/_cluster/health?pretty

mark

elasticsearch集群部署

集群部署主要注意以下几个方面

  1. 集群配置参数:

    discovery.zen.ping.unicast.hosts,Elasticsearch默认使用Zen
    Discovery来做节点发现机制,推荐使用unicast 来 做 通 信 方 式 , 在 该 配 置
    项 中 列 举 出 Master 节 点 。
    discovery.zen.minimum_master_nodes,该参数表示集群中可工作的具有Master节点资格的最小数量,
    默认值是1。为了提高集群的可用性,避免脑裂现象。官方推荐设置为(N/2)+1,其中N是具有Master资格
    的节点的数量。




    discovery.zen.ping_timeout,表示节点在发现过程中的等待时间,默认值是3秒,可以根据自身网络环境
    进行调整,一定程度上提供可用性。

  2. 集群节点:

    节点类型主要包括Master节点和data节点(client节点和ingest节点不讨论)。通过设置两个配置项

    node.master和node.data为true或false来决定将一个节点分配为什么类型的节点。
    尽量将Master节点和Data节点分开,通常Data节点负载较重,需要考虑单独部署。

  3. 内存:

Elasticsearch默认设置的内存是1GB,对于任何一个业务部署来说,这个都太小了。通过指定ES_HEAP_SIZE环境变量,可以修改其堆内存大小,服务进程在启动时候会读取这个变量,并相应的设置堆
的大小。建议设置系统内存的一半给Elasticsearch,但是不要超过32GB。

  1. 硬盘空间:

    Elasticsearch默认将数据存储在/var/lib/elasticsearch路径下,随着数据的增长,一定会出现硬盘空间不
    够用的情形,大环境建议把分布式存储挂载到/var/lib/elasticsearch目录下以方便扩容。

    配置参考文档:
    <https://www.elastic.co/guide/en/elasticsearch/reference/index.html>;

可以使用两台或两台以上ES做集群, 以下就是两台ES做集群的配置

集群所有节点IP

discovery.zen.ping.unicast.hosts: ["10.1.1.11", "10.1.1.12"]

本机IP或主机名

指定不为master节点

node.name: 10.1.1.11 node.master: false

path.data: /var/lib/elasticsearch path.logs: /var/log/elasticsearch
network.host: 0.0.0.0

http.port: 9200

[root\@vm1 \~]# cat /etc/elasticsearch/elasticsearch.yml |grep -v "#"

cluster.name: elk-cluster

集群所有节点IP

discovery.zen.ping.unicast.hosts: ["10.1.1.11", "10.1.1.12"]

本机IP或主机名

指定为master节点

node.name: 10.1.1.12 node.master: true

path.data: /var/lib/elasticsearch path.logs: /var/log/elasticsearch
network.host: 0.0.0.0

http.port: 9200

[root\@vm2 \~]# cat /etc/elasticsearch/elasticsearch.yml |grep -v "#"

cluster.name: elk-cluster

启动或重启服务

[root\@vm1 \~]# systemctl restart elasticsearch

  1. [root\@vm1 \~]# systemctl enable elasticsearch

  2. [root\@vm2 \~]# systemctl restart elasticsearch

查看状态

mark

elasticsearch基础概念

主要的基础概念有:Node, Index,Type,Document,Field,shard和replicas.

Node(节点):运行单个ES实例的服务器Cluster(集群):一个或多个节点构成集群Index(索引):索引是多个文档的集合

Document(文档):Index里每条记录称为Document,若干文档构建一个Index
Type(类型):一个Index可以定义一种或多种类型,将Document逻辑分组Field(字段):ES存储的最小单元

Shards(分片):ES将Index分为若干份,每一份就是一个分片Replicas(副本):Index的一份或多份副本

为了便于理解,我们和mysql这种关系型数据库做一个对比:

关系型数据库(如mysql,oracle等) elasticsearch
database index
table type
row document
column field

ES是分布式搜索引擎,每个索引有一个或多个分片(shard),索引的数据被分配到各个分片上。你可以看作是一份数
据分成了多份给不同的节点。

当ES集群增加或删除节点时,shard会在多个节点中均衡分配。默认是5个primary
shard(主分片)和1个replica shard(副本,用于容错)。

elaticsearch基础API操作

前面我们通过http://10.1.1.12:9200/_cluster/health?pretty查看ES集群状态,其实就是它的一种API操作。
什么是API?

API(Application Programming
Interface)应用程序编程接口,就是无需访问程序源码或理解内部工作机制就能实现一些相关功能的接口。

RestFul API 格式

1 curl -X\<verb\> ‘\<protocol\>://\<host\>:\<port\>/\<path\>?\<query_string\>’-d
‘\<body\>’

参数 描述
verb HTTP方法,比如GET、POST、PUT、HEAD、DELETE
host ES集群中的任意节点主机名
port ES HTTP服务端口,默认9200
path 索引路径
query_string 可选的查询请求参数。例如?pretty参数将返回JSON格式数据
-d 里面放一个GET的JSON格式请求主体
body 自己写的 JSON格式的请求主体

elasticseearch的API很多, 我们运维人员主要用到以下几个要介绍的较简单的API。

更多API参考:
<https://www.elastic.co/guide/en/elasticsearch/reference/6.2/index.html>;

查看节点信息**

通过curl或浏览器访问http://10.1.1.12:9200/_cat/nodes?v(ip为ES节点IP,如果有ES集群,则为ES任意节点IP)

*

0.33 mdi

0.37

0.24

92 0

26

10.1.1.12

10.1.1.11

10.1.1.11

4

-

0.85 mdi

1.88

2.33

2

94

29

heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name

ip

10.1.1.12

[root\@vm2 \~]# curl http://10.1.1.12:9200/_cat/nodes?v

1

2

3

查看索引信息

通过curl或浏览器访问http://10.1.1.12:9200/_cat/indices?v

1 [root\@vm2 \~]# curl http://10.1.1.12:9200/_cat/indices?v

  1. health status index uuid pri rep docs.count docs.deleted store.size
    pri.store.size

  2. 默认现在没有任何索引

新增索引

6 green open nginx_access_log 90Z7DvInTz6seXMBYhHVAw 5 1 0 0

2.2kb 1.1kb

7 460b

pri rep docs.count docs.deleted

uuid

health status index

store.size pri.store.size

[root\@vm2 \~]# curl http://10.1.1.12:9200/_cat/indices?v

[root\@vm2 \~]# curl -X PUT http://10.1.1.12:9200/nginx_access_log

{"acknowledged":true,"shards_acknowledged":true,"index":"nginx_access_log"}

green:表示每个index的shard和replica都是活跃状态的。

yellow:表示每个index的shard是活跃状态的,replica是不可用状态的。red:表示索引中有些shard是不可用状态,导致数据丢失。

重新说明

green:所有的主分片和副本分片都已分配。你的集群是 100% 可用的。

yellow:所有的主分片已经分片了,但至少还有一个副本是缺失的。不会有数据丢失,所以搜索结果依然是完整的。
不过,你的高可用性在某种程度上被弱化。如果 更多的
分片消失,你就会丢数据了。把 yellow 想象成一个需要及时调查的警告。

red:至少一个主分片(以及它的全部副本)都在缺失中。这意味着你在缺少数据:搜索只能返回部分数据,而分配
到这个分片上的写入请求会返回一个异常。

删除索引

[root\@vm2 \~]# curl -X DELETE http://10.1.1.12:9200/nginx_access_log

  1. {"acknowledged":true}

查询

ES提供一种可用于执行查询JSON式的语言,被称为Query
DSL。针对elasticsearch的操作,可以分为增、删、改、查四个动作。

查询匹配条件:

match_all from,size match bool range

查询应用案例:

把以下localhost修改为es IP地址。导入数据源

使用官方提供的示例数据:

  1. #下载

    1. [root\@vm2 \~]# wget
      https://raw.githubusercontent.com/elastic/elasticsearch/master/docs/src/test/resources

      /accounts.json

3

  1. #导入至elasticsearch

  2. [root\@vm2 \~]# curl -H "Content-Type: application/json" -XPOST
    "localhost:9200/bank/_doc/_bulk?pretty&refresh" --data-binary
    "\@accounts.json"

    6

  3. #查询所有索引

  4. [root\@vm2 \~]# curl "localhost:9200/_cat/indices?v"

  5. health status index uuid pri rep docs.count docs.deleted store.size
    pri.store.size

  6. green open bank CzFQ_Gu1Qr2-bpV5MF0OBg 1 1 1000 0

    874.7kb 434.4kb

11

  1. #查询bank索引下数据(使用查询字符串进行查询)

  2. [root\@vm2 \~]# curl -X GET "localhost:9200/bank/_search?
    q=*&sort=account_number:asc&pretty"

    14

  3. #_search 属于一类API,用于执行查询操作

  4. #q=* ES批量索引中的所有文档

  5. #sort=account_number:asc 表示根据account_number按升序对结果排序

18

19

20

21

22 #查询bank索引下数据 (使用json格式进行查询) 23

[root\@vm2 \~]# curl -X GET "localhost:9200/bank/_search" -H 'Content-Type:
application/json' -d'

{

"query": { "match_all": {} }, "sort": [

{ "account_number": "asc" }

]

}

'

#此为单引号

查询匹配动作及案例:

match_all

匹配所有文档。默认查询

示例:查询所有,默认只返回10个文档

#query告诉我们查询什么#match_all使我们查询的类型

#match_all查询仅仅在指定的索引的所有文件进行搜索

[root\@vm2 \~]# curl -X GET "localhost:9200/bank/_search?pretty" -H
'Content-Type: application/json' -d'

{

"query": { "match_all": {} }

}

'

from,size

除了query参数外,还可以传递其他参数影响查询结果,比如前面提到的sort,接下来使用的size

[root\@vm2 \~]# curl -X GET "localhost:9200/bank/_search" -H 'Content-Type:
application/json' -d'

{

"query": { "match_all": {} }, "size": 1

}

'

#查询1条数据

9

10

11

12 #实现分页查询13

14

15

16

17

18

19

20

21

22

[root\@vm2 \~]# curl -X GET "localhost:9200/bank/_search" -H 'Content-Type:
application/json' -d'

{

"query": { "match_all": {} }, "from": 10,

"size": 10

}

'

#可考虑在_search后面添加格式:_search?/pretty 看看输出结果。

匹配查询字段

返回_source字段中的片段字段

1 [root\@vm2 \~]# curl -X GET "localhost:9200/bank/_search?pretty" -H
'Content-Type: application/json' -d'

2 {

  1. "query": { "match_all": {} },

  2. "_source": ["account_number", "balance"] 5 }

    6 '

match

基本搜索查询,针对特定字段或字段集合进行搜索

#查询编号为20的账户

  1. [root\@vm2 \~]# curl -X GET "localhost:9200/bank/_search?pretty" -H
    'Content-Type: application/json' -d'

3 {

4 "query": { "match": { "account_number": 20 } } 5 }

6 '

7

8

9

10

11

12

#返回地址中包含mill的账户

[root\@vm2 \~]# curl -X GET "localhost:9200/bank/_search?pretty" -H
'Content-Type: application/json' -d'

{

"query": { "match": { "address": "mill" } }

}

13

14

15

16

17

18

'

#返回地址有包含mill或lane的所有账户

19

20

21

22

23

[root\@vm2 \~]# curl -X GET "localhost:9200/bank/_search?pretty" -H
'Content-Type: application/json' -d'

{

"query": { "match": { "address": "mill lane" } }

}

'

bool

1

2

3

4

5

# bool must 查询的字段必须同时存在

#查询包含mill和lane的所有账户

[root\@vm2 \~]# curl -X GET "localhost:9200/bank/_search?pretty" -H
'Content-Type: application/json' -d'

{

"query": {

"bool": {

"must": [

{ "match": { "address": "mill" } },

{ "match": { "address": "lane" } }

]

}

}

}

'

#bool should 查询的字段仅存在一即可

#查询包含mill或lane的所有账户

[root\@vm2 \~]# curl -X GET "localhost:9200/bank/_search?pretty" -H
'Content-Type: application/json' -d'

{

"query": {

"bool": {

"should": [

{ "match": { "address": "mill" } },

{ "match": { "address": "lane" } }

]

33

34

35

36

}

}

}

'

range

指定区间内的数字或者时间

操作符:gt大于,gte大于等于,lt小于,lte小于等于

[root\@vm2 \~]# curl -X GET "localhost:9200/bank/_search?pretty" -H
'Content-Type: application/json' -d'

{

"query": {

"bool": {

"must": { "match_all": {} }, "filter": {

"range": {

"balance": {

"gte": 20000,

"lte": 30000

}

}

}

}

}

}

'

1 #查询余额大于或等于20000且小于等于30000的账户

2

3

elasticsearch-head

elasticsearch-head是集群管理、数据可视化、增删改查、查询语句可视化工具。从ES5版本后安装方式和ES2以上的
版本有很大的不同,在ES2中可以直接在bin目录下执行plugin install xxxx
来进行安装,但是在ES5中这种安装方式变了, 要想在ES5中安装Elasticsearch
Head必须要安装NodeJs,然后通过NodeJS来启动Head。


官网地址:<https://github.com/mobz/elasticsearch-head>;

elasticsearch-head安装

第1步: 官网有安装说明,可以通过git安装,也可以下载zip包解压安装

这里去下载相应的软件包,并拷贝到ES集群的一个节点上(我这里拷贝到10.1.1.12这台,也就是vm2上)

mark

nodejs下载页面: <https://nodejs.org/en/download/>;

mark

第2步: 安装nodejs

[root\@vm2 \~]# ln -s /usr/local/nodejs/bin/npm /bin/npm

[root\@vm2 \~]# ln -s /usr/local/nodejs/bin/node /bin/node

确认有此命令

/usr/local/nodejs/bin/npm

[root\@vm2 \~]# tar xf node-v10.15.0-linux-x64.tar.xz -C /usr/local/ [root\@vm2
\~]# mv /usr/local/node-v10.15.0-linux-x64/ /usr/local/nodejs/

[root\@vm2 \~]# ls /usr/local/nodejs/bin/npm

第3步: 安装es-head 安装方法一

先使用npm安装grunt

npm(node package manager):node包管理工具,类似yum
Grunt是基于Node.js的项目构建工具

[root\@vm2 elasticsearch-head]# npm install -g grunt-cli

安装时间较久,还会在网上下载phantomjs包

[root\@vm2 elasticsearch-head]# npm install

[root\@vm2 \~]# git clone git://github.com/mobz/elasticsearch-head.git

[root\@vm2 \~]# cd elasticsearch-head

mark

安装可能有很多错误,我这里出现了下面的错误(重点是注意红色的ERR!,黄色的WARN不用管)

解决方法:

[root\@vm2 elasticsearch-head]# npm install <phantomjs-prebuilt@2.1.16>
--ignore-script

  1. 此命令执行后不用再返回去执行npm install了,直接进入第4步启动服务

[root\@vm2 elasticsearch-head]# npm install
--registry=http://registry.npm.taobao.org

[root\@vm2 elasticsearch-head]# npm install
--registry=http://registry.npm.taobao.org
当安装出现下载phantomjs软件包特别慢的时候,可以ctrl+c取消,拷贝下载好的phantomjs包到特定位置再重新
安装


[root\@vm2 elasticsearch-head]# cp phantomjs-2.1.1-linux-x86_64.tar.bz2
/tmp/phantomjs/

  1. git clone慢的话就使用下载好的zip压缩包解压安装

  2. [root\@vm2 \~]# unzip elasticsearch-head-master.zip -d /usr/local/

  3. [root\@vm2 \~]# mv /usr/local/elasticsearch-head-master/
    /usr/local/elasticsearch-head

  4. [root\@vm2 \~]# cd /usr/local/elasticsearch-head/

  5. [root\@vm2 elasticsearch-head]# npm install -g grunt-cli --
    registry=http://registry.npm.taobao.org

第4步: 启动es-head

10 97

11

12

}

13 [root\@vm2 elasticsearch-head]# nohup npm run start &

1 2 3 [root\@vm2 elasticsearch-head]# vim Gruntfile.js 90 connect: {
4 91 server: {
5 92 options: {
6 93 port: 9100,
7 94 base: '.',
8 95 keepalive: true,
9 96 hostname: '*'

第5步:浏览器访问 http://es-head节点IP:9100
,并在下面的地址里把localhost改为es-head节点IP(浏览器与es-
head不是同一节点就要做
)

mark

第6步: 需要将ES集群配置文件做修改,并重启服务

[root\@vm1 \~]# cat /etc/elasticsearch/elasticsearch.yml |grep -v "#"
cluster.name: elk-cluster

node.name: 10.1.1.11 node.master: false

path.data: /var/lib/elasticsearch path.logs: /var/log/elasticsearch
network.host: 0.0.0.0

http.port: 9200

discovery.zen.ping.unicast.hosts: ["10.1.1.11", "10.1.1.12"] http.cors.enabled:
true

http.cors.allow-origin: "*"

加上最后这两句

[root\@vm2 \~]# cat /etc/elasticsearch/elasticsearch.yml |grep -v "#"

14

cluster.name: elk-cluster node.name: 10.1.1.12 node.master: true

path.data: /var/lib/elasticsearch path.logs: /var/log/elasticsearch
network.host: 0.0.0.0

http.port: 9200

discovery.zen.ping.unicast.hosts: ["10.1.1.11", "10.1.1.12"] http.cors.enabled:
true

http.cors.allow-origin: "*"

加上最后这两句

[root\@vm1 \~]# systemctl restart elasticsearch

[root\@vm2 \~]# systemctl restart elasticsearch

第7步: 再次连接就可以看到信息了

mark

mark

新建个索引试试

删除此索引

mark

mark

es-head查询验证

mark

mark

mark

logstash

logstash简介

logstash是一个开源的数据采集工具,通过数据源采集数据.然后进行过滤,并自定义格式输出到目的地。
数据分为:

  1. 结构化数据 如:mysql数据库里的表等

  2. 半结构化数据 如: xml,yaml,json等

  3. 非结构化数据 如:文档,图片,音频,视频等

    logstash可以采集任何格式的数据,当然我们这里主要是讨论采集系统日志,服务日志等日志类型数据。
    官方产品介绍:<https://www.elastic.co/cn/products/logstash>;

mark

input 插 件 : 用 于 导 入 日 志 源 ( 配 置 必 须 )
<https://www.elastic.co/guide/en/logstash/current/input-plugins.html>; filter
插 件 : 用 于 过 滤 ( 不 是 配 置 必 须 的 )
https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
output 插 件 : 用 于 导 出 ( 配 置 必 须 )
<https://www.elastic.co/guide/en/logstash/current/output-plugins.html>;




条件判断

1 #使用条件来决定filter和output处理特定的事件 2

3 比较操作:

4 相等: ==, !=, \<, \>, \<=, \>=

  1. 正则: =\~(匹配正则), !\~(不匹配正则)

  2. 包含: in(包含), not in(不包含)

  3. 布尔操作:

  4. and(与), or(或), nand(非与), xor(非或)

  5. 一元运算符:

!(取反)

()(复合表达式), !()(对复合表达式结果取反)

可以像其他编程语言那样,条件if判断、多分支,嵌套。

if EXPRESSION {

...

} else if EXPRESSION {

...

} else {

...

}

logstash部署

在logstash服务器上确认openjdk安装

[root\@vm3 \~]# java -version

  1. openjdk version "1.8.0_161"

  2. OpenJDK Runtime Environment (build 1.8.0_161-b14)

  3. OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode)

在logstash服务器上安装logstash

[root\@vm3 \~]# wget
https://artifacts.elastic.co/downloads/logstash/logstash-6.5.2.rpm

  1. [root\@vm3 \~]# rpm -ivh logstash-6.5.2.rpm

配置logstash主配置文件

打开注释,并加上配置目录路径打开注释,并改为本机IP

path.config: /etc/logstash/conf.d/ http.host: "10.1.1.13"

path.logs: /var/log/logstash

[root\@vm3 \~]# cat /etc/logstash/logstash.yml |grep -v '#' |grep -v '\^\$'

path.data: /var/lib/logstash

启动测试

[root\@vm3 \~]# cd /usr/share/logstash/bin

  1. 使用下面的空输入和空输出启动测试一下

  2. [root\@vm3 \~]# ./logstash -e 'input {stdin {}} output {stdout {}}'

  3. #运行后,输入字符将被stdout做为标准输出内容输出 5

mark

关闭启动

#上述测试还可以使用如下方法进行:

[root\@vm3]# vim /etc/logstash/conf.d/stdtest.conf

input {

stdin {

}

}

filter {

}

output {

stdout {

codec =\> rubydebug

}

}

[root\@vm3 bin]# pwd

/usr/share/logstash/bin

23

24

25

26

[root\@vm3 bin]# ./logstash --path.settings /etc/logstash -f

/etc/logstash/conf.d/stdtest.conf -t #--path.settings 指定logstash配置文件#-f
指定片段配置文件

#-t 测试配置文件是否正确

27

28

29

[root\@vm3 bin]# ./logstash --path.settings /etc/logstash -r -f

/etc/logstash/conf.d/stdtest.conf

......

[xxxxxxxxxxxxxxxxxxxxx][INFO ][logstash.agent Logstash API endpoint
{:port=\>9600}

abc

] Successfully started

/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/awesome_print-
1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant
::Fixnum is deprecated

{

"host" =\> "node1",

"\@version" =\> "1", "message" =\> "abc",

"\@timestamp" =\> 2019-06-01T01:43:04.539Z

}

xyz

{

"host" =\> "node1",

"\@version" =\> "1", "message" =\> "xyz",

"\@timestamp" =\> 2019-06-01T01:43:08.915Z

}

日志采集

采集messages日志

这里以/var/log/messages为例,只定义input输入和output输出,不考虑过滤

[root\@vm3 bin]# vim /etc/logstash/conf.d/test.conf input {

file {

path =\> "/var/log/messages" start_position =\> "beginning"

}

}

output {

elasticsearch{

hosts =\> ["10.1.1.12:9200"]

index =\> "test-%{+YYYY.MM.dd}"

}

}

注意要给个读权限,默认messages文件为600权限

[root\@vm3 bin]# chmod o+r /var/log/messages

用此方法启动(这里不用systemctl start logstash)

  1. [root\@vm3 bin]# ./logstash --path.settings /etc/logstash/ -f

    /etc/logstash/conf.d/test.conf &

mark

mark

通过浏览器访问es-head验证

请自行练习验证:

1, 在logstash那台服务器上做一些操作(比如,重启下sshd服务),
让/var/log/message有新的日志信息,然后验证es- head里的数据。

结果: 会自动更新, 浏览器刷新就能在es-head上看到更新的数据。

2, kill掉logstash进程(相当于关闭),
也做一些操作让/var/log/message日志有更新,然后再次启动logstash。结果:
会自动连上es集群, es-head里也能查看到数据的更新。

采集多日志源

  1. [root\@vm3 bin]# vim /etc/logstash/conf.d/test.conf

    1. input {

    2. file {

    3. path =\> "/var/log/messages"

    4. start_position =\> "beginning"

    5. type =\> "messages"

      7 }

      8

      9 file {

  2. path =\> "/var/log/yum.log"

  3. start_position =\> "beginning"

  4. type =\> "yum" 13 }

    14 }

    15

    16 output { 17

  5. if [type] == "messages" {

  6. elasticsearch {

    20 hosts =\> ["10.1.1.12:9200"]

    21 index =\> "message-%{+YYYY.MM.dd}" 22 }

    23 }

    24

  7. if [type] == "yum" {

  8. elasticsearch {

    27 hosts =\> ["10.1.1.12:9200"]

    28 index =\> "yum-%{+YYYY.MM.dd}" 29 }

    30 }

    31

    32 }

    33

  9. [root\@vm3 \~]# cd /usr/share/logstash/bin

  10. [root\@vm3 \~]# pkill -9 java

  11. [root\@vm3 bin]# ./logstash --path.settings /etc/logstash/ -f

    /etc/logstash/conf.d/test.conf &

logstash插件

输入插件(input)

Input:输入,输出数据可以是Stdin、File、TCP、Syslog、Redis、Kafka等。<https://www.elastic.co/guide/en/logstash/current/input-plugins.html>;

所有输入插件都支持的配置选项

Setting Input type Required Default Description
add_field hash No {} 添加一个字段到一个事件
codec codec No plain 用于输入数据的编解码器
enable_metric boolean No true
id string No 添加一个ID插件配置,如果没有指定ID,则Logstash将生成一个ID。强烈建议配置此ID,当两个或多个相同类型的插件时,这个非常有用的。例如,有两个文件输入,添加命名标识有助于监视
tags array No 添加任意数量的标签,有助于后期处理
type string No 为输入处理的所有事件添加一个字段,自已随便定义,比如linux系统日志,定义为syslog

stdin

标准输入

}

}

filter {

}

output { stdout {

codec =\> rubydebug

}

}

# cat config.d/stdtest.conf input {

stdin {

file

从文件中读取内容

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html

Setting Input type Required Default Description
close_older number No 3600 单位秒,打开文件多长时间关闭
delimiter string No \n 每行分隔符
discover_interval number No 15 单位秒,多长时间检查一次path选项是否有新文件
exclude array No 排除监听的文件,跟path一样,支持通配符
max_open_files number No 打开文件最大数量
path array YES 输入文件的路径,可以使用通配符 例如/var/log/*/.log,则会递归搜索
sincedb_path string No sincedb数据库文件的路径,用于记录被监控的日志文件当前位置
sincedb_write_interval number No 15 单位秒,被监控日志文件当前位置写入数据库的频率
start_position string, one of ["beginning", "end"] No end 指定从什么位置开始读取文件:开头或结尾。默认从结尾开始,如果要想导入旧数据,将其设置为begin。如果sincedb记录了此文件位置,那么此选项不起作用
stat_interval number No 1 单位秒,统计文件的频率,判断是否被修改。增加此值会减少系统调用次数。
1 2 3 # cat conf.d/filetest.conf input { file {
4 path =\> "/var/log/messages"
5 tags =\> "123"
6 type =\> "syslog"
7 }

8 }

9 filter {

10 }

  1. output {

  2. stdout {

  3. codec =\> rubydebug 14 }

    15 }

    通过TCP套接字读取事件,即接收数据。与标准输入和文件输入一样,每个事件都被定位一行文本。

在运行logstash主机上运行如下配置文件内容,可以考虑使用热加载。

}

output { stdout{

codec =\> rubydebug

}

}

[root\@vm3 bin]# ./logstash --path.settings /etc/logstash -r -f

/etc/logstash/conf.d/tcptest.conf

# cat conf.d/tcptest.conf input {

tcp {

port =\> 12345 type =\> "nc"

}

}

filter {

在其他主机上安装nc工具,加上述主机发送信息,即可被读取出来。

#在vm3上验证

{

"message" =\> "abc", "\@version" =\> "1",

"port" =\> 60410,

"host" =\> "node2",

"type" =\> "nc",

"\@timestamp" =\> 2019-06-01T02:24:49.790Z

}

[root\@vm1 \~]# yum -y install nc [root\@vm1 \~]# nc 192.168.216.144 12345

abc

Beats

从Elastic Beats框架接收事件

1 #logstash片段配置文件

# cat conf.d/filebeattest.conf input {

beats {

port =\> 5044

host =\> "0.0.0.0"

}

}

filter {

}

output {

stdout { codec =\> rubydebug }

}

filebeat.prospectors:

- type: log paths:

- /var/log/messages tags: ["system-log","123"] fields:

level: debug

output.logstash:

hosts: ['127.0.0.1:5044']

#以上6.5.2版本

#以下是7.1.1版本filebeat.inputs:

- type: log enabled: true paths:

- /var/log/messages tags: ["system-log","123"] fields:

level: debug

output.logstash:

hosts: ['127.0.0.1:5044']

# filebeat配置文件

编码插件(Codec)

Logstash处理流程:input-\>decode-\>filter-\>encode-\>output

json/json_lines

该解码器可用于解码(Input)和编码(Output)JSON消息。

如果发送的数据是JSON数组,则会创建多个事件(每个元素一个)
如果传输JSON消息以\n分割,就需要使用json_lines。

  1. #创建片段配置文件

  2. # cat conf.d/codectest.conf

  3. input {

  4. stdin {

  5. codec =\>json {

  6. charset =\> ["UTF-8"] 7 }

    8 }

    9 }

    10 filter { 11

    12 }

  7. output {

  8. stdout{codec =\> rubydebug } 15 }

    16

    17

    18 #运行配置文件19

    20 [root\@vm3 bin]# ./logstash --path.settings /etc/logstash -r -f

/etc/logstash/conf.d/codetest.conf

21

22 #添加测试数据

23 {"ip": "192.168.1.1","method": "GET","bytes": "1.11"}

24

25 #上述输入后直接回车即可。26

27

28 {

29 "\@timestamp" =\> 2019-06-01T02:45:47.305Z,

30 "ip" =\> "192.168.1.1",

  1. "host" =\> "node1",

  2. "method" =\> "GET", 33 "\@version" =\> "1", 34 "bytes" =\> "1.11" 35 }

36

37

multline

程序输出时可能会同时输出多行文件,如果把多行文件一行一行收集会导致文件被破坏

multline可以对程序的同一个输出,即使是多行,也能统一收集。

Setting Input type Required Default Description
auto_flush_interval number No
charset string No UTF-8 输入使用的字符编码
max_bytes bytes No 10M 如果事件边界未明确定义,则事件的的积累可能会导致logstash退出,并出现内存不足。与max_lines组合使用
max_lines number No 500 如果事件边界未明确定义,则事件的的积累可能会导致logstash退出,并出现内存不足。与max_bytes组合使用
multiline_tag string No multiline 给定标签标记多行事件
negate boolean No false 正则表达式模式,设置正向匹配还是反向匹配。默认正向
pattern string Yes 正则表达式匹配
patterns_dir array No [] 默认带的一堆模式
what string, one of ["previous", "next"] Yes 设置未匹配的内容是向前合并还是向后合并。

#在logstash运行主机添加

# cat /etc/logstash/conf.d/codetest.conf input {

stdin {

codec =\> multiline { pattern =\> "\^\s" what =\> "previous"

}

}

}

filter {

}

output {

stdout{codec =\> rubydebug }

}

#pattern 正则匹配,以空格开头的

#what 如果匹配上做什么合并行为,previous与上行合并,next与下一行合并

#输入测试数据abc xxx xyz

123 456

mmm 000

#输出结果

{

"\@timestamp" =\> 2019-06-01T03:00:57.713Z,

"\@version" =\> "1", "tags" =\> [

[0] "multiline"

],

"host" =\> "node1",

"message" =\> "abc xxx xyz\n

123 456\n

mmm 000"

}

#上述内容已被合并

过滤插件(filter)

Filter:过滤,将日志格式化。有丰富的过滤插件:

Grok正则捕获date时间处理JSON编解码

数据修改Mutate geoip等。

所有的过滤器插件都支持以下配置选项:

Setting Input type Required Default Description
add_field hash No {} 如果过滤成功,添加任何field到这个事件。例如:addfield =\> [ "foo%{somefield}", "Hello world, from %{host}" ],如果这个事件有一个字 段somefiled,它的值是hello,那么我们会增加一个字段foo_hello,字段值则用%{host}代替。
add_tag array No [] 过滤成功会增加一个任意的标签到事件例如: addtag =\> [ "foo%{somefield}" ]
enable_metric boolean No true
id string No
periodic_flush boolean No false 定期调用过滤器刷新方法
remove_field array No [] 过滤成功从该事件中移除任意filed。例: removefield =\> [ "foo%{somefield}" ]
remove_tag array No [] 过滤成功从该事件中移除任意标签,例如: removetag =\> [ "foo%{somefield}" ]

json

JSON解析过滤器,接收一个JSON的字段,将其展开为Logstash事件中的实际数据结构。

当事件解析失败时,这个插件有一个后备方案,那么事件将不会触发,而是标记为_jsonparsefailure,可
以使用条件来清楚数据。

也可以使用 tag_on_failure

https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html\#plugins-filters-json-t
ag_on_failure

#在logstash主机创建

# cat /etc/logstash/conf.d/jsontest.conf input {

stdin {

}

}

filter { json {

source =\> "message" target =\> "content"

}

}

output {

stdout{codec =\> rubydebug }

}

#对标准输入的内容进行json格式输出

#把输出内容定向到target指定的content

[root\@vm3 bin]# ./logstash --path.settings /etc/logstash -r -f

/etc/logstash/conf.d/jsontest.conf

#输入测试数据

{"ip":"192.168.1.1","hostname":"node1"}

#输出测试数据

{

"\@timestamp" =\> 2019-06-01T03:29:04.523Z,

"content" =\> {

"ip" =\> "192.168.1.1",

"hostname" =\> "node1"

},

"message" =\> "{\"ip\":\"192.168.1.1\",\"hostname\":\"node1\"}",
"host" =\> "node1",

"\@version" =\> "1"

}

#不指定target

# cat /etc/logstash/conf.d/jsontest.conf input {

stdin {

}

}

filter { json {

source =\> "message"

}

}

output {

stdout{codec =\> rubydebug }

}

20

  1. #执行

  2. [root\@vm3 bin]# ./logstash --path.settings /etc/logstash -r -f

    /etc/logstash/conf.d/jsontest.conf

#输入数据

{"ip":"192.168.1.1","hostname":"node1","age":21}

#输出数据

{

"age" =\> 21,

"message" =\>
"{\"ip\":\"192.168.1.1\",\"hostname\":\"node1\",\"age\":21}", "ip"
=\> "192.168.1.1",

"\@version" =\> "1",

"\@timestamp" =\> 2019-06-01T03:39:13.224Z,

"hostname" =\> "node1", "host" =\> "node1"

}

kv

自动解析为key=value。

也可以任意字符串分割数据。

field_split 一串字符,指定分隔符分析键值对

#URL查询字符串拆分参数示例

# cat /etc/logstash/conf.d/kvtest.conf input {

stdin {

}

}

filter { kv {

field_split =\> "&?"

}

}

output {

stdout {

codec =\> rubydebug

}

}

  1. #文件中的列以&或?进行分隔

  2. #普通方法

    25

  3. #执行

  4. [root\@vm3 bin]# ./logstash --path.settings /etc/logstash -r -f

    /etc/logstash/conf.d/kvtest.conf

    28

  5. #输入数据

  6. address=www.smartgo.com?pid=123&smartgo=user
    31

    32

    33

    34 #输出数据35 {

  7. "smartgo" =\> "user",

  8. "host" =\> "node1", 38 "\@version" =\> "1",

    39 "pid" =\> "123",

    40 "message" =\>
    "address=www.smartgo.com?pid=123&smartgo=user",
    41 "\@timestamp" =\> 2019-06-01T03:49:51.355Z,

    42 "address" =\> "www.smartgo.com" 43 }

    44

    45

    46

    47

    48

    49 #使用正则也可以匹配50

  9. [root\@vm3 bin]# cat /etc/logstash/conf.d/kvtest.conf

  10. input {

  11. stdin {

    54 }

    55 }

    56

  12. filter {

  13. kv {

  14. field_split_pattern =\> ":+" 60 }

    61 }

    62

  15. output {

  16. stdout {

  17. codec =\> rubydebug 66 }

    67 }

    68

    69

geoip

开源IP地址库

<https://dev.maxmind.com/geoip/geoip2/geolite2/>;

[root\@vm3 \~]# cp GeoLite2-City_20190625/GeoLite2-City.mmdb /opt

[root\@vm3 \~]# tar xf GeoLite2-City.tar.gz

3

4

5

6

7

  1. #下载IP地址库

  2. [root\@vm3 \~]# wget
    https://geolite.maxmind.com/download/geoip/database/GeoLite2-

    City.tar.gz

1 #logstash片段配置文件2

  1. # cat /etc/logstash/conf.d/geoiptest.conf

  2. input {

  3. stdin {

    6 }

    7 }

    8

    9 filter {

  4. grok {

  5. match =\> {

  6. "message" =\> "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %

    {NUMBER:bytes} %{NUMBER:duration}" 13 }

    14 }

  7. geoip {

  8. source =\> "client"

  9. database =\> "/opt/GeoLite2-City.mmdb" 18 }

    19 }

    20

    21

  10. output {

  11. stdout {

  12. codec =\> rubydebug 25 }

    26 }

    27

    28

    29

    30

  13. #执行

  14. [root\@vm3 bin]# ./logstash --path.settings /etc/logstash -r -f

    /etc/logstash/conf.d/geoiptest.conf

    33

    34

    35 #输入测试数据

36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 202.106.0.20 GET /index.html 123 0.331 #输出结果 { "geoip" =\> { "region_name" =\> "Beijing", "country_code3" =\> "CN", "country_name" =\> "China", "ip" =\> "202.106.0.20", "timezone" =\> "Asia/Shanghai", "longitude" =\> 116.3883, "location" =\> { "lat" =\> 39.9289, "lon" =\> 116.3883 }, "country_code2" =\> "CN", "continent_code" =\> "AS", "region_code" =\> "BJ", "latitude" =\> 39.9289 },
58 "\@version" =\> "1",
59 "host" =\> "node1",
60 "client" =\> "202.106.0.20",
61 "bytes" =\> "123",
62 "duration" =\> "0.331",
63 "message" =\> "202.106.0.20 GET /index.html 123 0.331",
64 "\@timestamp" =\> 2019-06-01T04:16:57.937Z,
65 "request" =\> "/index.html",
66 "method" =\> "GET"
67 }
68
69

[root\@vm3 bin]# cat /etc/logstash/conf.d/geoiptest2.conf input {

stdin {

}

}

filter { grok {

match =\> {

"message" =\> "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %

{NUMBER:bytes} %{NUMBER:duration}"

11

12

13

}

}

geoip {

source =\> "client"

database =\> "/opt/GeoLite2-City.mmdb" target =\> "geoip"

fields =\> ["city_name", "country_code2", "country_name","region_name"]

}

  1. output {

  2. stdout {

  3. codec =\> rubydebug 26 }

    27 }

    28

    29

    30

  4. #执行

  5. [root\@vm3 bin]# ./logstash --path.settings /etc/logstash -r -f

/etc/logstash/conf.d/geoiptest2.conf

33

34

35 #输入测试数据36

37 110.226.4.6 GET /home.html 518 0.247

38

39 #输出结果40

41 {

  1. "request" =\> "/home.html",

  2. "host" =\> "node1",

  3. "method" =\> "GET",

    45 "\@timestamp" =\> 2019-06-01T04:22:46.946Z,

    46 "client" =\> "110.226.4.6",

    47 "duration" =\> "0.247",

    48 "bytes" =\> "518",

    49 "message" =\> "110.226.4.6 GET /home.html 518 0.247",

    50 "\@version" =\> "1",

  4. "geoip" =\> {

  5. "country_code2" =\> "IN",

  6. "country_name" =\> "India" 54 }

    55 }

    56

grok

grok是将非结构化数据解析为结构化

这个工具非常适于系统日志,mysql日志,其他Web服务器日志以及通常人类无法编写任何日志的格式。
默认情况下,Logstash附带约120个模式。也可以添加自己的模式(patterns_dir)

模式后面对应正则表达式

查看模式地址:<https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns>;

包含字段如下

Setting Input type Required Default Description
break_on_match boolean No true
keep_empty_captures No false 如果true将空保留为事件字段
match hash No {} 一个hash匹配字段=\>值
named_captures_only boolean No true 如果true,只存储
overwrite array No [] 覆盖已存在的字段的值
pattern_definitions No {}
patterns_dir array No [] 自定义模式
patterns_files_glob string No * Glob模式,用于匹配patterns_dir指定目录中的模式文件
tag_on_failure array No _grokparsefailure tags没有匹配成功时,将值附加到字段
tag_on_timeout string No _groktimeout 如果Grok正则表达式超时,则应用标记
timeout_millis number 30000 正则表达式超时时间

grok模式语法

格式:%{SYNTAX:SEMANTIC}

SYNTAX 模式名称

SEMANTIC 匹配文本的标识符

例如:%{NUMBER:duration} %{IP:client}

# 配置文件中应该包含如下字段

filter { grok {

match =\> {

"message" =\> "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %

{NUMBER:bytes} %{NUMBER:duration}"

}

}

}

1 #虚构http请求日志抽出有用的字段

2 55.3.244.1 GET /index.html 15824 0.043

自定义模式

如果默认模式中没有匹配的,可以自己写正则表达式。

1 # cat /opt/patterns 2 ID [0-9A-Z]{10,11} 3

#配置文件中应包含如下内容

filter { grok {

patterns_dir =\>"/opt/patterns" match =\> {

"message" =\> "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %

{NUMBER:bytes} %{NUMBER:duration} %{ID:id}"

}

}

}

#完整文件内容

[root\@vm3 \~]# cat /etc/logstash/conf.d/groktest.conf input {

stdin {

}

}

filter { grok {

patterns_dir =\>"/opt/patterns" match =\> {

"message" =\> "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %

{NUMBER:bytes} %{NUMBER:duration} %{ID:id}"

}

}

}

31

output {

stdout {

codec =\> rubydebug

}

}

#执行

[root\@vm3 bin]# ./logstash --path.settings /etc/logstash -r -f

/etc/logstash/conf.d/groktest.conf

#输入测试数据

55.3.244.1 GET /index.html 15824 0.043 15BF7F3ABB

#输出测试数据

{

"request" =\> "/index.html",

"message" =\> "55.3.244.1 GET /index.html 15824 0.043 15BF7F3ABB", "id" =\>
"15BF7F3ABB",

"host" =\> "node1", "duration" =\> "0.043",

"\@version" =\> "1",

"\@timestamp" =\> 2019-06-26T04:49:54.022Z, "client" =\> "55.3.244.1",

"method" =\> "GET", "bytes" =\> "15824"

}

多模式匹配

一个日志可能有多种格式,一个匹配可以有多条规则匹配多种格式。一条匹配模式,如果匹配不到,只会到message字段。

#新版本项目日志需要添加日志字段,需要兼容旧日志匹配

#模式匹配文件

# cat /opt/patterns ID [0-9A-Z]{10,11} TAG SYSLOG

# 配置文件

input { stdin {

}

}

filter {

grok {

  1. patterns_dir =\>"/opt/patterns"

  2. match =\>[

  3. "message", "%{IP:client} %{WORD:method} %{URIPATHPARAM:request}
    %{NUMBER:bytes}

    %{NUMBER:duration} %{ID:id}",

  4. "message", "%{IP:client} %{WORD:method} %{URIPATHPARAM:request}
    %{NUMBER:bytes}

    %{NUMBER:duration} %{TAG:tag}" 22 ]

    23 }

    24 }

  5. output {

  6. stdout{codec =\> rubydebug } 27 }

    28

    29

使用[]时,把[]中的=\>换成逗号。

输出插件(output)

Output:输出,输出目标可以是Stdout、ES、Redis、File、TCP等。

ES

Setting Input type Required Default Description
hosts URL No
index string No logstash-% {+YYYY.MM.dd} 将事件写入索引。默认按日期划分。
user string No ES集群用户
password password No ES集群密码

#输出格式

  1. output {

  2. elasticsearch {

  3. hosts =\> ["localhost:9200"]

  4. index =\> "xxxx-admin-%{+YYYY.MM.dd}"

6 }

7 }

8

  1. #完整文件

    1. input {

    2. file {

    3. path =\> ["/var/log/messages"]

    4. type =\> "system"

    5. tags =\> ["syslog","test"]

    6. start_position =\> "beginning" 16 }
  2. file {

  3. path =\> ["/var/log/audit/audit.log"]

  4. type =\> "system"

  5. tags =\> ["auth","test"]

  6. start_position =\> "beginning" 22 }

    23 }

    24 filter { 25

    26 }

  7. output {

  8. if [type] == "system" {

  9. if [tags][0] == "syslog" {

  10. elasticsearch {

    31 hosts =\> ["http://es1:9200","http://es2:9200","http://es3:9200"]

    32 index =\> "logstash-system-syslog-%{+YYYY.MM.dd}" 33 }

    34 stdout { codec=\> rubydebug } 35 }

  11. else if [tags][0] == "auth" {

  12. elasticsearch {

    38 hosts =\> ["http://es1:9200","http://es2:9200","http://es3:9200"]

    39 index =\> "logstash-system-auth-%{+YYYY.MM.dd}" 40 }

    41 stdout { codec=\> rubydebug } 42 }

    43 }

    44 }

    45



版权声明
本文为[osc_mox9hom9]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4274636/blog/4715746