Deploying Zookeeper + Kafka on Linux


1. Kafka Introduction

Kafka is known as a next-generation distributed publish-subscribe messaging system. It is an open-source project of the non-profit Apache Software Foundation (ASF), the same foundation behind open-source software such as HTTP Server, Hadoop, ActiveMQ, and Tomcat. Comparable messaging systems include RabbitMQ, ActiveMQ, and ZeroMQ. Kafka's main advantages are that it is distributed by design and, combined with Zookeeper, supports dynamic scaling.

Compared with traditional messaging systems, Apache Kafka differs in the following ways:

1) It is designed as a distributed system that is easy to scale out;

2) It provides high throughput for both publishing and subscribing;

3) It supports multiple subscribers and automatically rebalances consumers on failure;

4) It persists messages to disk, so it can serve both batch consumption (such as ETL) and real-time applications.

Installation environment:

The three servers' IP addresses are:

IP:192.168.56.11

IP:192.168.56.12

IP:192.168.56.13

Configure the hosts file on all three servers:

[root@localhost ~]# cat /etc/hosts

127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.56.11 server1
192.168.56.12 server2
192.168.56.13 server3
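
The three entries must be identical on every machine. A minimal sketch for appending them in one step (run on each server, assuming the entries are not already present):

cat >> /etc/hosts <<'EOF'
192.168.56.11 server1
192.168.56.12 server2
192.168.56.13 server3
EOF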

1.1: Download, install, and verify Zookeeper:

1.1.1: Kafka download address:

http://kafka.apache.org/downloads.html

1.1.2: Zookeeper download address:

http://zookeeper.apache.org/releases.html

1.1.3: Install Zookeeper:

Zookeeper cluster behavior: the cluster as a whole remains available as long as more than half of its servers are working. With a 2-node ensemble, the failure of either node leaves only one server, which is not more than half of the cluster, so the ensemble becomes unavailable. With 3 nodes, losing one leaves two, which is still a majority of three, so the cluster keeps running; losing a second node brings it down. With 4 nodes, losing one is fine, but losing two leaves two, which is not more than half of four, so the cluster is down. In other words, 3-node and 4-node ensembles both tolerate exactly one failure, and by the same reasoning 5 behaves like 6, 7 like 8, and so on. This is why Zookeeper ensembles are usually deployed with an odd number of servers, as the sketch below illustrates.
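
A minimal shell sketch of the quorum arithmetic above: an ensemble of n servers needs a majority of n/2 + 1 members alive, so it tolerates (n - 1) / 2 failures (integer division):

for n in 3 4 5 6 7 8; do
  echo "ensemble=$n majority=$((n / 2 + 1)) tolerated_failures=$(( (n - 1) / 2 ))"
done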

Upload the downloaded packages to /usr/local/src on each server, then perform the following steps on each.

1.1.3.1: Server1 configuration:

1) Install JDK 1.8 (all three machines need the JDK)

[root@server1 ~]# cd /usr/local/src/

[root@server1 src]# wget -c https://mirrors.yangxingzhen.com/jdk/jdk-8u144-linux-x64.tar.gz

[root@server1 src]# tar zxf jdk-8u144-linux-x64.tar.gz -C /usr/local

2) Configure environment variables by adding the following:

[root@server1 src]# vim /etc/profile

export JAVA_HOME=/usr/local/jdk1.8.0_144
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH:$HOME/bin

3) Run source /etc/profile to make the changes take effect

[root@server1 src]# source /etc/profile

[root@server1 src]# java -version

4) Install Zookeeper

1. Download the Zookeeper package

[root@server1 ~]# wget -c https://mirrors.yangxingzhen.com/zookeeper/zookeeper-3.4.10.tar.gz

2. Extract and install Zookeeper

[root@server1 ~]# tar zxf zookeeper-3.4.10.tar.gz

[root@server1 ~]# mv zookeeper-3.4.10 /usr/local/zookeeper

[root@server1 ~]# cd /usr/local/zookeeper/

3. Create the snapshot (data) directory:

[root@server1 zookeeper]# mkdir -p data

4. Create the transaction log directory:

[root@server1 zookeeper]# mkdir -p logs

[Note]: If dataLogDir is not configured, the transaction logs are also written to the data directory, which severely hurts Zookeeper performance: under high throughput, too many transaction logs and snapshots end up competing for the same disk.

[root@server1 zookeeper]# cd conf/

[root@server1 conf]# cp zoo_sample.cfg zoo.cfg

[root@server1 conf]# vim zoo.cfg

# Configuration contents

# Heartbeat interval between servers, and between clients and servers, in milliseconds
tickTime=2000
# Maximum number of ticks a follower may take for its initial connection and sync with the leader
initLimit=10
# Maximum number of ticks allowed between a request and an acknowledgement when a follower syncs with the leader
syncLimit=5
# Port on which Zookeeper listens for client connections
clientPort=2181
# Snapshot (data) directory
dataDir=/usr/local/zookeeper/data
# Transaction log directory
dataLogDir=/usr/local/zookeeper/logs
# Zookeeper cluster: 2888 is the leader/follower data-sync port, 3888 is the leader-election port
# server.<id>=<server host>:<LF data-sync port>:<LF election port>
server.1=server1:2888:3888
server.2=server2:2888:3888
server.3=server3:2888:3888

[root@server1 conf]# echo "1" > /usr/local/zookeeper/data/myid
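
As a quick sanity check, confirm on each node that the number in myid matches the server.N entry naming that host in zoo.cfg:

[root@server1 conf]# cat /usr/local/zookeeper/data/myid

[root@server1 conf]# grep "^server\." /usr/local/zookeeper/conf/zoo.cfg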

1.1.3.2: Server2 configuration:

1) Install JDK 1.8 (all three machines need the JDK)

[root@server2 ~]# cd /usr/local/src/

[root@server2 src]# wget -c https://mirrors.yangxingzhen.com/jdk/jdk-8u144-linux-x64.tar.gz

[root@server2 src]# tar zxf jdk-8u144-linux-x64.tar.gz -C /usr/local

2) Configure environment variables by adding the following:

[root@server2 src]# vim /etc/profile

export JAVA_HOME=/usr/local/jdk1.8.0_144
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH:$HOME/bin

3) Run source /etc/profile to make the changes take effect

[root@server2 src]# source /etc/profile

[root@server2 src]# java -version

4) Install Zookeeper

1. Download the Zookeeper package

[root@server2 ~]# wget -c https://mirrors.yangxingzhen.com/zookeeper/zookeeper-3.4.10.tar.gz

2. Extract and install Zookeeper

[root@server2 ~]# tar zxf zookeeper-3.4.10.tar.gz

[root@server2 ~]# mv zookeeper-3.4.10 /usr/local/zookeeper

[root@server2 ~]# cd /usr/local/zookeeper/

3. Create the snapshot (data) directory:

[root@server2 zookeeper]# mkdir -p data

4. Create the transaction log directory:

[root@server2 zookeeper]# mkdir -p logs

[Note]: If dataLogDir is not configured, the transaction logs are also written to the data directory, which severely hurts Zookeeper performance: under high throughput, too many transaction logs and snapshots end up competing for the same disk.

[root@server2 zookeeper]# cd conf/

[root@server2 conf]# cp zoo_sample.cfg zoo.cfg

[root@server2 conf]# vim zoo.cfg

# Configuration contents

# Heartbeat interval between servers, and between clients and servers, in milliseconds
tickTime=2000
# Maximum number of ticks a follower may take for its initial connection and sync with the leader
initLimit=10
# Maximum number of ticks allowed between a request and an acknowledgement when a follower syncs with the leader
syncLimit=5
# Port on which Zookeeper listens for client connections
clientPort=2181
# Snapshot (data) directory
dataDir=/usr/local/zookeeper/data
# Transaction log directory
dataLogDir=/usr/local/zookeeper/logs
# Zookeeper cluster: 2888 is the leader/follower data-sync port, 3888 is the leader-election port
# server.<id>=<server host>:<LF data-sync port>:<LF election port>
server.1=server1:2888:3888
server.2=server2:2888:3888
server.3=server3:2888:3888

[root@server2 conf]# echo "2" > /usr/local/zookeeper/data/myid

1.1.3.3: Server3 configuration:

1) Install JDK 1.8 (all three machines need the JDK)

[root@server3 ~]# cd /usr/local/src/

[root@server3 src]# wget -c https://mirrors.yangxingzhen.com/jdk/jdk-8u144-linux-x64.tar.gz

[root@server3 src]# tar zxf jdk-8u144-linux-x64.tar.gz -C /usr/local

2) Configure environment variables by adding the following:

[root@server3 src]# vim /etc/profile

export JAVA_HOME=/usr/local/jdk1.8.0_144
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH:$HOME/bin

3) Run source /etc/profile to make the changes take effect

[root@server3 src]# source /etc/profile

[root@server3 src]# java -version

4) Install Zookeeper

1. Download the Zookeeper package

[root@server3 ~]# wget -c https://mirrors.yangxingzhen.com/zookeeper/zookeeper-3.4.10.tar.gz

2. Extract and install Zookeeper

[root@server3 ~]# tar zxf zookeeper-3.4.10.tar.gz

[root@server3 ~]# mv zookeeper-3.4.10 /usr/local/zookeeper

[root@server3 ~]# cd /usr/local/zookeeper/

3. Create the snapshot (data) directory:

[root@server3 zookeeper]# mkdir -p data

4. Create the transaction log directory:

[root@server3 zookeeper]# mkdir -p logs

[Note]: If dataLogDir is not configured, the transaction logs are also written to the data directory, which severely hurts Zookeeper performance: under high throughput, too many transaction logs and snapshots end up competing for the same disk.

[root@server3 zookeeper]# cd conf/

[root@server3 conf]# cp zoo_sample.cfg zoo.cfg

[root@server3 conf]# vim zoo.cfg

# Configuration contents

# Heartbeat interval between servers, and between clients and servers, in milliseconds
tickTime=2000
# Maximum number of ticks a follower may take for its initial connection and sync with the leader
initLimit=10
# Maximum number of ticks allowed between a request and an acknowledgement when a follower syncs with the leader
syncLimit=5
# Port on which Zookeeper listens for client connections
clientPort=2181
# Snapshot (data) directory
dataDir=/usr/local/zookeeper/data
# Transaction log directory
dataLogDir=/usr/local/zookeeper/logs
# Zookeeper cluster: 2888 is the leader/follower data-sync port, 3888 is the leader-election port
# server.<id>=<server host>:<LF data-sync port>:<LF election port>
server.1=server1:2888:3888
server.2=server2:2888:3888
server.3=server3:2888:3888

[root@server3 conf]# echo "3" > /usr/local/zookeeper/data/myid

1.1.3.4: Start Zookeeper on each server:

[root@server1 ~]# /usr/local/zookeeper/bin/zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[root@server2 src]# /usr/local/zookeeper/bin/zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

[root@server3 src]# /usr/local/zookeeper/bin/zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

1.1.3.5: Check each Zookeeper node's status:

[root@server1 ~]# /usr/local/zookeeper/bin/zkServer.sh status

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Mode: follower

[root@server2 ~]# /usr/local/zookeeper/bin/zkServer.sh status

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Mode: leader

[root@server3 ~]# /usr/local/zookeeper/bin/zkServer.sh status

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg

Mode: follower
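
Optionally, Zookeeper's built-in four-letter commands give a quick health check over the client port (this assumes the nc utility is installed); "ruok" should answer "imok":

[root@server1 ~]# echo ruok | nc 192.168.56.11 2181

[root@server1 ~]# echo stat | nc 192.168.56.11 2181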

1.1.3.6: Basic Zookeeper operations:

#Connect to any node and create some data:

[root@server3 src]# /usr/local/zookeeper/bin/zkCli.sh -server 192.168.56.11:2181

[zk: 192.168.56.11:2181(CONNECTED) 3] create /test "hello"

#Verify the data from another Zookeeper node:

[root@server2 src]# /usr/local/zookeeper/bin/zkCli.sh -server 192.168.56.12:2181

[zk: 192.168.56.12:2181(CONNECTED) 0] get /test

hello

cZxid = 0x100000004

ctime = Fri Dec 15 11:14:07 CST 2017

mZxid = 0x100000004

mtime = Fri Dec 15 11:14:07 CST 2017

pZxid = 0x100000004

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 5

numChildren = 0
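
A few more zkCli.sh operations worth knowing, run from the same [zk: ...] prompt (a brief sketch; /test is the node created above):

ls /               #List the children of the root znode
set /test "world"  #Update the znode's data
get /test          #Read it back
delete /test       #Remove the test znode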

1.2: Install and test Kafka:

1.2.1: Install Kafka on Server1:

[root@server1 src]# wget -c https://archive.apache.org/dist/kafka/2.0.1/kafka_2.12-2.0.1.tgz

[root@server1 src]# tar xf kafka_2.12-2.0.1.tgz

[root@server1 src]# mv kafka_2.12-2.0.1 /usr/local/kafka

[root@server1 src]# vim /usr/local/kafka/config/server.properties

# Unique ID of this broker; must differ on every node
broker.id=1
# Address the broker listens on for client connections
listeners=PLAINTEXT://192.168.56.11:9092
# Network and I/O thread counts
num.network.threads=3
num.io.threads=8
# Socket buffer sizes and maximum request size
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Directory where Kafka stores its log segments (message data)
log.dirs=/data/kafka
# Default number of partitions for new topics
num.partitions=10
num.recovery.threads.per.data.dir=1
# Replication settings for Kafka's internal topics
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# Retention: keep data 168 hours, roll segments at 1 GB, check every 5 minutes
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# Zookeeper ensemble this broker registers with
zookeeper.connect=server1:2181,server2:2181,server3:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
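
Note that log.dirs points to /data/kafka, which does not exist by default. The broker will normally create a missing log directory at startup, but as a precaution create it explicitly on every broker:

[root@server1 src]# mkdir -p /data/kafka
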
1.2.2: Install Kafka on Server2:

[root@server2 src]# wget -c https://archive.apache.org/dist/kafka/2.0.1/kafka_2.12-2.0.1.tgz

[root@server2 src]# tar xf kafka_2.12-2.0.1.tgz

[root@server2 src]# mv kafka_2.12-2.0.1 /usr/local/kafka

[root@server2 src]# vim /usr/local/kafka/config/server.properties

broker.id=2
listeners=PLAINTEXT://192.168.56.12:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=10
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=server1:2181,server2:2181,server3:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0

1.2.3: Install Kafka on Server3:

[root@server3 src]# wget -c https://archive.apache.org/dist/kafka/2.0.1/kafka_2.12-2.0.1.tgz

[root@server3 src]# tar xf kafka_2.12-2.0.1.tgz

[root@server3 src]# mv kafka_2.12-2.0.1 /usr/local/kafka

[root@server3 src]# vim /usr/local/kafka/config/server.properties

broker.id=3
listeners=PLAINTEXT://192.168.56.13:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=10
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=server1:2181,server2:2181,server3:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0

1.2.4: Start Kafka on each server:

1.2.4.1: Start Kafka on Server1:

[root@linux-host1 src]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties #Start Kafka as a daemon

1.2.4.2: Start Kafka on Server2:

[root@linux-host2 src]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

1.2.4.3: Start Kafka on Server3:

[root@linux-host3 src]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

#/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & #Started this way, Kafka stops when the shell session disconnects
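
For long-running deployments, a systemd unit keeps Kafka alive across shell sessions and reboots. A minimal sketch under the paths used in this article (save as /etc/systemd/system/kafka.service on each broker, then run systemctl daemon-reload and systemctl start kafka):

[Unit]
Description=Apache Kafka broker
After=network.target

[Service]
Type=simple
Environment=JAVA_HOME=/usr/local/jdk1.8.0_144
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target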

1.2.5: Test Kafka:

1.2.5.1: Verify the processes:

[root@linux-host1 ~]# jps

10578 QuorumPeerMain

11572 Jps

11369 Kafka

[root@linux-host2 ~]# jps

2752 QuorumPeerMain

8229 Kafka

8383 Jps

[root@linux-host3 ~]# jps

12626 Kafka

2661 QuorumPeerMain

12750 Jps

1.2.5.2: Test creating a topic:

Create a topic named logstashtest with 3 partitions and a replication factor of 3:

Run this on any Kafka server:

[root@linux-host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181 --partitions 3 --replication-factor 3 --topic logstashtest

Created topic "logstashtest".

1.2.5.3: Test describing the topic:

This can be tested on any Kafka server:

[root@linux-host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181 --topic logstashtest

Output explanation: logstashtest has three partitions (0, 1, and 2). The leader of partition 0 is broker 3 (its broker.id), partition 0 has three replicas, and all of them are listed in Isr (in-sync replicas, meaning they are eligible to be elected leader).

1.2.5.4: Delete the topic:

[root@linux-host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181 --topic logstashtest

Topic logstashtest is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.
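
As the note in the output says, the deletion only takes effect when delete.topic.enable is true. In Kafka 1.0 and later this setting defaults to true, so no change should be needed here; on older brokers, add the following line to server.properties on every broker and restart Kafka:

delete.topic.enable=true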

1.2.5.5: List all topics:

[root@linux-host1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181

__consumer_offsets

nginx-accesslog-5612

system-log-5612

1.2.6: Test sending messages with the Kafka command-line tools:

1.2.6.1: Create a topic:

[root@linux-host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181 --partitions 3 --replication-factor 3 --topic messagetest

Created topic "messagetest".

1.2.6.2: Send messages:

[root@linux-host2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.56.11:9092,192.168.56.12:9092,192.168.56.13:9092 --topic messagetest

>hello

>kafka

>logstash

>ss

>oo

1.2.6.3: Consume the data from the other Kafka servers:

#Server1:

[root@linux-host1 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181 --topic messagetest --from-beginning

#Server2:

[root@linux-host2 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181 --topic messagetest --from-beginning

#Server3:

[root@linux-host3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181 --topic messagetest --from-beginning
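
One caveat: in the Kafka 2.0.x packages installed above, the old Zookeeper-based consumer has been removed, so kafka-console-consumer.sh may reject the --zookeeper option. The equivalent command against the brokers directly is:

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.11:9092,192.168.56.12:9092,192.168.56.13:9092 --topic messagetest --from-beginning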

1.2.7: Test writing data to Kafka with Logstash:

1.2.7.1: Edit the Logstash configuration file:

[root@linux-host3 ~]# vim /etc/logstash/conf.d/logstash-to-kafka.conf

input {
  stdin {}
}
output {
  kafka {
    topic_id => "hello"
    bootstrap_servers => "192.168.56.11:9092"
    batch_size => 5
  }
  stdout {
    codec => rubydebug
  }
}
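
To test interactively, run Logstash in the foreground against this file and type a few lines on stdin (the binary path below assumes an RPM/DEB install of Logstash; adjust it if your layout differs):

[root@linux-host3 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-to-kafka.conf
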
1.2.7.2: Verify that Kafka received the Logstash data:

[root@linux-host1 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181 --topic hello --from-beginning

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

2017-12-15T14:33:00.684Z linux-host3.exmaple.com hello

2017-12-15T14:33:31.127Z linux-host3.exmaple.com test

[root@linux-host2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.56.11:9092,192.168.56.12:9092,192.168.56.13:9092 --topic messagetest

>hello

>kafka

>logstash
