ELK Stack基础教程(4)Logstash的安装、配置与使用教程
Logstash可以收集日志数据并对数据进行处理后传递给ES,但是现阶段通常使用它做数据分析和过滤工作,采集工作交由filebeat完成。因为使用它采集日志的话需要在所有需要收集日志的服务器上安装Logstash,由于Logstash偏重量级,现在通常不会这样做,而是交给各种beat去完成这个工作,然后最后交给Logstash处理后再传递给ES。如果数据量太大可以在agent与server端之前部署消息队列,一般使用Redis或者Kafka。
一、Logstash下载与安装步骤
1、在ELK官网www.elastic.co下载Logstash 6.3,其他版本安装方式基本一致。Logstash比较重量级的原因是它也需要依赖JAVA环境,所以依然要提前配置JDK。如果是使用的ES7版本,那么已经内置了OpenJDK可以用,只需要配置好JAVA环境变量即可
cd /usr/local/src wget https://artifacts.elastic.co/downloads/logstash/logstash-7.17.6-linux-x86_64.tar.gz tar zxf logstash-7.17.6-linux-x86_64.tar.gz mv logstash-7.17.6 /usr/local chown -R elastic. /usr/local/logstash-7.17.6/ #logstash不能用root启动,所以使用单独账号 su - elastic
二、Logstash配置与启动方式
1、标准输入输出(主要用于测试)
bin/logstash -e 'input{ stdin{} } output{ stdout{} }' #-e代表使用命令行模式,采用标准输入、标准输出 Settings: Default pipeline workers: 1 Pipeline main started hello world #这里手动输入的hello world,作为标准输入 2016-05-19T01:59:03.797Z 0.0.0.0 hello world #标准输出的结果显示在屏幕 nihao 2016-05-19T02:00:20.013Z 0.0.0.0 nihao
2、codec风格输出(codec是一种解码编码的工具)
/usr/local/logstash-7.17.6/bin/logstash -e 'input{ stdin{} } output{ stdout{codec => rubydebug} }' # -e选项是直接用命令行模式,输入采用标准输入,采用codec风格输出 Settings: Default pipeline workers: 1 Pipeline main started hello world { "message" => "hello world", #输入的内容 "@version" => "1", #版本号 "@timestamp" => "2016-05-19T02:09:43.921Z", #自动生成时间戳 "host" => "0.0.0.0" #数据是由哪个节点发过来的
3、使用配置文件启动
启动服务时加上-t选项可以检查配置文件是否正确,-f选项指明配置文件路径,注意启动服务时不要使用root用户。服务启动后就可以到Elasticsearch或者Kibana就可以查看到数据了。如果Logstash的配置文件发生了变化只需要重启Logstash即可,Elasticsearch和Kibana里的数据都会自动更新
su - elastic nohup /usr/local/logstash-7.17.6/bin/logstash -f /usr/local/logstash-7.17.6/config/logstash_test.yml -w 4 & #-w选项用于启动多个线程,默认是2 kill -1 $logstash_pid #重启logstash,这里自行替换为正确的pid
四、Logstash配置文件
在正式的生产环境中要启动Logstash需要配置一系列复杂的规则以满足需求,所以就需要通过配置文件来自定义这些规则。Logstash和ElasticSearch一样有2个配置文件需要调整,其中jvm.options和Elasticsearch中的一样的用于优化堆栈内存,另一个config/logstash.yml,通过该配置文件来进行采集和过滤数据等操作。自定义配置文件时主要配置的内容包括input(用于指定输入源,包含本地日志、redis、stdin标准输入等多种方式)、filter(进行规则过滤和处理)、output(指定输出目标)三个区。在Logstash中输入的数据可以通过很多种方式获取,比如本地日志文件、filebeat、数据库等。而输出也可以指定到需要的容器中,如Elasticsearch
1、将日志文件作为Logstash的输入
结合file插件可以把本地产生的日志持续输出到ES中,file插件使用了一个sincedb文件记录当前文件读取位置,即使重新启动服务也不会丢失文件的读取位置。默认情况下sincedb文件放在运行Logstash的用户的主目录中,可以用sincedb_path选项自定义存放路径。下面是一个使用file插件的简单案例,由于这里的配置output使用了标准输出,所以会有大量日志输出到屏幕
vi etc/logstash.conf input { file { path => [ "/var/log/message","/var/log/*.log" ] #文件路径 exclude => "/var/log/*.gz" #排除不需要监听的文件 type => "system" #自定义事件的类型,可用于后续的条件判断 add_field => {"key" => "test"} #自定义新增字段 start_position => "beginning" #从日志文件头部读取,相反还有end } } output { stdout { codec => rubydebug } }
2、将日志文件作为Logstash的输入,并且将收集到的数据提交给Elasticsearch
结合output插件可以实现把采集到的数据输出ElasticSearch中
vi /etc/logstash.conf input { file { path => "/usr/local/logstash-7.17.6/logs/logstash-plain.log" start_position => "beginning" } } output { elasticsearch { hosts => ["172.20.1.221:9200"] } index => "logstash-%{+YYYY.MM.dd}" }
3、Logstash filter模块与常用插件
filter模块是Logstash中最重要和复杂的一个配置,它可以通过多种插件实现对收集到的日志做多种处理,比如解析数据、删除字段、切割、文本格式化、日期转换等
· 使用grok插件对日志内容进行格式化输出
grok插件可以对日志内容按照正则匹配后进行格式化输出,比如将日志中与设置好的正则相匹配的字段重新定义为另一个字段进行单独展示,更方便聚合统计处理。用户可以自定义正则也可以使用Logstash预定义好的正则,这些定义好的规则文件大致目录为logstash/vendor/bundle/jruby/%version/gems/logstash-patterns-core-$version/patterns/grok-patterns下。通过Kibana内置的grokdebug工具可以对日志内容与规则进行校验。
Grok书写格式与示例
%{SYNTAX:SEMANTIC} #SYNTAX表示grok中定义好的pattern,SEMANTIC表示自定义的字段 %{IP:client} #大写的单词是预定义好的正则表达式,可以在patterns目录看到其具体匹配的内容。如果匹配的话就将该字段名称重新定义为冒号后面的小写的单词 #日志内容 55.3.244.1 GET /index.html 15824 0.043 #grok正则 %{IP:client}%{WORD:method}%{URIPATHPARAM:request}%{NUMBER:bytes}%{NUMBER:duration} #grok过滤后的信息格式 client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043
演示示例
#Nginx日志 192.168.238.90 - - [01/Aug/2020:14:53:35 +0800] "GET /linuxe HTTP/1.1" 404 3650 "-" "Chrome xxx" "-" #自定义正则 %{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent} #完整示例 input { file { path => "/var/log/nginx/access.log" } } filter { grok { match => { "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}' } remove_field => ["message"] #grok处理后删除原有的message字段 } } output { elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] index => "sjgnginx-%{+YYYY.MM.dd}" } }
如果产生的日志内容无法被grok处理会出现_grokparsefailure的错误提示,然后会将日志数据作为一个整体全部放入message字段,可以把这部分处理失败的日志单独放入一个索引方便查看
output { if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] { elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] index => "grok_log-%{+YYYY.MM.dd}" } } else{ elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] index => "grok_fail_log-%{+YYYY.MM.dd}" } } }
· 使用date插件进行日期转换
date插件主要用于日期格式的转换,可以将日期类型的字符串生格式化成一个新的字段,然后覆盖掉默认的@timestamp字段。主要解决日志本身产生的时间与Kibana收集时间不一致的问题,比如有个历史日志是2021年产生,然后2022年想通过ELK进行分析处理,如果不做配置的话在Kibana看到的日志时间将是2022年收集到日志的那一刻,而不是日志本身的时间
filter { date{ match => [ "logdate", "MMM dd yyyy HH:mm:ss"] #定义一个logdate字段,字段格式为后面引号内的内容。当logstash遇到这样格式的内容就会放入到logdate字段中 target => "@timestamp" #用logdatae覆盖默认的timestamp字段 timezone => "Asia/Shanghai" } }
· 使用mutate插件进行数据修改
mutate插件可以对字段内容做处理,比如使用rename进行字段重命名、使用remove_field删除无用字段、使用convert进行数据类型转换等,转换后的数据一般用于Kibana聚合出图,当Kibana中缺少需要的字段时可以考虑是否数据类型不对
filter { mutate { remove_field => [ "message" ] #删除message字段,多个字段用逗号分隔 convert => { "userid" => "integer" #将userid字符串格式转换为int整数型 } } }
· 使用geoip插件新增地理位置信息
filter { geoip { source => "ip" #source指定需要处理的字段,这里就是将IP字段中的数据进行分析 fields => ["location","country_name","city_name"] #对geoip输出的N多字段过滤,只看自己需要的 } }
· dissect插件
基于分割符来切割字段,避免grok消耗CPU的问题,性能是grok的三倍左右。但是需要每行日志格式相似,并有明显的分割符,下图就是把一条日志进行了切割,+号代表追加到某个字段中
评论