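ELK logging system configuration tuning. An ELK stack has a lot of configuration, with data flowing between the various inputs and outputs, and beginners may not know where to start with some of it: for example, how to configure things without Logstash, or how to use a Redis input with an Elasticsearch output.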
Filebeat configuration:
cat /etc/filebeat/..data/filebeat.yml

filebeat.yml:
sh-4.2$ more filebeat.yml
filebeat.inputs:
- type: log
  paths:
    - /logs/*
  fields:
    project: ${PROJECT}
    group: ${GROUP}
    stage: ${STAGE}
    format: ${FORMAT}
  multiline:
    pattern: '^\[[^stacktrace]'
    negate: true
    match: after
processors:
- add_cloud_metadata:
- add_host_metadata:
output.redis:
  hosts: xxx.redis.rds.aliyuncs.com
  password: xxxpasswd
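The multiline settings above (`negate: true`, `match: after`) mean that any line that does not match the pattern is appended to the previous line that did. The following is a rough Python sketch of that grouping, not Filebeat's actual implementation; the function name `group_multiline` and the sample log lines are made up for illustration.

```python
import re

# Same pattern as in filebeat.yml. Note that [^stacktrace] is a character
# class: "[" followed by any character other than s, t, a, c, k, r, e.
PATTERN = re.compile(r'^\[[^stacktrace]')

def group_multiline(lines):
    """Approximate Filebeat multiline with negate: true, match: after."""
    events = []
    for line in lines:
        if PATTERN.search(line) or not events:
            # A matching line (or the very first line) starts a new event.
            events.append(line)
        else:
            # A non-matching line is appended to the previous event.
            events[-1] += "\n" + line
    return events

# Hypothetical sample input: one stack trace between two ordinary log lines.
sample = [
    "[2024-01-01 10:00:00] [web-1] [ERROR] [main] App.run - boom",
    "java.lang.IllegalStateException: boom",
    "    at com.example.App.run(App.java:10)",
    "[2024-01-01 10:00:01] [web-1] [INFO] [main] App.run - recovered",
]
events = group_multiline(sample)
```

Because the pattern uses a character class, a line that starts with something like `[error]` (lowercase `e`) would also be treated as a continuation line, which may or may not be what is intended.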
Logstash configuration:
Filters are said to have some impact on performance; if you do not need them, you can comment them out.
cat /usr/share/logstash/pipeline/logstash.conf:
input {
  redis {
    host => "xxx.redis.rds.aliyuncs.com"
    port => 6379
    password => "xxxpasswd"
    data_type => "list"
    key => "filebeat"
  }
}
filter {
  if [fields][format] == "log4j2" {
    grok {
      match => {"message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{HOSTNAME:hostname}\] \[%{LOGLEVEL:level}\] \[%{DATA:thread}\] %{DATA:method} - %{GREEDYDATA:msg}"}
    }
    date {
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
      target => "@timestamp"
      timezone => "Asia/Shanghai"
    }
  }
  if [fields][format] == "php" {
    grok {
      match => {"message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{WORD:env}\.%{LOGLEVEL:level}: %{GREEDYDATA:msg}"}
    }
    date {
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
      target => "@timestamp"
      timezone => "Asia/Shanghai"
    }
  }
  if [fields][format] == "nginx-access" {
    grok {
      match => {"message" => '%{IP:http_x_forwarded_for} %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:http_method} %{URIPATHPARAM:http_request} HTTP/%{NUMBER:http_version}\" %{INT:http_status} %{INT:body_bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:http_user_agent}\" %{DATA:upstream_addr} \[%{NUMBER:request_time} - %{DATA:upstream_response_time}\]'}
    }
    date {
      match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    # nginx logs "-" when no upstream was involved; normalize it to "0"
    # so that the float conversion below succeeds
    if [upstream_response_time] == "-" {
      mutate {
        replace => { "upstream_response_time" => "0" }
      }
    }
    mutate {
      convert => {
        "body_bytes_sent" => "integer"
        "upstream_response_time" => "float"
        "request_time" => "float"
        "http_status" => "integer"
      }
    }
    geoip {
      source => "http_x_forwarded_for"
    }
    if [http_user_agent] != "-" {
      useragent {
        target => "user_agent"
        source => "http_user_agent"
      }
    }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-%{+YYYY-MM-dd}"
    user => "elastic"
    password => "elastic-password"
  }
}
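Before deploying a grok pattern, it can help to check it against a sample line. The sketch below approximates the log4j2 branch with a plain Python regular expression and mirrors the `date` filter's `Asia/Shanghai` setting as a fixed UTC+8 offset. The regex is a simplification of the real grok patterns (for instance, it only accepts timestamps without fractional seconds, matching the `yyyy-MM-dd HH:mm:ss` date format), and the sample line and the name `parse_log4j2` are assumptions for illustration.

```python
import re
from datetime import datetime, timedelta, timezone

# Simplified stand-in for the grok expression of the log4j2 branch.
LOG4J2 = re.compile(
    r'\[(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] '
    r'\[(?P<hostname>[\w.-]+)\] '
    r'\[(?P<level>[A-Za-z]+)\] '
    r'\[(?P<thread>.*?)\] '
    r'(?P<method>.*?) - (?P<msg>.*)'
)

SHANGHAI = timezone(timedelta(hours=8))  # Asia/Shanghai has no DST

def parse_log4j2(line):
    """Return the extracted fields, or None if the line does not match."""
    m = LOG4J2.search(line)  # grok is unanchored by default, like search()
    if m is None:
        return None
    event = m.groupdict()
    local = datetime.strptime(event["timestamp"], "%Y-%m-%d %H:%M:%S")
    # Like the date filter: interpret as Shanghai time, store as UTC.
    event["@timestamp"] = local.replace(tzinfo=SHANGHAI).astimezone(timezone.utc)
    return event

# Hypothetical sample line in the format the pattern expects.
line = "[2024-01-01 10:00:00] [web-1] [ERROR] [main] com.example.App.run - connection refused"
event = parse_log4j2(line)
```

If the real logs carry milliseconds in the timestamp, the grok pattern would still match but the `date` format would likely need adjusting accordingly.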
The configuration above writes all logs into a single index. As an optimization, split the indices by project:
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-%{[fields][project]}-%{+YYYY-MM-dd}"
    user => "elastic"
    password => "elastic-password"
  }
}
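To see how the per-project index name is built, here is a small Python sketch that roughly mimics how Logstash expands `%{[fields][project]}` and `%{+YYYY-MM-dd}` for one event. It is an illustration only, not Logstash's code: `resolve_index` is a hypothetical helper, it handles only this one date format, and the event values are made up.

```python
import re
from datetime import datetime, timezone

def resolve_index(template, event):
    """Roughly mimic Logstash field references and %{+YYYY-MM-dd}."""
    def field_ref(match):
        value = event
        for key in re.findall(r'\[([^\]]+)\]', match.group(1)):
            if not isinstance(value, dict) or key not in value:
                # Logstash leaves an unresolved reference as literal text.
                return match.group(0)
            value = value[key]
        return str(value)

    out = re.sub(r'%\{((?:\[[^\]]+\])+)\}', field_ref, template)
    # The date part is taken from the event's @timestamp, in UTC.
    day = event["@timestamp"].astimezone(timezone.utc).strftime("%Y-%m-%d")
    return out.replace("%{+YYYY-MM-dd}", day)

template = "logstash-%{[fields][project]}-%{+YYYY-MM-dd}"
ts = datetime(2024, 1, 1, 2, 0, 0, tzinfo=timezone.utc)

with_project = resolve_index(template, {"@timestamp": ts, "fields": {"project": "shop"}})
without_project = resolve_index(template, {"@timestamp": ts})
```

Two things are worth checking in a real setup: events that lack `fields.project` end up with the literal `%{[fields][project]}` in the index name, and Elasticsearch requires index names to be lowercase, so a mixed-case project value would need lowercasing first (for example with `mutate`'s `lowercase` option).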
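The corresponding indices now show up in Kibana, and index patterns can be created for them.
Other configurations:
In EFK mode, write to different indices according to the type of each log file.
Here the output also branches on a condition with if / else.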
input {
  file {
    path => "/usr/local/my.log"
    start_position => "beginning"
    type => "infolog"
    sincedb_path => "/dev/null"
  }
  file {
    path => "/usr/local/my1.log"
    start_position => "beginning"
    type => "errlog"
    sincedb_path => "/dev/null"
  }
}
filter {
  json {
    source => "message"
  }
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]  # parse the timestamp field
    target => "@timestamp"                            # write the parsed value into @timestamp
  }
}
output {
  if [type] == "infolog" {
    elasticsearch {
      hosts => ["test:9200"]
      index => "infolog-%{+YYYY.MM.dd}"
    }
  } else if [type] == "errlog" {
    elasticsearch {
      hosts => ["test:9200"]
      index => "errlog-%{+YYYY.MM.dd}"
    }
  }
}
Another way to write it:
output {
  elasticsearch {
    action => "index"
    index => "%{[fields][product_type]}-transaction-%{+YYYY-MM}"
    hosts => ["10.0.xx.xx:9200", "10.0.xx.xx:9200", "10.0.xx.xx:9200"]
  }
}
The index name can also be set in the filter:
filter {
  if [log_type] in [ "test", "staging" ] {
    mutate { add_field => { "[@metadata][target_index]" => "test-%{+YYYY.MM}" } }
  } else if [log_type] == "production" {
    mutate { add_field => { "[@metadata][target_index]" => "prod-%{+YYYY.MM.dd}" } }
  } else {
    mutate { add_field => { "[@metadata][target_index]" => "unknown-%{+YYYY}" } }
  }
}
output {
  elasticsearch {
    index => "%{[@metadata][target_index]}"
  }
}
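The routing in this filter can be read as a small decision function: pick an index name from `log_type`, stash it under `@metadata`, and let the output use it. The Python sketch below mirrors that logic for illustration; `target_index` and `to_document` are hypothetical names, and the sample timestamp is made up. It also reflects the fact that `@metadata` fields are not included in the document that the output sends.

```python
from datetime import datetime, timezone

def target_index(log_type, ts):
    """Mirror the if / else if / else chain of the filter above."""
    ts = ts.astimezone(timezone.utc)
    if log_type in ("test", "staging"):
        return ts.strftime("test-%Y.%m")       # monthly index
    if log_type == "production":
        return ts.strftime("prod-%Y.%m.%d")    # daily index
    return ts.strftime("unknown-%Y")           # yearly catch-all

def to_document(event):
    """Drop @metadata, which is not part of the indexed document."""
    return {k: v for k, v in event.items() if k != "@metadata"}

ts = datetime(2024, 3, 5, 12, 0, 0, tzinfo=timezone.utc)
event = {"log_type": "staging", "message": "hello"}
event["@metadata"] = {"target_index": target_index(event["log_type"], ts)}
document = to_document(event)
```

Keeping the index name in `@metadata` rather than in a regular field avoids storing a routing-only value in every document.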
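- Log files are collected by the input and pushed into Redis. The Redis key could be set per project instead of using the default filebeat; would that make reading the data back more efficient?
- Using Loki is another viable approach.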