ELK log system configuration optimization. The ELK stack has quite a lot of configuration, with inputs and outputs wired between each component, and beginners may not know where to start with some of it: for example, how to configure the stack without Logstash at all, or how to use a Redis input with an Elasticsearch output.
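
To get the Logstash-free case out of the way first: Filebeat can ship straight to Elasticsearch. A minimal sketch, with the host and credentials as placeholders rather than values from the setup below:

filebeat.inputs:
- type: log
  paths:
    - /logs/*

# no Logstash in the path; events go to the default filebeat-* indices
output.elasticsearch:
  hosts: ["elasticsearch:9200"]   # placeholder address
  username: "elastic"             # only if security is enabled
  password: "changeme"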

Filebeat configuration:


sh-4.2$ cat /etc/filebeat/..data/filebeat.yml
filebeat.inputs:
- type: log
  paths:
    - /logs/*
  # fields are populated dynamically from environment variables of the filebeat container in k8s
  fields:
    project: ${PROJECT}
    group: ${GROUP}
    stage: ${STAGE}
    format: ${FORMAT}
  multiline:
    pattern: '^\[[^stacktrace]'
    negate: true
    match: after

processors:
  - add_cloud_metadata:
  - add_host_metadata:

output.redis:
  hosts: ["xxx.redis.rds.aliyuncs.com"]
  password: xxxpasswd
  # key is omitted, so it defaults to "filebeat", matching the Logstash redis input below
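
The ${PROJECT}, ${GROUP}, ${STAGE} and ${FORMAT} placeholders are resolved from the container's environment. A hypothetical fragment of the filebeat sidecar's pod spec might inject them like this (all names and values are illustrative):

# illustrative only: env vars consumed by the ${...} placeholders in filebeat.yml
containers:
  - name: filebeat
    image: docker.elastic.co/beats/filebeat:7.17.0
    env:
      - name: PROJECT
        value: "myproject"
      - name: GROUP
        value: "backend"
      - name: STAGE
        value: "production"
      - name: FORMAT
        value: "log4j2"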

Logstash configuration:

The filter stage reportedly carries some performance overhead; if you don't need it, comment it out.

cat /usr/share/logstash/pipeline/logstash.conf:

input {
  redis {
    host => "xxx.redis.rds.aliyuncs.com"
    port => 6379
    password => "xxxpasswd"
    data_type => "list"
    key => "filebeat"
  }
}

filter {
  if [fields][format] == "log4j2" {
    grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] \[%{HOSTNAME:hostname}\] \[%{LOGLEVEL:level}\] \[%{DATA:thread}\] %{DATA:method} - %{GREEDYDATA:msg}" }
    }
    date {
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
      target => "@timestamp"
      timezone => "Asia/Shanghai"
    }
  }
  if [fields][format] == "php" {
    grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{WORD:env}.%{LOGLEVEL:level}: %{GREEDYDATA:msg}" }
    }
    date {
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
      target => "@timestamp"
      timezone => "Asia/Shanghai"
    }
  }
  if [fields][format] == "nginx-access" {
    grok {
      match => { "message" => '%{IP:http_x_forwarded_for} %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:http_method} %{URIPATHPARAM:http_request} HTTP/%{NUMBER:http_version}\" %{INT:http_status} %{INT:body_bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:http_user_agent}\" %{DATA:upstream_addr} \[%{NUMBER:request_time} - %{DATA:upstream_response_time}\]' }
    }
    date {
      match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    # normalize the "-" placeholder before the float conversion below
    if [upstream_response_time] == "-" {
      mutate {
        replace => { "upstream_response_time" => "0" }
      }
    }
    mutate {
      convert => { "body_bytes_sent" => "integer" }
      convert => { "upstream_response_time" => "float" }
      convert => { "request_time" => "float" }
      convert => { "http_status" => "integer" }
    }
    geoip {
      source => "http_x_forwarded_for"
    }
    if [http_user_agent] != "-" {
      useragent {
        target => "user_agent"
        source => "http_user_agent"
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-%{+YYYY-MM-dd}"
    user => "elastic"
    password => "elastic-password"
  }
}
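
On the performance note above: besides removing unused filters, overall pipeline throughput is tuned in logstash.yml. The values below are illustrative starting points, not measured recommendations:

# logstash.yml: illustrative values, tune against your own workload
pipeline.workers: 4         # defaults to the number of CPU cores
pipeline.batch.size: 250    # events per worker batch (default 125)
pipeline.batch.delay: 50    # ms to wait while filling a batch (default 50)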

The configuration above writes all logs into a single index; let's optimize that by splitting indices per project:

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-%{[fields][project]}-%{+YYYY-MM-dd}"
    user => "elastic"
    password => "elastic-password"
  }
}
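
One caveat with sprintf-style index names: if an event arrives without [fields][project], the reference is not substituted and the event lands in a literal logstash-%{[fields][project]}-... index. A small guard in the filter stage (the "unknown" default is my own choice) avoids that:

filter {
  # give events without a project field a default, so the index name always resolves
  if ![fields][project] {
    mutate { add_field => { "[fields][project]" => "unknown" } }
  }
}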

The corresponding indices can now be seen in Kibana; create index patterns for them there.
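
If you'd rather script that than click through the UI, Kibana 7.x exposes a saved-objects API for index patterns; a sketch, with the Kibana address and credentials as placeholders:

curl -X POST "http://kibana:5601/api/saved_objects/index-pattern" \
  -H "kbn-xsrf: true" -H "Content-Type: application/json" \
  -u elastic:elastic-password \
  -d '{"attributes": {"title": "logstash-*", "timeFieldName": "@timestamp"}}'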

Other configurations:

In EFK mode, output to different indices based on the type of each log file

Here the output additionally branches with if/else on the type condition:
input {
  file {
    path => "/usr/local/my.log"
    start_position => "beginning"
    type => "infolog"
    sincedb_path => "/dev/null"
  }
  file {
    path => "/usr/local/my1.log"
    start_position => "beginning"
    type => "errlog"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]   # parse the timestamp field
    target => "@timestamp"                             # write the parsed time into @timestamp
  }
}

output {
  if [type] == "infolog" {
    elasticsearch {
      hosts => ["test:9200"]
      index => "infolog-%{+YYYY.MM.dd}"
    }
  } else if [type] == "errlog" {
    elasticsearch {
      hosts => ["test:9200"]
      index => "errlog-%{+YYYY.MM.dd}"
    }
  }
}
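
Note the sincedb_path => "/dev/null" in the file inputs above: it prevents Logstash from persisting read offsets, so combined with start_position => "beginning" the files are re-read from the top on every restart, which is convenient for testing but usually undesirable in production.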

Another way to write it

output {
  elasticsearch {
    action => "index"
    index => "%{[fields][product_type]}-transaction-%{+YYYY-MM}"
    hosts => ["10.0.xx.xx:9200", "10.0.xx.xx:9200", "10.0.xx.xx:9200"]
  }
}
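
Incidentally, action => "index" is the elasticsearch output's default action, so spelling it out here is explicit documentation rather than a behavioral change.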

There is also the approach of building the index name in the filter stage:

    filter {
      if [log_type] in [ "test", "staging" ] {
        mutate { add_field => { "[@metadata][target_index]" => "test-%{+YYYY.MM}" } }
      } else if [log_type] == "production" {
        mutate { add_field => { "[@metadata][target_index]" => "prod-%{+YYYY.MM.dd}" } }
      } else {
        mutate { add_field => { "[@metadata][target_index]" => "unknown-%{+YYYY}" } }
      }
    }
    output {
      elasticsearch {
        index => "%{[@metadata][target_index]}"
      }
    }
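
The [@metadata] prefix is what makes this pattern clean: metadata fields can be referenced by sprintf in the output, but they are never serialized into the stored document, so target_index does not pollute the indexed event.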

On inputting logs into Redis

  1. When collecting log files into Redis, the Redis key can be set per project instead of using the default "filebeat". Would that make reading the data more efficient? (A sketch of per-project keys follows below.)
  2. Using a Loki-based stack is another viable approach.
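
On point 1: Filebeat's redis output accepts a format string for key, so the list name can be derived from the project field. The sketch below reuses the fields from the filebeat.yml above; the project name on the Logstash side is hypothetical, and you would need one redis input per list:

# filebeat side: one Redis list per project (format-string key)
output.redis:
  hosts: ["xxx.redis.rds.aliyuncs.com"]
  password: xxxpasswd
  key: "filebeat-%{[fields.project]}"

# logstash side: read one of those lists (project name is hypothetical)
input {
  redis {
    host => "xxx.redis.rds.aliyuncs.com"
    port => 6379
    password => "xxxpasswd"
    data_type => "list"
    key => "filebeat-myproject"
  }
}

Whether separate lists are actually faster to drain is worth benchmarking rather than assuming; the clearer benefit is isolation, since one noisy project can no longer back up every other project's queue.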