ELK+Filebeat+Kafka 日志分析

Elk Kafka Filebeat

Posted by BlueFat on Tuesday, January 30, 2018

为什么要用ELK

日志主要包括系统日志、应用程序日志和安全日志。系统运维和开发人员可以通过日志了解服务器软硬件信息、检查配置过程中的错误及错误发生的原因。经常分析日志可以了解服务器的负荷,性能安全性,从而及时采取措施纠正错误。

通常,日志被分散的储存不同的设备上。如果你管理数十上百台服务器,你还在使用依次登录每台机器的传统方法查阅日志。这样是不是感觉很繁琐和效率低下。日志的统计和检索又成为一件比较麻烦的事情,一般我们使用grep、awk和wc等Linux命令能实现检索和统计,但是对于要求更高的查询、排序和统计等要求和庞大的机器数量依然使用这样的方法难免有点力不从心。

官网下载地址:https://www.elastic.co/downloads

  • Elasticsearch:是一个基于Apache Lucene(TM)的开源搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,RESTful web风格接口,多数据源,自动搜索负载等。可以用于全文检索、结构化检索和分析。现在使用最广的开源搜索引擎之一,Wikipedia 、StackOverflow、Github 等都基于它来构建自己的搜索引擎。
  • Logstash:是一个用来搜集、分析、过滤日志的工具。它支持几乎任何类型的日志,包括系统日志、错误日志和自定义应用程序日志。它可以从许多来源接收日志,这些来源包括 syslog、消息传递(例如 RabbitMQ)和JMX,它能够以多种方式输出数据,包括电子邮件、websockets和 Elasticsearch。
  • Kibana:是一个基于Web的图形界面,用于搜索、分析和可视化存储在 Elasticsearch指标中的日志数据。使用它可以很方便的用图表、表格、地图展示和分析数据。
  • Filebeat:Filebeat是一个轻量级数据收集引擎,在你的服务器上安装客户端后,filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读),并且可以转发这些信息到Elasticsearch、Logstash、File、Kafka、Redis 和 Console。
  • Kafka:是一种高吞吐量的分布式发布订阅消息系统。在实时数据量比较大的时候,使用kafka作为缓冲是一个不错的选择。
  • Nginx:是一个高性能的 Web 和反向代理服务器, 它具有有很多非常优越的特性: 作为 Web 服务器:相比 Apache,Nginx 使用更少的资源,支持更多的并发连接,体现更高的效率,这点使 Nginx 尤其受到虚拟主机提供商的欢迎。能够支持高达 50,000 个并发连接数的响应,感谢 Nginx 为我们选择了 epoll and kqueue 作为开发模型. 作为负载均衡服务器:Nginx 既可以在内部直接支持 Rails 和 PHP,也可以支持作为 HTTP代理服务器 对外进行服务。Nginx 用 C 编写, 不论是系统资源开销还是 CPU 使用效率都比 Perlbal 要好的多。

环境

  • 系统:CentOS 7 es31 –> Elasticsearch: 192.168.11.31 es32 –> Elasticsearch: 192.168.11.32 log33 –> Logstash: 192.168.11.33 log34 –> Logstash: 192.168.11.34 ka35 –> Kafka & Zookeeper: 192.168.11.35 ka36 –> Kafka & Zookeeper: 192.168.11.36 ka37 –> Kafka & Zookeeper: 192.168.11.37 web38–> Kibana & Nginx: 192.168.11.38

  • 关闭防火墙

    systemctl stop firewalld.service && setenforce 0
    
  • 架构图: nginx日志 » filebeat收集 » kafka队列 » logstash过滤 » elasticsearch filebeat收集nginx日志写入kafka队列中,logstash接收kafka队列的filebeat nginx日志,将读取的日志解析后elasticsearch中。

安装JDK

yum install java 
java -versionopenjdk version "1.8.0_161"
OpenJDK Runtime Environment (build 1.8.0_161-b14)
OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode)

Zookeeper配置

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/zookeeper-3.4.14.tar.gz
tar xf zookeeper-3.4.14.tar.gz
mv zookeeper-3.4.14 /usr/local/zookeeper
cd   /usr/local/zookeeper
cp conf/zoo_sample.cfg conf/zoo.cfg

#配置日志输出目录
vim /usr/local/zookeeper/bin/zkEnv.sh

    #ZOO_LOG_DIR="."
    ZOO_LOG_DIR="/data/zookeeper/logs/"

ka35、ka36、ka37都修改如下内容

vim config/zookeeper.processors   

ickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
#dataLogDir=/data/zookeeper/logs
maxClientCnxns=0
clientPort=2181
server.1=192.168.11.35:2888:3888  
server.2=192.168.11.36:2888:3888
server.2=192.168.11.37:2888:3888

配置目录及文件 ka35、ka36、ka37 都执行

sudo mkdir -p /data/zookeeper/data
sudo mkdir -p /data/zookeeper/logs

在ka35执行

echo '1' >/data/zookeeper/myid

在ka36执行

echo '2' >/data/zookeeper/myid

在ka37执行

echo '3' >/data/zookeeper/myid

Kafka配置

下载解压

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar -xzf kafka_2.12-2.2.0.tgz
mv kafka_2.12-2.2.0 /usr/local/kafka

#ka35 ka36 ka37 配置修改如下内容

vim /usr/local/kafka/config/server.properties

broker.id=1 #ka35配置 1 ka36配置 2 ka37配置 3
delete.topic.enable=true
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=6
replication.factor=2
num.recovery.threads.per.data.dir=1
log.retention.hours=120
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.11.35:2181,192.168.11.36:2181,192.168.11.37:2181
zookeeper.connection.timeout.ms=6000

配置目录

mkdir -p /data/kafka

ka35、ka36、ka37启动

#bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
/usr/local/zookeeper/bin/zkServer.sh start
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

kafka 简单操作: 创建topic(创建名为test的topic,只有一个分区和一个备份)

bin/kafka-topics.sh --create --bootstrap-server --replication-factor 1 --partitions 1 --topic test

查看已创建的topic

bin/kafka-topics.sh --list --bootstrap-server 192.168.11.35:9092

手动模拟producer

bin/kafka-console-producer.sh --broker-list 192.168.11.35:9092 --topic test

手动模拟consumer

bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.35:9092 --topic test --from-beginning

查看topic详细信息

bin/kafka-topics.sh --describe --bootstrap-server 192.168.11.35:9092 --topic test

查看group的lag等重要信息

bin/kafka-consumer-groups.sh  --bootstrap-server=kafka01:9092 --describe --group=consumer_group01

https://kafka.apache.org/quickstart bin目录下还有很多的脚本工具,可以使用类似“bin/kafka-consumer-groups.sh –help”命令来查看用法

系统参数配置

vim /etc/sysctl.conf

fs.file-max=65536 
vm.max_map_count = 262144 

vim /etc/security/limits.conf
* soft nproc 65535
* hard nproc 65536
* soft nofile 102400
* hard nofile 102400
elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited

Elasticsearch安装配置

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.2.rpm
rpm -ivh elasticsearch elasticsearch-6.6.2.rpm

参数配置

vim /etc/elasticsearch/elasticsearch.yml

cluster.name: star-cluster
node.name: node-1
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.unicast.hosts: ["192.168.11.31", "192.168.11.32"]"]
# bigdesk
http.cors.enabled: true
http.cors.allow-origin: "*"

创建数据目录、修改权限

mkdir -pv /data/elasticsearch
chown -R elasticsearch:elasticsearch /data/elasticsearch

重载配置 开机启动 启动

systemctl daemon-reload
systemctl enable elasticsearch.service
systemctl start elasticsearch.service

访问API

curl http://192.168.11.31:9200
curl  http://192.168.11.31:9200/_cluster/health?pretty  # 群集状态
curl  http://192.168.11.31:9200/_cat/nodes?v # 节点状态

Filebeat安装配置

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.6.2-x86_64.rpm
rpm -ivh elasticsearch filebeat-6.6.2.rpm
vim /etc/filebeat/filebeat.yml 配置

#=========================== Filebeat inputs =============================

filebeat.inputs:
- type: log
  enabled: false
  paths:
    - /var/log/*.log
    
- type: log
  enabled: true
  paths:
    - /data/logs/nginx/*.log
  fields:
    name: nginx-access
  fields_under_root: false
  tail_files: false
#=============================== Processors ===============================
processors:
# 清除掉一些默认的不必要字段(@timestamp和type字段是不能清除的)
- drop_fields:
   fields: ["beat", "input_type", "source", "offset"]

output.kafka:
  enabled: true
  # initial brokers for reading cluster metadata
  hosts: ["192.168.11.35:9092","192.168.11.36:9092","192.168.11.37:9092"]

  # message topic selection + partitioning
  topic: 'nginx-access'
  partition.round_robin:
    reachable_only: true
  # The number of concurrent load-balanced Kafka output workers(default 1) 
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000 # 10MB

#================================ Logging ======================================
# Available log levels are: critical, error, warning, info, debug
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  rotateeverybytes: 52428800 # 50MB
  keepfiles: 5

开机启动

systemctl enable filebeat
systemctl start filebeat

Kiabana安装配置

wget https://artifacts.elastic.co/downloads/kibana/kibana-6.2.2-x86_64.rpm
rpm -i kibana-6.2.2-x86_64.rpm
systemctl enable logstash.service
systemctl start logstash.service

Logstash安装配置

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.2.rpm 
rpm -i logstash-6.6.2.rpm

参数配置

vim /etc/logstash/logstash.yml 

path.data: /var/lib/logstash
# 最好设置为跟cpu核心数量相同
pipeline.workers: 2
pipeline.output.workers: 2
# Logstash 会攒到 pipeline.batch.size 条数据才一次性发送出去(根据业务的量慎重考虑此值,一般在线上系统需要调整此值,实现最大的吞吐量)
pipeline.batch.size: 20000
# 每个 Logstash pipeline 线程,在打包批量日志的时候,最多等待几毫秒。默认是 5 ms(此设置一般不需要更改)
pipeline.batch.delay: 5
path.config: /etc/logstash/conf.d
http.host: "192.168.11.33"
log.level: info
path.logs: /var/log/logstash

权限设置

setfacl -m u:logstash:r /var/log/messages /var/log/dmesg
getfacl /var/log/syslog
chown -R logstash.logstash /var/lib/logstash

重载配置 开机启动

systemctl daemon-reload
systemctl enable logstash.service

配置收集nginx日志模式

vim /etc/nginx/nginx.conf 

    log_format  sun  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for" $request_time $upstream_response_time $upstream_addr $upstream_status $upstream_cache_status';
cat /etc/logstash/patterns.d/nginx

NGINX_ACCESS %{IPORHOST:remote_addr} - %{NOTSPACE} \[%{HTTPDATE:time_local}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:http_status} (%{NUMBER:response_size}|-) (%{QS:http_referer}|-) (%{QS:user_agent}|-) (%{QS:x_forwarded_for}|-) (%{NUMBER:request_time:float}|-) (%{NUMBER:upstream_response_time:float}|-) (%{HOSTPORT:upstream_addr}|-) (%{NUMBER:upstream_status}|-)
vim /etc/logstash/conf.d/filebeat.conf

input {
#  beats {
#    port => 5044
#  }
   kafka {
       bootstrap_servers => ["192.168.11.35:9092,192.168.11.36:9092,192.168.11.37:9092"] # 注意这里配置的kafka的broker地址不是zk的地址
       group_id => "logstash"
       topics => ["nginx-access"]
       consumer_threads =>5
       decorate_events =>true
   }
}

filter {
  if [fields][name] == "proxy-nginx-access" {
    grok {
        patterns_dir => ["/etc/logstash/patterns.d"]
        match => { "message" => "%{NGINX_ACCESS}"}
        remove_field => [tags, message, offset, source, input_type, "@version","[beat][name]","[beat][version]"] 
     }
    geoip {
       source => "remote_addr"
       target => "geoip"
       database => "/etc/logstash/GeoLite2-City.mmdb"
       add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
       add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
       remove_field => ["[geoip][country_code3]", "[geoip][dma_code]", "[geoip][postal_code]", "[geoip][continent_code]", "[geoip][timezone]", "[geoip][region_code]"]
    }
    if [user_agent] != "-" {
       useragent {
       target => "ua"
       source => "user_agent"
       }
    }
    mutate {
       remove_field => ["host"] 
       convert => [ "[geoip][coordinates]", "float" ]
    }
    #if "_geoip_lookup_failure" in [tags] { drop { } }
  }
}

output {
  if [fields][name] == "nginx-access" or [fields][name] == "proxy-nginx-access" {
      elasticsearch {          
          hosts => ["192.168.1.39:9200"] 
          index => "logstash-%{[fields][name]}"
      }
  } 
  else { 
    elasticsearch {     
          hosts => ["192.168.1.39:9200"] 
          index => "%{[@metadata][beat]}"
      }
  }
  #stdout{ codec => rubydebug }
}
cd /etc/logstatsh/conf.d/
logstash -f flilebeat.conf

性能优化

控制JVM heap size : 系统内存的(少于)一半给Luncene 假设你有一个 64 GB 内存的机器,标准的建议是把 50% 的可用内存作为 Elasticsearch 的堆内存,保留剩下的 50%。当然它也不会被浪费,Lucene 会很乐意利用起余下的内存。 即便你有足够的内存,也尽量不要 超过 32 GB。因为它浪费了内存,降低了 CPU 的性能,还要让 GC 应对大内存。

vim /etc/elasticsearch/jvm.options vim /etc/logstash/jvm.options

-Xms2g
-Xmx2g

禁止或少用Swapping

#swapoff -a
echo vm.swappiness =1 >> /etc/sysctl.conf
sysctl -p

或/etc/elasticsearch/elasticsearch.yml 
bootstrap.memory_lock: true
#启动锁定足够的内存,防止数据写入swap,配合bootstrap.memory_lock: true使用
vim /usr/lib/systemd/system/elasticsearch.service
vim /etc/systemd/system/logstash.service

LimitMEMLOCK=infinity

自动化安装 Puppet https://github.com/elastic/puppet-elasticsearch Chef https://github.com/elastic/cookbook-elasticsearch Ansible https://github.com/elastic/ansible-elasticsearch

https://www.elastic.co/guide/cn/elasticsearch/guide/current/heap-sizing.html#compressed_oops https://segmentfault.com/a/1190000011263254#articleHeader4 Elasticsearch API Elasticsearch 参考指南 探索你的数据 Elasticsearch重要优化