为什么要用ELK
日志主要包括系统日志、应用程序日志和安全日志。系统运维和开发人员可以通过日志了解服务器软硬件信息、检查配置过程中的错误及错误发生的原因。经常分析日志可以了解服务器的负荷,性能安全性,从而及时采取措施纠正错误。
通常,日志被分散的储存不同的设备上。如果你管理数十上百台服务器,你还在使用依次登录每台机器的传统方法查阅日志。这样是不是感觉很繁琐和效率低下。日志的统计和检索又成为一件比较麻烦的事情,一般我们使用grep、awk和wc等Linux命令能实现检索和统计,但是对于要求更高的查询、排序和统计等要求和庞大的机器数量依然使用这样的方法难免有点力不从心。
官网下载地址:https://www.elastic.co/downloads
- Elasticsearch:是一个基于Apache Lucene(TM)的开源搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,RESTful web风格接口,多数据源,自动搜索负载等。可以用于全文检索、结构化检索和分析。现在使用最广的开源搜索引擎之一,Wikipedia 、StackOverflow、Github 等都基于它来构建自己的搜索引擎。
- Logstash:是一个用来搜集、分析、过滤日志的工具。它支持几乎任何类型的日志,包括系统日志、错误日志和自定义应用程序日志。它可以从许多来源接收日志,这些来源包括 syslog、消息传递(例如 RabbitMQ)和JMX,它能够以多种方式输出数据,包括电子邮件、websockets和 Elasticsearch。
- Kibana:是一个基于Web的图形界面,用于搜索、分析和可视化存储在 Elasticsearch指标中的日志数据。使用它可以很方便的用图表、表格、地图展示和分析数据。
- Filebeat:Filebeat是一个轻量级数据收集引擎,在你的服务器上安装客户端后,filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读),并且可以转发这些信息到Elasticsearch、Logstash、File、Kafka、Redis 和 Console。
- Kafka:是一种高吞吐量的分布式发布订阅消息系统。在实时数据量比较大的时候,使用kafka作为缓冲是一个不错的选择。
- Nginx:是一个高性能的 Web 和反向代理服务器, 它具有有很多非常优越的特性: 作为 Web 服务器:相比 Apache,Nginx 使用更少的资源,支持更多的并发连接,体现更高的效率,这点使 Nginx 尤其受到虚拟主机提供商的欢迎。能够支持高达 50,000 个并发连接数的响应,感谢 Nginx 为我们选择了 epoll and kqueue 作为开发模型. 作为负载均衡服务器:Nginx 既可以在内部直接支持 Rails 和 PHP,也可以支持作为 HTTP代理服务器 对外进行服务。Nginx 用 C 编写, 不论是系统资源开销还是 CPU 使用效率都比 Perlbal 要好的多。
环境
- 
系统:CentOS 7 es31 –> Elasticsearch: 192.168.11.31 es32 –> Elasticsearch: 192.168.11.32 log33 –> Logstash: 192.168.11.33 log34 –> Logstash: 192.168.11.34 ka35 –> Kafka & Zookeeper: 192.168.11.35 ka36 –> Kafka & Zookeeper: 192.168.11.36 ka37 –> Kafka & Zookeeper: 192.168.11.37 web38–> Kibana & Nginx: 192.168.11.38 
- 
关闭防火墙 systemctl stop firewalld.service && setenforce 0
- 
架构图: nginx日志 » filebeat收集 » kafka队列 » logstash过滤 » elasticsearch filebeat收集nginx日志写入kafka队列中,logstash接收kafka队列的filebeat nginx日志,将读取的日志解析后elasticsearch中。 
安装JDK
yum install java 
java -versionopenjdk version "1.8.0_161"
OpenJDK Runtime Environment (build 1.8.0_161-b14)
OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode)
Zookeeper配置
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/zookeeper-3.4.14.tar.gz
tar xf zookeeper-3.4.14.tar.gz
mv zookeeper-3.4.14 /usr/local/zookeeper
cd   /usr/local/zookeeper
cp conf/zoo_sample.cfg conf/zoo.cfg
#配置日志输出目录
vim /usr/local/zookeeper/bin/zkEnv.sh
    #ZOO_LOG_DIR="."
    ZOO_LOG_DIR="/data/zookeeper/logs/"
ka35、ka36、ka37都修改如下内容
vim config/zookeeper.processors   
ickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
#dataLogDir=/data/zookeeper/logs
maxClientCnxns=0
clientPort=2181
server.1=192.168.11.35:2888:3888  
server.2=192.168.11.36:2888:3888
server.2=192.168.11.37:2888:3888
配置目录及文件 ka35、ka36、ka37 都执行
sudo mkdir -p /data/zookeeper/data
sudo mkdir -p /data/zookeeper/logs
在ka35执行
echo '1' >/data/zookeeper/myid
在ka36执行
echo '2' >/data/zookeeper/myid
在ka37执行
echo '3' >/data/zookeeper/myid
Kafka配置
下载解压
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
tar -xzf kafka_2.12-2.2.0.tgz
mv kafka_2.12-2.2.0 /usr/local/kafka
#ka35 ka36 ka37 配置修改如下内容
vim /usr/local/kafka/config/server.properties
broker.id=1 #ka35配置 1 ka36配置 2 ka37配置 3
delete.topic.enable=true
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=6
replication.factor=2
num.recovery.threads.per.data.dir=1
log.retention.hours=120
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.11.35:2181,192.168.11.36:2181,192.168.11.37:2181
zookeeper.connection.timeout.ms=6000
配置目录
mkdir -p /data/kafka
ka35、ka36、ka37启动
#bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
/usr/local/zookeeper/bin/zkServer.sh start
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
kafka 简单操作: 创建topic(创建名为test的topic,只有一个分区和一个备份)
bin/kafka-topics.sh --create --bootstrap-server --replication-factor 1 --partitions 1 --topic test
查看已创建的topic
bin/kafka-topics.sh --list --bootstrap-server 192.168.11.35:9092
手动模拟producer
bin/kafka-console-producer.sh --broker-list 192.168.11.35:9092 --topic test
手动模拟consumer
bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.35:9092 --topic test --from-beginning
查看topic详细信息
bin/kafka-topics.sh --describe --bootstrap-server 192.168.11.35:9092 --topic test
查看group的lag等重要信息
bin/kafka-consumer-groups.sh  --bootstrap-server=kafka01:9092 --describe --group=consumer_group01
https://kafka.apache.org/quickstart bin目录下还有很多的脚本工具,可以使用类似“bin/kafka-consumer-groups.sh –help”命令来查看用法
系统参数配置
vim /etc/sysctl.conf
fs.file-max=65536 
vm.max_map_count = 262144 
vim /etc/security/limits.conf
* soft nproc 65535
* hard nproc 65536
* soft nofile 102400
* hard nofile 102400
elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited
Elasticsearch安装配置
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.2.rpm
rpm -ivh elasticsearch elasticsearch-6.6.2.rpm
参数配置
vim /etc/elasticsearch/elasticsearch.yml
cluster.name: star-cluster
node.name: node-1
path.data: /data/elasticsearch
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: true
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.unicast.hosts: ["192.168.11.31", "192.168.11.32"]"]
# bigdesk
http.cors.enabled: true
http.cors.allow-origin: "*"
创建数据目录、修改权限
mkdir -pv /data/elasticsearch
chown -R elasticsearch:elasticsearch /data/elasticsearch
重载配置 开机启动 启动
systemctl daemon-reload
systemctl enable elasticsearch.service
systemctl start elasticsearch.service
访问API
curl http://192.168.11.31:9200
curl  http://192.168.11.31:9200/_cluster/health?pretty  # 群集状态
curl  http://192.168.11.31:9200/_cat/nodes?v # 节点状态
Filebeat安装配置
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.6.2-x86_64.rpm
rpm -ivh elasticsearch filebeat-6.6.2.rpm
vim /etc/filebeat/filebeat.yml 配置
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
  enabled: false
  paths:
    - /var/log/*.log
    
- type: log
  enabled: true
  paths:
    - /data/logs/nginx/*.log
  fields:
    name: nginx-access
  fields_under_root: false
  tail_files: false
#=============================== Processors ===============================
processors:
# 清除掉一些默认的不必要字段(@timestamp和type字段是不能清除的)
- drop_fields:
   fields: ["beat", "input_type", "source", "offset"]
output.kafka:
  enabled: true
  # initial brokers for reading cluster metadata
  hosts: ["192.168.11.35:9092","192.168.11.36:9092","192.168.11.37:9092"]
  # message topic selection + partitioning
  topic: 'nginx-access'
  partition.round_robin:
    reachable_only: true
  # The number of concurrent load-balanced Kafka output workers(default 1) 
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000 # 10MB
#================================ Logging ======================================
# Available log levels are: critical, error, warning, info, debug
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  rotateeverybytes: 52428800 # 50MB
  keepfiles: 5
开机启动
systemctl enable filebeat
systemctl start filebeat
Kiabana安装配置
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.2.2-x86_64.rpm
rpm -i kibana-6.2.2-x86_64.rpm
systemctl enable logstash.service
systemctl start logstash.service
Logstash安装配置
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.2.rpm 
rpm -i logstash-6.6.2.rpm
参数配置
vim /etc/logstash/logstash.yml 
path.data: /var/lib/logstash
# 最好设置为跟cpu核心数量相同
pipeline.workers: 2
pipeline.output.workers: 2
# Logstash 会攒到 pipeline.batch.size 条数据才一次性发送出去(根据业务的量慎重考虑此值,一般在线上系统需要调整此值,实现最大的吞吐量)
pipeline.batch.size: 20000
# 每个 Logstash pipeline 线程,在打包批量日志的时候,最多等待几毫秒。默认是 5 ms(此设置一般不需要更改)
pipeline.batch.delay: 5
path.config: /etc/logstash/conf.d
http.host: "192.168.11.33"
log.level: info
path.logs: /var/log/logstash
权限设置
setfacl -m u:logstash:r /var/log/messages /var/log/dmesg
getfacl /var/log/syslog
chown -R logstash.logstash /var/lib/logstash
重载配置 开机启动
systemctl daemon-reload
systemctl enable logstash.service
配置收集nginx日志模式
vim /etc/nginx/nginx.conf 
    log_format  sun  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for" $request_time $upstream_response_time $upstream_addr $upstream_status $upstream_cache_status';
cat /etc/logstash/patterns.d/nginx
NGINX_ACCESS %{IPORHOST:remote_addr} - %{NOTSPACE} \[%{HTTPDATE:time_local}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:http_status} (%{NUMBER:response_size}|-) (%{QS:http_referer}|-) (%{QS:user_agent}|-) (%{QS:x_forwarded_for}|-) (%{NUMBER:request_time:float}|-) (%{NUMBER:upstream_response_time:float}|-) (%{HOSTPORT:upstream_addr}|-) (%{NUMBER:upstream_status}|-)
vim /etc/logstash/conf.d/filebeat.conf
input {
#  beats {
#    port => 5044
#  }
   kafka {
       bootstrap_servers => ["192.168.11.35:9092,192.168.11.36:9092,192.168.11.37:9092"] # 注意这里配置的kafka的broker地址不是zk的地址
       group_id => "logstash"
       topics => ["nginx-access"]
       consumer_threads =>5
       decorate_events =>true
   }
}
filter {
  if [fields][name] == "proxy-nginx-access" {
    grok {
        patterns_dir => ["/etc/logstash/patterns.d"]
        match => { "message" => "%{NGINX_ACCESS}"}
        remove_field => [tags, message, offset, source, input_type, "@version","[beat][name]","[beat][version]"] 
     }
    geoip {
       source => "remote_addr"
       target => "geoip"
       database => "/etc/logstash/GeoLite2-City.mmdb"
       add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
       add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
       remove_field => ["[geoip][country_code3]", "[geoip][dma_code]", "[geoip][postal_code]", "[geoip][continent_code]", "[geoip][timezone]", "[geoip][region_code]"]
    }
    if [user_agent] != "-" {
       useragent {
       target => "ua"
       source => "user_agent"
       }
    }
    mutate {
       remove_field => ["host"] 
       convert => [ "[geoip][coordinates]", "float" ]
    }
    #if "_geoip_lookup_failure" in [tags] { drop { } }
  }
}
output {
  if [fields][name] == "nginx-access" or [fields][name] == "proxy-nginx-access" {
      elasticsearch {          
          hosts => ["192.168.1.39:9200"] 
          index => "logstash-%{[fields][name]}"
      }
  } 
  else { 
    elasticsearch {     
          hosts => ["192.168.1.39:9200"] 
          index => "%{[@metadata][beat]}"
      }
  }
  #stdout{ codec => rubydebug }
}
cd /etc/logstatsh/conf.d/
logstash -f flilebeat.conf
性能优化
控制JVM heap size : 系统内存的(少于)一半给Luncene 假设你有一个 64 GB 内存的机器,标准的建议是把 50% 的可用内存作为 Elasticsearch 的堆内存,保留剩下的 50%。当然它也不会被浪费,Lucene 会很乐意利用起余下的内存。 即便你有足够的内存,也尽量不要 超过 32 GB。因为它浪费了内存,降低了 CPU 的性能,还要让 GC 应对大内存。
vim /etc/elasticsearch/jvm.options vim /etc/logstash/jvm.options
-Xms2g
-Xmx2g
禁止或少用Swapping
#swapoff -a
echo vm.swappiness =1 >> /etc/sysctl.conf
sysctl -p
或/etc/elasticsearch/elasticsearch.yml 
bootstrap.memory_lock: true
#启动锁定足够的内存,防止数据写入swap,配合bootstrap.memory_lock: true使用
vim /usr/lib/systemd/system/elasticsearch.service
vim /etc/systemd/system/logstash.service
LimitMEMLOCK=infinity
自动化安装 Puppet https://github.com/elastic/puppet-elasticsearch Chef https://github.com/elastic/cookbook-elasticsearch Ansible https://github.com/elastic/ansible-elasticsearch
https://www.elastic.co/guide/cn/elasticsearch/guide/current/heap-sizing.html#compressed_oops https://segmentfault.com/a/1190000011263254#articleHeader4 Elasticsearch API Elasticsearch 参考指南 探索你的数据 Elasticsearch重要优化