Foreword
In the ELK architecture, Logstash plays the role of the "data pipeline". It collects log data from all kinds of sources (files, databases, message queues, network interfaces, and so on), then cleans, parses, and structures that data with its powerful filters, and finally ships the processed logs to Elasticsearch for storage and indexing. In short, Logstash is the key link that turns logs from "chaotic and unstructured" into "searchable and structured".
1. Installation and deployment
Get the package
If your network access is restricted, you can use the package I have already downloaded: logstash-7.17.5
--- 1. Upload the package to the server
--- 2. Extract it
tar xf logstash-7.17.5-linux-x86_64.tar.gz -C /software/
--- 3. Create a symlink
ln -s /software/logstash-7.17.5/bin/logstash /usr/local/sbin/
--- 4. Verify; if version information is printed, the installation succeeded
logstash -V
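Before moving on, you can also run a throw-away pipeline straight from the command line as a quick sanity check. This uses the -e flag (inline configuration); type anything and Logstash should echo it back as an event:
## Inline pipeline: read from stdin, print events to stdout, Ctrl+C to exit
logstash -e 'input { stdin {} } output { stdout {} }'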
2. A first taste of Logstash
1. The input and output plugins
--- 1. Write the configuration file
## cat 01-stdin-to-stdout.conf
input {
stdin { type => "stdin" }
}
output {
stdout {}
}
--- 2. Start Logstash
logstash -f config/01-stdin-to-stdout.conf
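By default the stdout output pretty-prints each event. If you want to be explicit about the format (or swap it later), you can set the codec yourself; a minimal variation of the same output block using the structured rubydebug view:
output {
## Print each event as a structured rubydebug document (also the stdout default)
stdout { codec => rubydebug }
}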
2. Logstash with Filebeat
--- 1. Write the Logstash configuration file
## cat 02-beats-to-stdout.conf
input {
## The input type is a beats listener
beats {
## Port to listen on
port => 8888
}
}
output {
stdout {}
# Write the data to the ES cluster
elasticsearch {
# ES host address
hosts => ["http://192.168.0.160:9200"]
# Index name
index => "nginx-filebeat-logstash"
}
}
--- 2. Start Logstash with hot reload
logstash -rf 02-beats-to-stdout.conf
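The -r flag tells Logstash to watch the config file and reload the pipeline when it changes, so you can keep tweaking filters without restarting the process. The long-form flags below do the same thing; the 3-second poll interval is just an illustrative value:
## Equivalent long-form flags: enable automatic reload and poll the file every 3 seconds
logstash -f 02-beats-to-stdout.conf --config.reload.automatic --config.reload.interval 3s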
--- 3. Write the Filebeat configuration that ships data to Logstash
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
# Send the data to Logstash
output.logstash:
## Logstash host and port
hosts: ["192.168.0.160:8888"]
--- 4. Start Filebeat
filebeat -e -c nginx-to-logstash.yml
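Once Filebeat is running and a few requests have hit nginx, you can confirm that the index was created and is receiving documents; this assumes the ES address used above:
## List the index and its document count
curl -s "http://192.168.0.160:9200/_cat/indices/nginx-filebeat-logstash?v"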
3. Logstash filter plugins: geoip
--- 1. Write the configuration file
## cat 03-geoip-logstash.conf
input {
beats {
port => 8888
}
}
filter {
## Derive the client's latitude/longitude, country, city, etc. from the IP address
geoip {
## Source field, i.e. the field geoip should analyze
source => "clientip"
## Remove fields we do not need
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
}
output {
## Print events to stdout for testing and debugging
stdout {}
## Once debugging looks good, ship the data to the ES cluster
elasticsearch {
hosts => ["http://localhost:9200"]
index => "geoip-logstash"
}
}
--- 2. Start Logstash with hot reload
--- 3. Have Filebeat ship data to Logstash
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
## The nginx log is JSON; parse it and lift its keys to the top level of the event
json.keys_under_root: true
json.add_error_key: true
output.logstash:
hosts: ["192.168.0.160:8888"]
--- 4. Start the Filebeat instance
--- 5. The log lines look roughly like this; Logstash will later parse the clientip field out of them
{"@timestamp":"2025-08-13T17:13:50+08:00","host":"192.168.0.160","clientip":"101.243.152.111","SendBytes":409,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.0.160","uri":"/index.nginx-debian.html","domain":"192.168.0.160","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36","status":"200"}
{"@timestamp":"2025-08-13T17:13:50+08:00","host":"192.168.0.160","clientip":"139.196.243.11","SendBytes":196,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.0.160","uri":"/favicon.ico","domain":"192.168.0.160","xff":"-","referer":"http://192.168.0.160/","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36","status":"404"}
4. Logstash filter plugins: parsing native nginx logs with grok
--- 1. Write the Logstash configuration file
input {
beats {
port => 8888
}
}
filter {
## Use grok to match nginx's native (combined) log format
grok {
match => { "message" => "%{HTTPD_COMBINEDLOG}" }
## Remove fields we do not need
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
## Use geoip to resolve the IP address
geoip {
source => "clientip"
}
}
output {
stdout {}
elasticsearch {
hosts => ["http://localhost:9200"]
index => "grok-logstash"
}
}
--- 2. Start the Logstash instance
--- 3. Have Filebeat collect the logs
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
output.logstash:
hosts: ["192.168.0.160:8888"]
--- 4. Start the Filebeat instance
--- 5. Native nginx (combined) log format; note that if you have customized nginx's log format, parsing may fail
192.168.0.79 - - [14/Aug/2025:14:05:26 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36"
192.168.0.79 - - [14/Aug/2025:14:05:26 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36"
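If a line does not match the pattern, grok does not break the pipeline; it simply tags the event with _grokparsefailure. A small sketch of how you could drop such unparsed events instead of indexing them (whether to drop or route them elsewhere is your call):
filter {
grok {
match => { "message" => "%{HTTPD_COMBINEDLOG}" }
}
## Discard events whose message did not match the pattern
if "_grokparsefailure" in [tags] {
drop {}
}
}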
5. Correcting the event time with the date filter
--- 1. Write the Logstash configuration file
input {
beats {
port => 8888
}
}
filter {
grok {
match => { "message" => "%{HTTPD_COMBINEDLOG}" }
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
geoip {
source => "clientip"
}
date {
## Field holding the timestamp string, plus the format it uses
match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss Z" ]
## Time zone of the parsed timestamp
timezone => "Asia/Shanghai"
# Field to store the converted date in; if omitted, it defaults to "@timestamp"
target => "linux-date"
}
}
output {
stdout {}
elasticsearch {
hosts => ["http://192.168.0.160:9200"]
index => "data-logstash"
}
}
--- 2. Start the Logstash instance
--- 3. Write the Filebeat configuration file
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
output.logstash:
hosts: ["192.168.0.160:8888"]
--- 4. Start the Filebeat instance
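Note that with target => "linux-date" the original @timestamp (the moment Logstash received the event) is left untouched. If you want the index time axis to reflect when the request actually happened, point the date filter at @timestamp instead; a minimal sketch of that variation:
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
timezone => "Asia/Shanghai"
## Overwrite @timestamp with the time parsed from the log line (this is also the default target)
target => "@timestamp"
}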
3. A deeper dive into Logstash
1. Custom grok patterns and conditional branches
--- 1. Create the patterns directory
mkdir -p /software/logstash-7.17.5/con/patterns
--- 2. Create the pattern file; this step assumes you are comfortable with regular expressions
cat patterns/test
YEAR [\d]{4}
AGE [0-9]{2}
NAME [A-Za-z]+
--- 3. Write the configuration file
input {
beats {
port => 8888
## Use type to tag the data so the conditional branches below can tell the inputs apart
type => "beats"
}
tcp {
port => 9999
type => "tcp"
}
http {
type => "http"
}
}
filter {
if [type] == "beats" {
grok {
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
}
if [type] == "tcp" {
grok {
# Directory to load custom pattern definitions from; it can be a relative or an absolute path
patterns_dir => ["/software/logstash-7.17.5/con/patterns"]
# Match against the given field; NAME/YEAR/AGE are the custom patterns defined above, name/year/age are the fields they are captured into, and \s* matches any amount of whitespace
match => { "message" => "%{NAME:name}%{YEAR:year}\s*年龄%{AGE:age}" }
## Add a custom field
add_field => { "custom-type" => "I love linux" }
}
}else {
mutate {
add_field => {
"school" => "NUAA"
"study" => "es"
"custom-type" => "this is http"
}
}
}
}
output {
stdout {}
## The same if conditions can be used here to route data to different clusters or different indices
if [type] == "beats" {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "grok-beats-es"
}
}
if [type] == "tcp" {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "grok-tcp-es"
}
}else {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "grok-http-es"
}
}
}
--- 4. Send some test data
echo XINGZHIBANG2025 年龄25 | nc 192.168.0.160 9999
--- 5. Result: year, name and age have all been extracted. For production data you will need to adapt the patterns; once the fields are extracted they can be visualized in Kibana
{
"year" => "2025",
"@timestamp" => 2025-08-21T03:45:45.977Z,
"host" => "elk01",
"port" => 33296,
"type" => "tcp",
"@version" => "1",
"custom-type" => "I love linux"
"name" => "XINGZHIBANG",
"age" => "25",
"message" => "XINGZHIBANG2025 年龄25"
}
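The http branch was declared above but never exercised. With no port set, the http input listens on 8080 by default (an assumption worth double-checking against your plugin version), so you can poke it with curl using arbitrary test data:
## Send a test document to the http input
curl -s -XPOST http://192.168.0.160:8080 -d 'hello from the http input'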
2. Logstash with multiple pipelines
### When the business logic gets relatively complex, besides multi-branch conditionals we can also split the work across multiple pipelines
## Prepare the files; here I simply split the three input types above into separate configuration files
--- 1. 01-pipline-beats.conf
input {
beats {
port => 8888
type => "beats"
}
}
filter {
grok {
remove_field => [ "agent","log","input","host","ecs","tags" ]
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "grok-beats-es"
}
}
--- 2. 02-pipline-tcp.conf
input {
tcp {
port => 9999
type => "tcp"
}
}
filter {
grok {
patterns_dir => ["/software/logstash-7.17.5/con/patterns"]
match => { "message" => "%{NAME:name}%{YEAR:year}\s*年龄%{AGE:age}" }
add_field => { "custom-type" => "I love linux" }
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "grok-tcp-es"
}
}
--- 3. 03-pipline-http.conf
input {
http {
type => "http"
}
}
filter {
mutate {
add_field => {
"school" => "NUAA"
"study" => "es"
"custom-type" => "this is http"
}
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "grok-http-es"
}
}
--- 4. Write pipelines.yml
vim /software/logstash-7.17.5/config/pipelines.yml
- pipeline.id: pipline-beats
  path.config: "/software/logstash-7.17.5/con/01-pipline-beats.conf"
- pipeline.id: pipline-tcp
  path.config: "/software/logstash-7.17.5/con/02-pipline-tcp.conf"
- pipeline.id: pipline-http
  path.config: "/software/logstash-7.17.5/con/03-pipline-http.conf"
--- 5. Start Logstash (with no -f argument it reads config/pipelines.yml)
logstash
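Once it is up, you can confirm that all three pipelines were loaded through the Logstash monitoring API, which listens on port 9600 by default:
## List the running pipelines and their plugin graphs
curl -s "http://localhost:9600/_node/pipelines?pretty"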
3. The Logstash useragent filter
### In Logstash, the useragent filter parses a raw User-Agent string into structured fields that are much easier to understand and use. It is commonly used to identify the device type, operating system and browser for data analysis
--- 1. The Logstash configuration file
input {
beats {
port => 8888
}
}
filter {
grok {
match => {
"message" => '%{IPORHOST:clientip} - - \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:status} (?:%{NUMBER:bytes}|-) "%{DATA:referrer}" "%{GREEDYDATA:http_user_agent}" %{NUMBER:request_time:float}'
}
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
timezone => "Asia/Shanghai"
target => "@timestamp"
}
useragent {
## Field to analyze the device information from
source => "http_user_agent"
## Field to store the parsed data in; if omitted, the results go into top-level fields
target => "ua"
}
## Copy os into top-level fields so Kibana can aggregate and filter on them easily
mutate {
add_field => {
"os" => "%{[ua][os]}"
"os_name" => "%{[ua][os_name]}"
}
}
}
output {
# stdout {}
elasticsearch {
hosts => ["http://localhost:9200"]
index => "linux-multiple_instance-beats"
}
}
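With os promoted to a top-level field you can, for example, count requests per operating system straight from Elasticsearch. This assumes the default dynamic mapping created an os.keyword sub-field, which is typically the case for string fields:
## Terms aggregation on the operating system field
curl -s -H 'Content-Type: application/json' "http://localhost:9200/linux-multiple_instance-beats/_search?size=0&pretty" -d '{ "aggs": { "os_count": { "terms": { "field": "os.keyword" } } } }'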