Foreword

In the ELK stack, Logstash plays the role of the "data pipeline". It collects log data from all kinds of sources (files, databases, message queues, network interfaces, and so on), then cleans, parses, and structures that data with its powerful filters, and finally ships the processed logs to Elasticsearch for storage and indexing. In short, Logstash is the key step that takes logs from "messy and unordered" to "searchable and structured".
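
A Logstash pipeline is always built from the same three stages. A minimal skeleton (the stage names are fixed; which plugins you put inside them is up to you) looks roughly like this:

input {    ## where events come from: file, beats, tcp, kafka, ...
}
filter {   ## optional: parse, enrich, and reshape events: grok, geoip, date, mutate, ...
}
output {   ## where events go: elasticsearch, stdout, ...
}

Sections 2 and 3 below are all variations on this skeleton.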

1. Installation and Deployment

Getting the package

If your network access is restricted, you can use the package I have already downloaded: logstash-7.17.5

https://www.elastic.co/downloads/logstash
--- 1. Upload the package to the server
--- 2. Extract it
tar xf logstash-7.17.5-linux-x86_64.tar.gz -C /software/
--- 3. Create a symlink
ln -s /software/logstash-7.17.5/bin/logstash   /usr/local/sbin/
--- 4. Verify; if version information is printed, the installation succeeded
logstash -V

2. A First Taste of Logstash

https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-beats.html

1. The input and output plugins

--- 1. Write the configuration file
## cat 01-stdin-to-stdout.conf 
input { 
  stdin { type => "stdin" } 
} 
output { 
  stdout {}
}
--- 2. Start Logstash
logstash -f config/01-stdin-to-stdout.conf
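--- 3. Try it out. Every line you type on stdin comes back on stdout as a structured event; with the default rubydebug codec the output looks roughly like the following (the host name and timestamp will of course differ on your machine):
hello logstash
{
       "message" => "hello logstash",
          "type" => "stdin",
      "@version" => "1",
    "@timestamp" => 2025-08-13T09:13:50.000Z,
          "host" => "elk01"
}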

2. Logstash with Filebeat

--- 1. Write the Logstash configuration file
## cat 02-beats-to-stdout.conf 
input {
  ## The input type is beats
  beats {
    ## Port to listen on
    port => 8888
  }
}
output {
  stdout {}

   # Write the data to the ES cluster
  elasticsearch {
    # ES host address
    hosts => ["http://192.168.0.160:9200"]
    # Index name
    index => "nginx-filebeat-logstash"
  }
}
--- 2. Start Logstash with auto-reload (-r)
logstash -rf 02-beats-to-stdout.conf
--- 3. Write the Filebeat configuration file
filebeat.inputs:
- type: log 
  paths:
    - /var/log/nginx/access.log
# Send the data to Logstash
output.logstash:
  ## Logstash host and port
  hosts: ["192.168.0.160:8888"]
--- 4. Start Filebeat
filebeat -e -c nginx-to-logstash.yml 
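--- 5. Verify (optional). Once a few requests have hit nginx, you can confirm that the index was created in ES; the host below is the one assumed throughout this walkthrough:
curl http://192.168.0.160:9200/_cat/indices?v | grep nginx-filebeat-logstash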

3. Logstash filter plugins: geoip

--- 1. Write the configuration file
## cat 03-geoip-logstash.conf
input { 
  beats {
    port => 8888
  }
} 
filter {
  ## Derive the client's latitude/longitude, country, city, and so on from the IP address
  geoip {
     ## Source field, i.e. the field geoip should analyze
     source => "clientip"
     ## Remove fields we do not need
     remove_field => [ "agent","log","input","host","ecs","tags" ]
  }
}
output { 
  ## Print events to stdout for testing and debugging
  stdout {} 
  ## Once debugging looks good, also ship the data to the ES cluster 
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "geoip-logstash"
  }
}
--- 2. Start Logstash with auto-reload (logstash -rf 03-geoip-logstash.conf)
--- 3. Ship data from Filebeat to Logstash
filebeat.inputs:
- type: log 
  paths:
    - /var/log/nginx/access.log
  ## The nginx log lines are JSON; parse them and put the keys at the top level of the event
  json.keys_under_root: true
  json.add_error_key: true
output.logstash:
  hosts: ["192.168.0.160:8888"]
--- 4. Start the Filebeat instance
--- 5. The log lines look roughly like this; Logstash will resolve the clientip field later on
{"@timestamp":"2025-08-13T17:13:50+08:00","host":"192.168.0.160","clientip":"101.243.152.111","SendBytes":409,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.0.160","uri":"/index.nginx-debian.html","domain":"192.168.0.160","xff":"-","referer":"-","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36","status":"200"}
{"@timestamp":"2025-08-13T17:13:50+08:00","host":"192.168.0.160","clientip":"139.196.243.11","SendBytes":196,"responsetime":0.000,"upstreamtime":"-","upstreamhost":"-","http_host":"192.168.0.160","uri":"/favicon.ico","domain":"192.168.0.160","xff":"-","referer":"http://192.168.0.160/","tcp_xff":"-","http_user_agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36","status":"404"}

4. Logstash filter plugins: grok for parsing native nginx logs

--- 1. Write the Logstash configuration file
input { 
  beats {
    port => 8888
  }
}
filter {
 ## Use grok to match nginx's native (combined) log format
 grok {
  match => { "message" => "%{HTTPD_COMBINEDLOG}" }
 ## Remove fields we do not need
  remove_field => [ "agent","log","input","host","ecs","tags" ]
 }
 ## Resolve the IP address with geoip
 geoip {
   source => "clientip"
  }
}
output { 
  stdout {} 
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "grok-logstash"

  }
}

--- 2. Start the Logstash instance

--- 3. Collect the logs with Filebeat
filebeat.inputs:
- type: log 
  paths:
    - /var/log/nginx/access.log
output.logstash:
  hosts: ["192.168.0.160:8888"]

--- 4. Start the Filebeat instance
--- 5. Native nginx log format; note that if you have customized nginx's log format, parsing may fail
192.168.0.79 - - [14/Aug/2025:14:05:26 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36"
192.168.0.79 - - [14/Aug/2025:14:05:26 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36"
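--- 6. For reference, %{HTTPD_COMBINEDLOG} splits a line like the one above into fields roughly as follows (the exact field names come from the built-in pattern definition):
"clientip"    => "192.168.0.79"
"timestamp"   => "14/Aug/2025:14:05:26 +0800"
"verb"        => "GET"
"request"     => "/"
"httpversion" => "1.1"
"response"    => "304"
"bytes"       => "0"
"referrer"    => "\"-\""
"agent"       => "\"Mozilla/5.0 ...\""
Note that the config above also lists "agent" in remove_field, so the user-agent string extracted by grok is dropped again; keep it out of that list if you need it downstream.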

5. Correcting the event time with the date filter

--- 1. Write the Logstash configuration file
input { 
  beats {
    port => 8888
  }
}
filter {
 grok {
  match => { "message" => "%{HTTPD_COMBINEDLOG}" }
  remove_field => [ "agent","log","input","host","ecs","tags" ]
 }
 geoip {
   source => "clientip"
  }
 date {
   ## Parse the timestamp string field using this format
   match => [ "timestamp","dd/MMM/yyyy:HH:mm:ss Z" ] 
   ## Time zone to apply
   timezone => "Asia/Shanghai"
   # Write the converted date to this field; if omitted, it defaults to "@timestamp"
   target => "linux-date"
 }
}
output { 
  stdout {} 
  elasticsearch {
    hosts => ["http://192.168.0.160:9200"]
    index => "data-logstash"
  }
}
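As a concrete example of what the date filter does here: the nginx timestamp "14/Aug/2025:14:05:26 +0800" is parsed with the dd/MMM/yyyy:HH:mm:ss Z format and stored in UTC in the target field, so the event ends up with:
"timestamp"  => "14/Aug/2025:14:05:26 +0800"
"linux-date" => 2025-08-14T06:05:26.000Z
Because target is set to "linux-date", the original @timestamp (the time Logstash received the event) is left untouched; drop the target option if you want @timestamp itself replaced.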
--- 2. Start the Logstash instance
--- 3. Write the Filebeat configuration file
filebeat.inputs:
- type: log 
  paths:
    - /var/log/nginx/access.log
output.logstash:
  hosts: ["192.168.0.160:8888"]
--- 4. Start the Filebeat instance

3. A Deeper Dive into Logstash

1. Custom grok patterns and conditional branching

--- 1. Create the patterns directory
mkdir -p /software/logstash-7.17.5/con/patterns
--- 2. Create the patterns file; this step assumes some fluency with regular expressions
cat patterns/test
YEAR [\d]{4}
AGE [0-9]{2}
NAME [A-Za-z]+

--- 3. Write the configuration file
input { 
  beats {
    port => 8888
    ## Use type to tag the data so we can branch on it later 
    type => "beats"
  }
  tcp {
    port => 9999
    type => "tcp"
  }
  http {
    type => "http"
  }
}
filter {
  if [type] == "beats" {
      ## Nothing needs to be matched here, so mutate (not grok) is the right tool for simply dropping fields
      mutate {
         remove_field => [ "agent","log","input","host","ecs","tags" ]
      }
  }
  if [type] == "tcp" {
     grok {
         # Directory to load custom pattern definitions from; a relative or absolute path both work
         patterns_dir => ["/software/logstash-7.17.5/con/patterns"]
         # Match against the given field. NAME/YEAR/AGE are the custom patterns defined above, name/year/age are the fields they populate, and \s* matches any whitespace
         match => { "message" => "%{NAME:name}%{YEAR:year}\s*年龄%{AGE:age}" }
         ## Add a custom field
         add_field => { "custom-type" => "I love linux" }   
     }
  } else {
    mutate {
      add_field => {
        "school" => "NUAA"
        "study"  => "es"
        "custom-type" => "this is http"
      }
    }
  }
}
output { 
 stdout {}
 ## Conditionals can likewise be used here to route data to different clusters or different indices
 if [type] == "beats" {
   elasticsearch {
   hosts => ["http://localhost:9200"]
   index => "grok-beats-es"
   }
 }
 if [type] == "tcp" {
   elasticsearch {
   hosts => ["http://localhost:9200"]
   index => "grok-tcp-es"
   }
 } else {
   elasticsearch {
   hosts => ["http://localhost:9200"]
   index => "grok-http-es"
   }
 }
}
--- 4. Send some test data
echo XINGZHIBANG2025 年龄25 | nc 192.168.0.160 9999
--- 5. Result: year, name, and age have all been extracted. Adapt the patterns to your real data as needed; once the fields are extracted, they can be visualized in Kibana
{
          "year" => "2025",
    "@timestamp" => 2025-08-21T03:45:45.977Z,
          "host" => "elk01",
          "port" => 33296,
          "type" => "tcp",
      "@version" => "1",
   "custom-type" => "I love linux"
          "name" => "XINGZHIBANG",
           "age" => "25",
       "message" => "XINGZHIBANG2025 年龄25"
}
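--- 6. To exercise the http branch as well, POST straight to the http input; no port is set above, so it listens on the plugin's default port 8080 (the host is this walkthrough's assumption). The event should land in the grok-http-es index with the school/study/custom-type fields added by mutate:
curl -XPOST http://192.168.0.160:8080 -d 'hello from the http input'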

2. Multiple pipelines in Logstash

### When the business logic gets relatively complex, besides multi-branch conditionals we can also use multiple pipelines
## Prepare the files; here I simply split the three input types above into one file each
--- 1. 01-pipline-beats.conf
input { 
  beats {
    port => 8888
    type => "beats"
  }
}
filter {
    ## Nothing needs to be matched here, so mutate (not grok) is the right tool for simply dropping fields
    mutate {
       remove_field => [ "agent","log","input","host","ecs","tags" ]
    }
}
output { 
   elasticsearch {
   hosts => ["http://localhost:9200"]
   index => "grok-beats-es"
   }
}

--- 2. 02-pipline-tcp.conf
input { 
  tcp {
    port => 9999
    type => "tcp"
  }
}
filter {
  grok {    
     patterns_dir => ["/software/logstash-7.17.5/con/patterns"]
     match => { "message" => "%{NAME:name}%{YEAR:year}\s*年龄%{AGE:age}" }
     add_field => { "custom-type" => "I love linux" }   
   }
}
output { 
   elasticsearch {
   hosts => ["http://localhost:9200"]
   index => "grok-tcp-es"
   }
}
--- 3. 03-pipline-http.conf
input { 
  http {
    type => "http"
  }
}
filter {
  mutate {
     add_field => {
        "school" => "NUAA"
        "study"  => "es"
        "custom-type" => "this is http"
      }
    }
}
output { 
 elasticsearch {
   hosts => ["http://localhost:9200"]
   index => "grok-http-es"
 }
}

--- 4. Write the pipelines configuration file
vim /software/logstash-7.17.5/config/pipelines.yml
- pipeline.id: pipline-beats  
  path.config: "/software/logstash-7.17.5/con/01-pipline-beats.conf"
- pipeline.id: pipline-tcp  
  path.config: "/software/logstash-7.17.5/con/02-pipline-tcp.conf"
- pipeline.id: pipline-http  
  path.config: "/software/logstash-7.17.5/con/03-pipline-http.conf"
--- 5. Start Logstash (run it without -f so that pipelines.yml is used)
logstash
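--- 6. Verify (optional). Once it is up, the Logstash monitoring API (port 9600 by default) will list all three pipelines:
curl -s http://localhost:9600/_node/pipelines?pretty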

3. The useragent filter in Logstash

### In Logstash, the useragent filter's job is to parse a raw User-Agent string into structured fields that are easier to understand and work with. It is commonly used to identify the device type, operating system, and browser for analytics
--- 1. The Logstash configuration file
input {
  beats {
    port => 8888
  } 
}
filter {
  grok {
    ## Note: the trailing %{NUMBER:request_time:float} assumes your nginx log_format appends $request_time to the combined format
    match => {
      "message" => '%{IPORHOST:clientip} - - \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:status} (?:%{NUMBER:bytes}|-) "%{DATA:referrer}" "%{GREEDYDATA:http_user_agent}" %{NUMBER:request_time:float}'
    }
  }
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    timezone => "Asia/Shanghai"
    target => "@timestamp"
  }
  useragent {
    ## Field to analyze the user agent from
    source => "http_user_agent" 
    ## Field to place the parsed data under; if omitted, the fields land at the top level 
    target => "ua"
  }

 ## Copy the os fields to the top level so they are easy to aggregate and filter on in Kibana
  mutate {
    add_field => {
      "os" => "%{[ua][os]}"
      "os_name" => "%{[ua][os_name]}"
    }
  }
}
output { 
# stdout {} 
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "linux-multiple_instance-beats"
  }
}
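--- 2. For the Chrome user agent in the earlier sample logs, the useragent filter fills the ua object roughly like this (exact field names vary a little between Logstash versions and ECS settings, so check your own output before building dashboards on them):
    "ua" => {
        "name"    => "Chrome",
        "major"   => "139",
        "os"      => "Windows",
        "os_name" => "Windows",
        "device"  => "Other"
    }
The mutate block then copies [ua][os] and [ua][os_name] up to the top-level os and os_name fields.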