1 Kafka的安装与配置

  1. kafka_2.11-0.11.0.2.tgz上传至102机器/opt/software目录下,并解压安装至/opt/module,重命名为kafka
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
mv kafka_2.11-0.11.0.0/ kafka
  1. /opt/module/kafka创建logs文件夹
mkdir logs
  1. 修改配置文件/opt/module/kafka/config/server.properties

    1. ==集群3个节点的broker.id必须不一样==,hadoop102的为0,103的为1,104的为2
    2. 修改log.dirs=/opt/module/kafka/logs
    3. 放开delete.topic.enable=true,使topic可以被删除
#broker 的全局唯一编号,不能重复
broker.id=0
#删除topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic 在当前broker 上的分区个数
num.partitions=1
#用来恢复和清理data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper 集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
  1. 分发至其他机器,==/opt/module/kafka/config/server.properties的broker.id必须得改==
xsync /opt/module/kafka
  1. 配置环境变量,新增kafka环境变量(另外2个节点的也要手动修改)
vim /etc/profile.d/env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
#使配置文件生效
source /etc/profile.d/env.sh
  1. 单节点启动脚本(群起的话,在3个节点挨个执行)(/opt/module/kafka)
bin/kafka-server-start.sh config/server.properties &
  1. 单节点停止脚本(群停的话,在3个节点挨个之心)(同上)
bin/kafka-server-stop.sh stop

2 Kafka命令行操作

  • 均在/opt/module/kafka目录下执行

2.1 查看当前服务器中的所有topic

bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

2.2 创建topic

#创建topic_start
bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --create --replication-factor 1 --partitions 1 --topic topic_start
#创建topic_event
bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --create --replication-factor 1 --partitions 1 --topic topic_event
  • 选项说明:

    • --topic 定义topic 名
    • --replication-factor 定义副本数
    • --partitions 定义分区数

2.3 删除topic

bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_stop
  • 需要server.properties 中设置delete.topic.enable=true ,否则只是标记删除或者直接重启。

2.4 发送消息

bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_start

2.5 消费消息

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic topic_start
  • --from-beginning:会把该topic中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

2.6 查看某个topic的详情

bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic topic_start

3 Kafka群起脚本

  1. ~/bin目录下创建kf.sh
#!/bin/bash

case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
        do
            echo --------$i 启动kafka---------
            ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
        done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
        do
            echo --------$i 停止kafka---------
            ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
        done
};;
esac
  1. 增加脚本执行权限
chmod 777 kf.sh
  1. kafka集群启动
kf.sh start
  1. kafka集群停止
kf.sh stop

4 项目经验——Kafka压力测试

4.1 Kafka压测

  • 用Kafka 官方自带的脚本,对Kafka 进行压测。Kafka 压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO 达到瓶颈。生产和消费的脚本分别为:

    • kafka-consumer-perf-test.sh
    • kafka-producer-perf-test.sh

4.2 生产者压测 producer

  • /opt/module/kafka
bin/kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
  • 说明

    • record-size是一条信息有多大,单位是字节。
    • num-records 是总共发送多少条信息。
    • throughput 是每秒多少条信息,==设成-1,表示不限流==,可测出生产者最大吞吐量。
  • 结果

4.3 消费者压测 consumer

  • /opt/module/kafka
bin/kafka-consumer-perf-test.sh --zookeeper hadoop102:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1
  • 说明

    • --zookeeper 指定zookeeper 的链接信息
    • --topic 指定topic 的名称
    • --fetch-size 指定每次fetch 的数据的大小
    • --messages 总共要消费的消息个数
  • 结果

5 项目经验——Kafka机器数量计算

==Kafka 机器数量(经验公式)=2 (峰值生产速度 副本数/100)+1==
先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka 的数量。
比如我们的峰值生产速度是50M/s。副本数为2。
Kafka 机器数量=2 (502/100)+ 1=3 台

Last modification:May 13th, 2020 at 02:13 pm