ELK Stack基础教程(5)Filebeat的安装、配置与使用教程
一、Filebeat是什么
早期的ELK架构将日志采集工作交给Logstash来完成,但是Logstash是一个偏重量级的Java应用,占用系统资源较高,而且每台客户端都要配置JDK也比较麻烦。为此出现了轻量级的日志采集器Filebeat。这让整个工作流变成了Filebeat --> Logstash --> Elasticsearch --> Kibana。这里需要注意的是beat家族有很多成员,Filebeat只是其中一个负责日志采集类的beat,除此之外还有偏向指标的metricbeat(类似Prometheus的各种exporter)、偏向数据包的packetbeat等,通过官方网站https://www.elastic.co/cn/beats/filebeat可以看到详细介绍。
二、Elasticsearch+Filebeat+Kibana架构部署
1、Filebeat提供了二进制包,只需要下载解压即可使用
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.6-linux-x86_64.tar.gz tar zxf filebeat-7.17.6-linux-x86_64.tar.gz mv filebeat-7.17.6-linux-x86_64.tar.gz /usr/local/
2、Filebeat的目录结构与配置如下图,通常只需要对filebeat.yml进行修改,该文件用于配置日志的采集源以及输出目标
3、编辑配置文件filebeat.yml,指定需要采集的日志路径以及需要传输到哪个ES
filebeat.inputs: #采集系统日志 - type: log #通常采集日志文件的话type就为log,还有stdin、redis、docker等其它类型 enabled: true #启用开关 paths: #日志文件路径 - /var/log/message.log #include_lines: ['^ERR','^WARN'] #只采集匹配的日志,支持正则匹配 tags: ["system"] #支持标签定义,当采集多个日志的时候可以用标签来创建索引,也可以用logstash的type来区分 #采集nginx日志 - type: log enabled: true paths: - /var/log/nginx/access.log tags: ["nginx"] #采集tomcat日志,使用了multiline插件 - type: log enabled: true paths: - /var/log/tomcat/tomcat_log.*.txt multiline.pattern: '^\d{2}' #匹配以2个数字开头的日志 multiline.negate: true multiline.match: after multiline.max_lines: 1000 #合并的最大行数,默认是500 tags: ["tomcat"] #ES部分 output.elasticsearch: #直接输出给ES hosts: ["http://192.168.1.100:9200"] #集群的话写多个地址,用逗号分隔 indices: - index: "system-%{[agent.version]}-%{+yyyy.MM.dd}" #自定义索引名 when.contains: tags: "system" - index: "nginx-%{[agent.version]}-%{+yyyy.MM.dd}" when.contains: tags: "nginx"
4、启动Filebeat,启动速度相比Logstash会快许多,内存占用也更少
nohup /usr/local/src/filebeat-7.17.6-linux-x86_64/filebeat -e -c /usr/local/src/filebeat-7.17.6-linux-x86_64/filebeat.yml &
5、使用Kibana查看数据并出图
三、Elasticsearch+Logstash+Filebeat架构
如果Filebeat直接把日志传给ES的话,由于其不支持正则、无法移除字段等,没办法实现数据分析,所以更优的配置是将Filebeat收集到的日志输出给一台Logstash服务器,由Logstash处理后再给ES。为了实现这个要求,就需要修改Filebeat的output段以及Logstash的input段
1、修改Filebeat输出段的配置,将输出给到Logstash,其他配置不动
filebeat.inputs: - type: log #一个filebeat可采集多个日志,采用不同的标签进行区分,然后logstash会根据标签进行判断处理 paths: - /data/mysql3306/log/slowlog/mysql.slow multiline.type: pattern multiline.pattern: '^# User@Host: ' multiline.negate: true multiline.match: after tags: ["mysql_slowlog"] #给日志打一个标签用于分类管理 - type: log paths: - /data/redis/logs/redis.log tags: ["redis_log"] #给日志打一个标签用于分类管理 output.logstash: hosts: ["172.20.1.221:5044"]
2、修改Logstash input相关配置,其他不动
input { beats { #从filebeat获取数据 port => 5044 } } filter { grok { match => [ "message", "(?m)^# User@Host: %{USER:query_user}\[[^\]]+\] @ (?:(?<query_host>\S*) )?\[(?:%{IP:query_ip})?\]\s+Id:\s+%{NUMBER:id:int}\s# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)" ] } grok { match => { "message" => "# Time: " } add_tag => [ "drop" ] tag_on_failure => [] } if "drop" in [tags] { drop {} } } output { if "mysql_slowlog" in [tags]{ #根据标签创建不同的索引 elasticsearch { hosts => ["172.20.1.221:9200"] index => "mysql_slowlog-%{+yyyy.MM.dd}" } } if "redis_log" in [tags]{ elasticsearch { hosts => ["172.20.1.221:9200"] index => "redis_log-%{+yyyy.MM.dd}" } } }
四、Elasticsearch+Logstash+Kafka+Filebeat架构(推荐架构)
当生产环境中有多台Filebeat同时向Logstash传输数据再交给ES入库时,后端服务器的负载会比较大,为了减轻这种压力可以在Filebeat和Logstash之间加上一层消息队列,比如Redis、Kafka。由于Redis太吃内存,而Kafka是基于磁盘存储,所以数据量很大的时候更推荐Kafka
1、修改Filebeat输出段的配置,将输出传送给Kafka
filebeat.inputs: - type: log paths: - /var/log/nginx/access.log fields: type: access fields_under_root: true - type: log paths: - /var/log/secure fields: type: system fields_under_root: true processors: - drop_fields: fields: ["agent","ecs","log","input"] output: kafka: hosts: ["192.168.238.90:9092", "192.168.238.92:9092"] topic: linuxe
2、修改Logstash input相关配置,从Kafka读取数据
input { kafka { bootstrap_servers => "192.168.238.90:9092,192.168.238.92:9092" topics => ["linuxe"] group_id => "linuxegroup" codec => "json" } } filter { if [type] == "access" { grok { match => { "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}' } remove_field => ["message"] } date { match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"] target => "@timestamp" } } else if [type] == "system" { } } output { if [type] == "access" { elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] index => "access-%{+YYYY.MM.dd}" } } else if [type] == "system" { elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] index => "system-%{+YYYY.MM.dd}" } } }
3、查看Kafka队列中是否有数据
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092 --list #查看Group,应该能看到log这个队列存在 /usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.1.100:9092 --group log --describe #查看具体的队列和消费信息
五、Filebeat的多行处理功能
Filebeat 收集到的日志文件可能存在跨行消息,比如Java堆栈日志和MySQL慢日志都是以多行文本组成一条日志的呈现形式。为了让Filebeat能正确处理这些多行文本,需要在配置文件中新增multiline设置(这个设置在Logstash也有),通过multiline的配置可以让Filebeat知道哪些行是属于同一个事件,然后再将组装好的数据发送到 Logstash。
filebeat.inputs: - type: log paths: - /data/mysql3306/log/slowlog/mysql.slow multiline.type: pattern multiline.pattern: '^# Time: ' #通过正则来匹配日志内容,比如这里是以Time开头的行 multiline.negate: true #是否采用上面正则匹配的结果,在这里true代表不以Time开头的内容才匹配,false代表以Time开头的行是匹配的 multiline.match: after #before代表匹配内容与日志前面的内容合并,after代表Time行之后的内容 output.logstash: hosts: ["172.20.1.221:5044"]
六、Filebeat的processors(处理器)功能
Filebeat通过处理器可以实现部分的数据过滤和增强功能,只需要在配置中定义好以后就可以在将日志发送到输出目标之前对其进行处理。Filebeat提供的处理器可以通过https://www.elastic.co/guide/en/beats/filebeat/7.17/filtering-and-enhancing-data.html 进行查看
· 过滤无用日志内容
filebeat.inputs: - type: log paths: - /data/mysql3306/log/slowlog/mysql.slow multiline.type: pattern multiline.pattern: '^# Time: ' #通过正则来匹配日志内容,比如这里是以Time开头的行 multiline.negate: true #是否采用上面正则匹配的结果,在这里true代表不以Time开头的内容才匹配,false代表以Time开头的行是匹配的 multiline.match: after #before代表匹配内容与日志前面的内容合并,after代表Time行之后的内容 processors: - drop_fields: fields: ["ecs","agent","input","log"] #删除filebeat自身的多余字段 # - drop_event: # when: # regexp: # message: "test|tanglu" #通过正则匹配需要过滤掉的内容,如果日志中出现这样的关键词则不采集 #- drop_event: # when: # not: # contains: # message: "linuxe" #也可以通过反选的形式,如果不包含linuxe的都不采集 output.logstash: hosts: ["172.20.1.221:5044"]
下面是一个经过drop_fields处理的日志内容,可以看到很多无用字段已经被删除了
七、Filebeat采集JSON日志
ELK对JSON日志格式的支持较好,不用去写正则匹配
1、Filebeat部分,直接采集日志即可
filebeat.inputs: - type: log paths: - /var/log/nginx/access.json.log output: logstash: hosts: ["192.168.238.90:5044"]
2、Logstash部分
input { beats { port => 5044 } } filter { json { source => "message" #json日志所在字段 remove_field => ["message"] } } output { elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] index => "json_log-%{+YYYY.MM.dd}" } }
八、使用Filebeat的module进行快速配置
Filebeat内置有大量module,比如Nginx module、MySQL module。使用这些module可以简化日志收集的过程,包括Kibana出图,甚至可以通过官方module学习出图。下面以MySQL为例进行配置:
1、开启Filebeat模块相关配置
filebeat.inputs: filebeat.config.modules: path: ${path.config}/modules.d/*.yml #模块配置文件路径,查看支持的模块时就会从该目录去寻找配置文件 reload.enabled: true ##开启模块功能 reload.period: 10s setup.kibana: host: "172.20.1.221:5601" processors: - drop_fields: fields: ["ecs","agent","input","log"] output.elasticsearch: #使用模块后需要直接输出到ES中,否则会报错 hosts: ["172.20.1.221:9200"]
2、查看当前支持的module
filebeat modules list
3、启动指定的module
filebeat modules enable mysql
4、经过启动的module配置文件名字会自动被修改
5、配置module,只需要修改日志路径
vi /usr/local/filebeat/modules.d/mysql.yml - module: mysql slowlog: enabled: true var.paths: ["/data/mysql3306/log/slowlog/mysql.slow","/data/mysql3307/log/slowlog/mysql.slow"] error: enabled: true var.paths: ["/data/mysql3306/log/error.log","/data/mysql3307/log/error.log"]
6、初始化环境,如果有安装多个module的话只用初始化一次
filebeat setup -e
7、重启filebeat,需要注意的是索引名是自动创建好了的,不要自定义,然后到Kibana中看图即可
评论