
Kafka + ELKF cluster setup


ELK + Kafka cluster setup

Environment preparation

Hostname  IP          Ports            Services
m01       10.0.0.61   -                ansible (control node)
db01      10.0.0.51   2181,9092        zookeeper, kafka
db02      10.0.0.52   2181,9092,2888   zookeeper, kafka
db03      10.0.0.53   2181,9092,3888   zookeeper, kafka
db04      10.0.0.54   9200,9300,5601   elasticsearch, kibana
db05      10.0.0.55   9200,9300,9600   elasticsearch, logstash
db06      10.0.0.56   9200,9300        elasticsearch
web01     10.0.0.7    80,8080          filebeat, nginx, tomcat

Architecture diagram


Deploying ELK with ansible and testing it

Be sure to give the VMs at least 2 GB of memory, and configure time synchronization!

Detailed steps

# 0. Generate and distribute the SSH key
[root@m01 ~]# ssh-keygen
[root@m01 ~]# ssh-copy-id -i ~/.ssh/id_rsa.pub root@172.16.1.54
[root@m01 ~]# ssh-copy-id -i ~/.ssh/id_rsa.pub root@172.16.1.55
[root@m01 ~]# ssh-copy-id -i ~/.ssh/id_rsa.pub root@172.16.1.56
[root@m01 ~]# ssh-copy-id -i ~/.ssh/id_rsa.pub root@172.16.1.7

# 1. Prepare the roles directory for elasticsearch
[root@m01 ~]# mkdir ansible_elasticsearch
[root@m01 ~]# cd ansible_elasticsearch/
[root@m01 ansible_elasticsearch]# ansible-galaxy init elasticsearch

# 2. Prepare the hosts (inventory) file
[root@m01 ansible_elasticsearch]# cat hosts 
[db_group]
db04 ansible_ssh_host=172.16.1.54
db05 ansible_ssh_host=172.16.1.55
db06 ansible_ssh_host=172.16.1.56
[web_group]
web01 ansible_ssh_host=172.16.1.7
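
## Optional check (not in the original steps): confirm m01 can reach every host in this inventory before building the role.
[root@m01 ansible_elasticsearch]# ansible all -i hosts -m ping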

# 3. Prepare the site.yml file
[root@m01 ansible_elasticsearch]# cat site.yml 
- hosts: all
  roles:
    - { role: elasticsearch , when: (ansible_fqdn is match 'db*') or (ansible_fqdn is match 'web*')}
    
# 4. Prepare the elasticsearch config file from a Jinja2 template
[root@m01 elasticsearch]# cat templates/elasticsearch.yml.j2 
cluster.name: es-cluster
path.data: /service/es/data
path.logs: /service/es/logs
bootstrap.memory_lock: true
{% if ansible_fqdn == 'db04' %}
node.name: node-1
network.host: 10.0.0.54,127.0.0.1
{% elif ansible_fqdn == 'db05' %}
node.name: node-2
network.host: 10.0.0.55,127.0.0.1
{% else %}
node.name: node-3
network.host: 10.0.0.56,127.0.0.1
{% endif %}
http.port: 9200
discovery.zen.ping.unicast.hosts: ["10.0.0.54", "10.0.0.55","10.0.0.56"]
discovery.zen.minimum_master_nodes: 2
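
## Optional check: the template branches on ansible_fqdn, so it may be worth confirming what each db host actually reports for that fact.
[root@m01 ansible_elasticsearch]# ansible db_group -i hosts -m setup -a 'filter=ansible_fqdn'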


# 5. Prepare the files
[root@m01 files]# ll
total 635780
-rw-r--r-- 1 root root  11100954 Aug 14 09:13 apache-tomcat-10.0.0-M7.tar.gz # tomcat package
-rw-r--r-- 1 root root 114059630 Aug  9 18:54 elasticsearch-6.6.0.rpm # ES package
-rw-r--r-- 1 root root      1703 Aug 11 15:17 elasticsearch.service  # ES systemd unit file
-rw-r--r-- 1 root root 170023183 Aug  9 18:54 jdk-8u181-linux-x64.rpm  # Java (JDK) package
-rw-r--r-- 1 root root 185123116 Aug  9 18:54 kibana-6.6.0-x86_64.rpm  # kibana package
-rw-r--r-- 1 root root       190 Aug 11 15:47 kibana.yml            # kibana config file
-rw-r--r-- 1 root root 170703770 Aug  9 18:54 logstash-6.6.0.rpm    # logstash package
-rw-r--r-- 1 root root        43 Aug 14 22:32 logstash.sh           # logstash PATH (environment) file
-rw-r--r-- 1 root root       366 Aug 15 01:46 nginx_tomcat.conf     # logstash collection config

##  kibana config file
[root@m01 elasticsearch]# cat files/kibana.yml 
# kibana's listening port
server.port: 5601
# listen address
server.host: "10.0.0.54"
# address of the ES node to talk to
elasticsearch.hosts: ["http://10.0.0.54:9200"]
# kibana also creates its own index
kibana.index: ".kibana"


## ES systemd unit file
[root@m01 elasticsearch]# cat files/elasticsearch.service 
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target

[Service]
RuntimeDirectory=elasticsearch
PrivateTmp=true
Environment=ES_HOME=/usr/share/elasticsearch
Environment=ES_PATH_CONF=/etc/elasticsearch
Environment=PID_DIR=/var/run/elasticsearch
EnvironmentFile=-/etc/sysconfig/elasticsearch
LimitMEMLOCK=infinity

WorkingDirectory=/usr/share/elasticsearch

User=elasticsearch
Group=elasticsearch

ExecStart=/usr/share/elasticsearch/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet

# StandardOutput is configured to redirect to journalctl since
# some error messages may be logged in standard output before
# elasticsearch logging system is initialized. Elasticsearch
# stores its logs in /var/log/elasticsearch and does not use
# journalctl by default. If you also want to enable journalctl
# logging, you can simply remove the "quiet" option from ExecStart.
StandardOutput=journal
StandardError=inherit

# Specifies the maximum file descriptor number that can be opened by this process
LimitNOFILE=65536

# Specifies the maximum number of processes
LimitNPROC=4096

# Specifies the maximum size of virtual memory
LimitAS=infinity

# Specifies the maximum file size
LimitFSIZE=infinity

# Disable timeout logic and wait until process is stopped
TimeoutStopSec=0

# SIGTERM signal is used to stop the Java process
KillSignal=SIGTERM

# Send the signal only to the JVM rather than its control group
KillMode=process

# Java process is never killed
SendSIGKILL=no

# When a JVM receives a SIGTERM signal it exits with code 143
SuccessExitStatus=143

[Install]
WantedBy=multi-user.target

# Built for packages-6.6.0 (packages)


##  logstash collection config file
[root@m01 files]# cat nginx_tomcat.conf 
input {
  file {
    type => "tomcat_log"
    path => "/usr/local/tomcat/logs/localhost_access_log.*.txt"
    start_position => "beginning"
  }
  file {
    type => "nginx_log"
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
  }
}
output {
    elasticsearch {
      hosts => ["10.0.0.54:9200"]
      index => "%{type}_%{+YYYY-MM-dd}"
  }
}
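
## Optional check: once the role has pushed this file to web01 and installed logstash there, the pipeline syntax can be tested with logstash's -t (config test and exit) flag.
[root@web01 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_tomcat.conf -t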


## logstash PATH (environment variable) file
[root@m01 files]# cat logstash.sh 
export PATH=/usr/share/logstash/bin/:$PATH


# 7. Write the ELK tasks
[root@m01 tasks]# ll
total 20
-rw-r--r-- 1 root root  147 Aug 14 23:03 main.yml
-rw-r--r-- 1 root root 2071 Aug 15 01:45 Yum_ES.yml  # install, configure and start ES
-rw-r--r-- 1 root root  503 Aug 11 16:19 Yum_kibana.yml  # install, configure and start kibana
-rw-r--r-- 1 root root  935 Aug 15 00:56 Yum_logstash.yml  # install, configure and start logstash
-rw-r--r-- 1 root root  621 Aug 15 00:26 Yum_Nginx_Tomcat.yml  # install, configure and start nginx and tomcat

[root@m01 tasks]# cat main.yml 
---
# tasks file for elasticsearch
- include: Yum_ES.yml
- include: Yum_kibana.yml
- include: Yum_Nginx_Tomcat.yml
- include: Yum_logstash.yml

[root@m01 tasks]# cat Yum_ES.yml 
- name: Yum ntpdate                 # install the time-sync tool
  yum:
    name: ntpdate
    state: present
  when: (ansible_fqdn is match 'db*') or (ansible_fqdn is match 'web*')

- name: ntpdate aliyun time         # sync time from the Aliyun NTP server
  shell: 'ntpdate time1.aliyun.com'
  when: (ansible_fqdn is match 'db*') or (ansible_fqdn is match 'web*')

- name: Push java rpm               # push the Java (JDK) package
  copy:
    src: jdk-8u181-linux-x64.rpm
    dest: /root/jdk-8u181-linux-x64.rpm
  when: (ansible_fqdn is match 'db*') or (ansible_fqdn is match 'web*')

- name: Yum java rpm                # install the Java package
  yum:
   name: /root/jdk-8u181-linux-x64.rpm
   state: present
  when: (ansible_fqdn is match 'db*') or (ansible_fqdn is match 'web*')

- name: Source Profile              # reload environment variables
  shell: 'source /etc/profile'
  when: (ansible_fqdn is match 'db*') or (ansible_fqdn is match 'web*')

- name: Push elasticsearch rpm      # push the ES package
  copy:
    src: elasticsearch-6.6.0.rpm
    dest: /root/elasticsearch-6.6.0.rpm
  when: ansible_fqdn is match 'db*'

- name: Yum elasticsearch rpm       # install ES
  yum:
    name: /root/elasticsearch-6.6.0.rpm
    state: present
  when: ansible_fqdn is match 'db*'

- name: reload systemd              # reload systemd, as the rpm install suggests
  shell: 'systemctl daemon-reload'
  when: ansible_fqdn is match 'db*'

- name: service start               # start ES and enable it at boot, as the rpm install suggests
  service:
     name: elasticsearch.service
     state: started
     enabled: yes
  when: ansible_fqdn is match 'db*'

- name: Push elasticsearch config       # push the ES config file
  template:
    src: elasticsearch.yml.j2
    dest: /etc/elasticsearch/elasticsearch.yml
  when: ansible_fqdn is match 'db*'

- name: mkdir config directory          # create the data/log directories from the config and set ownership
  file:
    path: /service/es/{{ item }}
    owner: elasticsearch
    group: elasticsearch
    state: directory
    recurse: yes
  with_items: 
    - data
    - logs
  when: ansible_fqdn is match 'db*'

- name: Push systemd start config       # push the systemd unit file
  copy:
    src: elasticsearch.service
    dest: /usr/lib/systemd/system/elasticsearch.service
    mode: 0755
  when: ansible_fqdn is match 'db*'

- name: reload systemd                  # reload systemd again
  shell: 'systemctl daemon-reload'
  when: ansible_fqdn is match 'db*'

- name: service start                       # restart ES with the new config
  service: 
     name: elasticsearch.service
     state: restarted
     enabled: yes
  when: ansible_fqdn is match 'db*'




[root@m01 tasks]# cat Yum_kibana.yml 
- name: Push kibana rpm                     # push the kibana package
  copy:
    src: kibana-6.6.0-x86_64.rpm
    dest: /root/kibana-6.6.0-x86_64.rpm
  when: ansible_fqdn == 'db04'

- name: Yum kibana rpm                      # install kibana
  yum:
    name: /root/kibana-6.6.0-x86_64.rpm
    state: present
  when: ansible_fqdn == 'db04'

- name: Push kibana config                  # push the kibana config file
  copy:
    src: kibana.yml
    dest: /etc/kibana/kibana.yml
  when: ansible_fqdn == 'db04'

- name: start kibana                        # start kibana
  service:
    name: kibana.service
    state: started
    enabled: yes
  when: ansible_fqdn == 'db04'
  
  
  
  
  
[root@m01 tasks]# cat Yum_Nginx_Tomcat.yml 
- name: Unarchive tomcat packages               # unpack the tomcat archive to /usr/local on web01
  unarchive: 
    src: apache-tomcat-10.0.0-M7.tar.gz
    dest: /usr/local/
  when: ansible_fqdn == 'web01'

- name: link tomcat                             # create a symlink
  file:
    path: /usr/local/tomcat
    src: /usr/local/apache-tomcat-10.0.0-M7
    state: link
  when: ansible_fqdn == 'web01'

- name: start tomcat                    # start tomcat (use the absolute path to the real script here, and nohup without a trailing &)
  shell: 'nohup /usr/local/apache-tomcat-10.0.0-M7/bin/startup.sh'
  when: ansible_fqdn == 'web01'

- name: Yum nginx                       # install nginx
  yum:
    name: nginx
    state: present
  when: ansible_fqdn == 'web01'

- name: Start nginx                     # start nginx
  service:
    name: nginx
    state: started
    enabled: yes
  when: ansible_fqdn == 'web01'
  
  
  
  
  
  
[root@m01 tasks]# cat Yum_logstash.yml 
- name: Push logstash Packages          # push the logstash package
  copy:
    src: logstash-6.6.0.rpm
    dest: /root/logstash-6.6.0.rpm
  when: ansible_fqdn == 'web01'

- name: Yum logstash                    # install logstash
  yum:
    name: /root/logstash-6.6.0.rpm
    state: present
  when: ansible_fqdn == 'web01'

- name: Grant directory                 # recursively set ownership on the directory
  file:
    path: /usr/share/logstash/
    owner: logstash
    group: logstash
    state: directory
    recurse: yes
  when: ansible_fqdn == 'web01'

- name: Push logstash config                # push the logstash log-collection config
  copy: 
    src: nginx_tomcat.conf
    dest: /etc/logstash/conf.d/nginx_tomcat.conf
  when: ansible_fqdn == 'web01'

- name: Push logstash sh                    # push the logstash PATH (environment) file
  copy:
    src: logstash.sh
    dest: /etc/profile.d/logstash.sh
  when: ansible_fqdn == 'web01'

- name: Source                              # reload environment variables
  shell: 'source /etc/profile'
  when: ansible_fqdn == 'web01'

- name: Start logstash                      # start logstash in the background, detached from the terminal
  shell: 'nohup /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_tomcat.conf &> /dev/null &'
  when: ansible_fqdn == 'web01'
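
## Optional: a dry syntax check before the real run can catch YAML mistakes in the role early.
[root@m01 ansible_elasticsearch]# ansible-playbook site.yml -i hosts --syntax-check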
  
  
  
  
  
# 8. Start the whole ELK stack with one command
[root@m01 ansible_elasticsearch]# ansible-playbook site.yml -i hosts 
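
## Optional verification once the play finishes: any ES node should report all three cluster members.
[root@m01 ~]# curl -s 'http://10.0.0.54:9200/_cat/nodes?v'
[root@m01 ~]# curl -s 'http://10.0.0.54:9200/_cluster/health?pretty'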


Installing and configuring filebeat on web01

# 1. Upload the package
[root@web01 ~]# rz -E filebeat-6.6.0-x86_64.rpm

# 2. Install filebeat
[root@web01 ~]# yum localinstall -y filebeat-6.6.0-x86_64.rpm 

# 3. Back up the config file
[root@web01 ~]# cp /etc/filebeat/filebeat.yml /etc/filebeat/filebeat.yml.bak

# 4. Configure filebeat to collect the host's nginx logs and push them to kafka as a message queue
[root@web01 ~]# vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  json.keys_under_root: true # place JSON fields at the root of the event
  json.overwrite_keys: true # for keys with the same name, overwrite the existing value
  fields_under_root: true
  paths:
    - /var/log/nginx/access.log

output.kafka:
  enabled: true
  hosts: ["10.0.0.51:9092","10.0.0.52:9092","10.0.0.53:9092"]
  topic: 'dev-nginx-access'
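
## Optional check: filebeat can validate the syntax of this config in place before it is started later on.
[root@web01 ~]# filebeat test config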
  
  
# 5. Modify the nginx config file (change the log format to JSON)
[root@web01 ~]# vim /etc/nginx/nginx.conf
......
http {
    log_format log_json '{ "time_local": "$time_local", '
                          '"remote_addr": "$remote_addr", '
                          '"referer": "$http_referer", '
                          '"request": "$request", '
                          '"status": $status, '
                          '"bytes": $body_bytes_sent, '
                          '"agent": "$http_user_agent", '
                          '"x_forwarded": "$http_x_forwarded_for", '
                          '"up_addr": "$upstream_addr",'
                          '"up_host": "$upstream_http_host",'
                          '"upstream_time": "$upstream_response_time",'
                          '"request_time": "$request_time" }';
    access_log  /var/log/nginx/access.log  log_json;
......
[root@web01 ~]# nginx -t
[root@web01 ~]# nginx -s reload
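
## Optional check: hit nginx once and the newest access-log line should now be a single JSON object.
[root@web01 ~]# curl -s -o /dev/null http://127.0.0.1/
[root@web01 ~]# tail -1 /var/log/nginx/access.log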

Installing zookeeper and kafka on db01 to db03

# 1. First set up the Java environment
[root@db01 ~]# yum install -y java-1.8.0
[root@db01 ~]# java -version
openjdk version "1.8.0_262"
OpenJDK Runtime Environment (build 1.8.0_262-b10)
OpenJDK 64-Bit Server VM (build 25.262-b10, mixed mode)

# 2. Upload the kafka binary package (and copy it to db02/db03)
[root@db01 ~]# rz -E kafka_2.11-1.1.0.tgz 
[root@db01 ~]# scp kafka_2.11-1.1.0.tgz 10.0.0.52:/root
[root@db01 ~]# scp kafka_2.11-1.1.0.tgz 10.0.0.53:/root

# 3. Create the directories and unpack the tarball
[root@db01 ~]# mkdir /opt/elk
[root@db02 ~]# mkdir /opt/elk
[root@db03 ~]# mkdir /opt/elk
[root@db01 ~]# mv kafka_2.11-1.1.0.tgz /opt/elk
[root@db02 ~]# mv kafka_2.11-1.1.0.tgz /opt/elk
[root@db03 ~]# mv kafka_2.11-1.1.0.tgz /opt/elk
## (extraction step, implied by the paths used below)
[root@db01 ~]# tar xf /opt/elk/kafka_2.11-1.1.0.tgz -C /opt/elk
[root@db02 ~]# tar xf /opt/elk/kafka_2.11-1.1.0.tgz -C /opt/elk
[root@db03 ~]# tar xf /opt/elk/kafka_2.11-1.1.0.tgz -C /opt/elk
[root@db01 ~]# mkdir /opt/elk/kafka/zookeeper/{data,logs} -p
[root@db02 ~]# mkdir /opt/elk/kafka/zookeeper/{data,logs} -p
[root@db03 ~]# mkdir /opt/elk/kafka/zookeeper/{data,logs} -p

# 4. Create the myid files
##  Important: the myid on the three machines must all be different, and each must match the broker.id in the kafka server config later on
[root@db01 ~]# echo 1 > /opt/elk/kafka/zookeeper/data/myid 
[root@db02 ~]# echo 2 > /opt/elk/kafka/zookeeper/data/myid
[root@db03 ~]# echo 3 > /opt/elk/kafka/zookeeper/data/myid 

# 5. Edit the zookeeper config file
[root@db01 ~]# cd /opt/elk/kafka_2.11-1.1.0/config/
[root@db02 ~]# cd /opt/elk/kafka_2.11-1.1.0/config/
[root@db03 ~]# cd /opt/elk/kafka_2.11-1.1.0/config/
[root@db01 config]# vim zookeeper.properties
[root@db01 config]# cat zookeeper.properties 
# Directory where zookeeper keeps its data; the myid file must (required) also live in this directory

dataDir=/opt/elk/kafka/zookeeper/data                 

dataLogDir=/opt/elk/kafka/zookeeper/logs

# Client connection port

clientPort=2181

# Maximum number of client connections

maxClientCnxns=20

# Heartbeat interval, in milliseconds, between zookeeper servers and between clients and servers

tickTime=2000

# How long (in ticks) a follower (the "client" relative to the leader) may take to connect and sync to the leader during initialization; exceeding this counts as a failed connection.

initLimit=10

# Request/response time limit (in ticks) between leader and follower; a follower that cannot talk to the leader within this limit is dropped.

syncLimit=5

# server.myid=ip:followers_connect_to_the_leader:leader_election  # "server" is literal and myid is assigned by hand; the first port is the one followers use to connect to the leader, the second is used for leader election. The order of these three IPs must not be changed, otherwise zookeeper will throw errors!

server.1=10.0.0.51:2888:3888

server.2=10.0.0.52:2888:3888

server.3=10.0.0.53:2888:3888


## The zookeeper config file is identical on all three machines




# 6. Configure kafka
[root@db01 config]# cat server.properties 
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.0.0.51:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/kafka/logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=10.0.0.51:2181,10.0.0.52:2181,10.0.0.53:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0


##  Important: do not copy only the uncommented settings. Keep kafka's original config file and edit it in place; copying just the bare settings leads to "invalid format" errors. (Big pitfall, remember this!)
## On the three db hosts the only things that need changing are broker.id (1, 2, 3, matching zookeeper's myid) and listeners=PLAINTEXT://10.0.0.51:9092 (each host's own IP plus port)
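
## Since only broker.id and the listener IP differ per host, a minimal sketch of the edits on db02 (db03 is analogous with id 3 and 10.0.0.53):
[root@db02 config]# sed -i 's/^broker.id=1/broker.id=2/' server.properties
[root@db02 config]# sed -i 's#^listeners=PLAINTEXT://10.0.0.51:9092#listeners=PLAINTEXT://10.0.0.52:9092#' server.properties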


# 7. Create the kafka log directory
[root@db01 config]# mkdir -p /usr/local/kafka/logs/
[root@db02 config]# mkdir -p /usr/local/kafka/logs/
[root@db03 config]# mkdir -p /usr/local/kafka/logs/


# 8. Start the kafka cluster (zookeeper must be started before kafka)
## 1) Start zookeeper first
[root@db01 ~]# nohup /opt/elk/kafka_2.11-1.1.0/bin/zookeeper-server-start.sh /opt/elk/kafka_2.11-1.1.0/config/zookeeper.properties >>/dev/null 2>&1 &
[root@db02 ~]# nohup /opt/elk/kafka_2.11-1.1.0/bin/zookeeper-server-start.sh /opt/elk/kafka_2.11-1.1.0/config/zookeeper.properties >>/dev/null 2>&1 &
[root@db03 ~]# nohup /opt/elk/kafka_2.11-1.1.0/bin/zookeeper-server-start.sh /opt/elk/kafka_2.11-1.1.0/config/zookeeper.properties >>/dev/null 2>&1 &

## 2) Verify it started and the cluster is up (run on all three db hosts)
[root@db01 ~]# yum -y install nc              

### Use echo ruok|nc 127.0.0.1 2181 to test whether the server is up; a reply of imok means it is running.

[root@db01 ~]# echo ruok|nc 127.0.0.1 2181

imok[root@db01 ~]#

## 3) Check the zookeeper cluster status
[root@db01 config]# echo conf | nc 127.0.0.1 2181
clientPort=2181
dataDir=/opt/elk/kafka/zookeeper/data/version-2
dataLogDir=/opt/elk/kafka/zookeeper/logs/version-2
tickTime=2000
maxClientCnxns=20
minSessionTimeout=4000
maxSessionTimeout=40000
serverId=1
initLimit=10
syncLimit=5
electionAlg=3
electionPort=3888
quorumPort=2888
peerType=0
[root@db01 config]# echo stat|nc 127.0.0.1 2181
Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
Clients:
 /10.0.0.52:58602[1](queued=0,recved=9067,sent=9067)
 /127.0.0.1:60150[0](queued=0,recved=1,sent=0)
 /10.0.0.51:39884[1](queued=0,recved=9154,sent=9154)
 /10.0.0.53:57138[1](queued=0,recved=9207,sent=9213)

Latency min/avg/max: 0/0/95
Received: 27496
Sent: 27501
Connections: 4
Outstanding: 0
Zxid: 0x1000000e7
Mode: follower
Node count: 140


# 9. Start kafka
[root@db01 ~]# nohup /opt/elk/kafka_2.11-1.1.0/bin/kafka-server-start.sh /opt/elk/kafka_2.11-1.1.0/config/server.properties >>/dev/null 2>&1 &
[root@db02 ~]# nohup /opt/elk/kafka_2.11-1.1.0/bin/kafka-server-start.sh /opt/elk/kafka_2.11-1.1.0/config/server.properties >>/dev/null 2>&1 &
[root@db03 ~]# nohup /opt/elk/kafka_2.11-1.1.0/bin/kafka-server-start.sh /opt/elk/kafka_2.11-1.1.0/config/server.properties >>/dev/null 2>&1 &

# 10. Check that kafka started correctly
## 1) ps -ef |grep kafka
##    This should show two separate PIDs, not one: one for zookeeper and one for kafka
## 2) Test that kafka works by creating a new topic
[root@db01 bin]# /opt/elk/kafka_2.11-1.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

## 3) Check that the test topic is visible from all three db hosts
[root@db01 config]# /opt/elk/kafka_2.11-1.1.0/bin/kafka-topics.sh --list --zookeeper 10.0.0.53:2181
[root@db02 config]# /opt/elk/kafka_2.11-1.1.0/bin/kafka-topics.sh --list --zookeeper 10.0.0.53:2181
[root@db03 config]# /opt/elk/kafka_2.11-1.1.0/bin/kafka-topics.sh --list --zookeeper 10.0.0.53:2181
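
## Optional end-to-end check with the console producer/consumer that ships with kafka: lines typed on db01 should appear on db02.
[root@db01 ~]# /opt/elk/kafka_2.11-1.1.0/bin/kafka-console-producer.sh --broker-list 10.0.0.51:9092 --topic test
[root@db02 ~]# /opt/elk/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.52:9092 --topic test --from-beginning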


# 11. Start filebeat on web01 to collect the nginx logs and push them into the kafka message queue (filebeat.yml was already configured in step 4 of the filebeat section above)
[root@web01 ~]# systemctl start filebeat
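
## Optional check that filebeat is really publishing: generate some nginx traffic, then consume a few messages from any broker.
[root@db01 ~]# /opt/elk/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.51:9092 --topic dev-nginx-access --from-beginning --max-messages 5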

Installing logstash on db05 to consume the logs from the kafka message queue

# 1. Copy the logstash package from web01 to db05
[root@web01 ~]# scp logstash-6.6.0.rpm 10.0.0.55:/root

# 2. Install logstash on db05
[root@db05 ~]# rpm -ivh logstash-6.6.0.rpm 

# 3. Configure logstash to consume the kafka logs
[root@db05 ~]# vim /etc/logstash/conf.d/nginx_kafka.conf 
input {
  kafka {
    bootstrap_servers => "10.0.0.51:9092"
    topics => "dev-nginx-access"
    codec => "json"
    consumer_threads => 5
    decorate_events => true
    }
}
output {
  elasticsearch {
    hosts => ["10.0.0.54:9200"]
    index => "dev-nginx-access-%{+YYYY-MM-dd}"
  }
}


# 4. Start logstash
[root@db05 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/nginx_kafka.conf &

# 5. Check the process
[root@db05 ~]# ps -ef |grep logstash


# 6. Open the elasticsearch and kibana pages to verify
## Tip: before starting logstash here, you can stop the logstash on web01 first and delete its old indices in the ES interface.
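
## Optional command-line check before opening kibana: the new index should appear in ES once logstash on db05 has consumed some messages.
[root@db05 ~]# curl -s 'http://10.0.0.54:9200/_cat/indices?v' | grep dev-nginx-access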
