一、Topic相关命令
1、创建Topic
# 2.8.2
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.222:2181 --partitions 10 --replication-factor 1 --topic test
# 3.2.1
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.2.222:9092 --partitions 10 --replication-factor 1 --topic test
# 基于SCRAM-SHA-256
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.2.222:9092 --partitions 10 --replication-factor 1 --topic test --command-config /usr/local/kafka/config/client_sasl_scram.properties
# 基于Plain
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.2.222:9092 --partitions 10 --replication-factor 1 --topic test --command-config /usr/local/kafka/config/client_sasl_plain.properties
2、删除Topic
# 2.8.2
/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.2.222:2181 --topic test
# 3.2.1
/usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server 192.168.2.222:9092 --topic test
# 基于SCRAM-SHA-256
/usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server 192.168.2.222:9092 --topic test --command-config /usr/local/kafka/config/client_sasl_scram.properties
# 基于Plain
/usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server 192.168.2.222:9092 --topic test --command-config /usr/local/kafka/config/client_sasl_plain.properties
3、查看Topic列表
# 2.8.2
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.222:2181
# 3.2.1
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.2.222:9092
# 基于SCRAM-SHA-256
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.2.222:9092 --command-config /usr/local/kafka/config/client_sasl_scram.properties
# 基于Plain
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.2.222:9092 --command-config /usr/local/kafka/config/client_sasl_plain.properties
4、查看Topic详情
# 2.8.2
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.222:2181 --topic test
# 3.2.1
/usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server 192.168.2.222:9092 --topic test
# 基于SCRAM-SHA-256
/usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server 192.168.2.222:9092 --topic test --command-config /usr/local/kafka/config/client_sasl_scram.properties
# 基于Plain
/usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server 192.168.2.222:9092 --topic test --command-config /usr/local/kafka/config/client_sasl_plain.properties
5、修改Topic的partition数
# 2.8.2
/usr/local/kafka/bin/kafka-topics.sh --alter --zookeeper 192.168.2.222:2181 --topic test --partitions 15
# 3.2.1
/usr/local/kafka/bin/kafka-topics.sh --alter --bootstrap-server 192.168.2.222:9092 --topic test --partitions 15
# 基于SCRAM-SHA-256
/usr/local/kafka/bin/kafka-topics.sh --alter --bootstrap-server 192.168.2.222:9092 --topic test --partitions 15 --command-config /usr/local/kafka/config/client_sasl_scram.properties
# 基于Plain
/usr/local/kafka/bin/kafka-topics.sh --alter --bootstrap-server 192.168.2.222:9092 --topic test --partitions 15 --command-config /usr/local/kafka/config/client_sasl_plain.properties
二、Topic消息相关命令
1、生产者指定Topic发送消息
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.222:9092 --topic test
# 基于SASL_PLAINTEXT(SCRAM-SHA-256)
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.222:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=SCRAM-SHA-256
# 基于SASL_PLAINTEXT(Plain)
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.222:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
2、查看Topic消费消息(从头开始)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.222:9092 --topic test --from-beginning
# 基于SASL_PLAINTEXT(SCRAM-SHA-256)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.222:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=SCRAM-SHA-256
# 基于SASL_PLAINTEXT(Plain)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.222:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
3、查看Topic消费消息(从尾开始)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.222:9092 --topic test
# 基于SASL_PLAINTEXT(SCRAM-SHA-256)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.222:9092 --topic test --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=SCRAM-SHA-256
# 基于SASL_PLAINTEXT(Plain)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.222:9092 --topic test --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
4、查看Topic指定分区消费消息(从尾开始)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.222:9092 --topic test --offset latest --partition 0
# 基于SASL_PLAINTEXT(SCRAM-SHA-256)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.222:9092 --topic test --offset latest --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=SCRAM-SHA-256 --partition 0
# 基于SASL_PLAINTEXT(Plain)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.222:9092 --topic test --offset latest --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN --partition 0
5、查看Topic指定分组消费消息(从头开始)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.222:9092 --topic test --from-beginning --group task
# 基于SASL_PLAINTEXT(SCRAM-SHA-256)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.222:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=SCRAM-SHA-256 --group task
# 基于SASL_PLAINTEXT(Plain)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.222:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN --group task
三、消费者Group相关命令
1、查看消费者组列表
/usr/local/kafka/bin/kafka-consumer-groups.sh --list --bootstrap-server 192.168.2.222:9092
# 基于SASL_PLAINTEXT(SCRAM-SHA-256)
/usr/local/kafka/bin/kafka-consumer-groups.sh --list --bootstrap-server 192.168.2.222:9092 --command-config=/usr/local/kafka/config/client_sasl_scram.properties
# 基于SASL_PLAINTEXT(Plain)
/usr/local/kafka/bin/kafka-consumer-groups.sh --list --bootstrap-server 192.168.2.222:9092 --command-config=/usr/local/kafka/config/client_sasl_plain.properties
2、查看指定消费者组详情
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.2.222:9092 --group test --describe
# 基于SASL_PLAINTEXT(SCRAM-SHA-256)
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.2.222:9092 --group test --describe --command-config=/usr/local/kafka/config/client_sasl_scram.properties
# 基于SASL_PLAINTEXT(Plain)
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.2.222:9092 --group test --describe --command-config=/usr/local/kafka/config/client_sasl_plain.properties
3、删除指定消费者组
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.2.222:9092 --group test --delete
# 基于SASL_PLAINTEXT(SCRAM-SHA-256)
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.2.222:9092 --group test --delete --command-config=/usr/local/kafka/config/client_sasl_scram.properties
# 基于SASL_PLAINTEXT(Plain)
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.2.222:9092 --group test --delete --command-config=/usr/local/kafka/config/client_sasl_plain.properties
4、删除指定消费者组中Topic
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.2.222:9092 --group test --topic building --delete-offsets
# 基于SASL_PLAINTEXT(SCRAM-SHA-256)
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.2.222:9092 --group test --topic building --delete-offsets --command-config=/usr/local/kafka/config/client_sasl_scram.properties
# 基于SASL_PLAINTEXT(Plain)
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.2.222:9092 --group test --topic building --delete-offsets --command-config=/usr/local/kafka/config/client_sasl_plain.properties
四、SASL_PLAINTEXT相关配置
1、基于SCRAM-SHA-256
# /usr/local/kafka/config/client_sasl_scram.properties
bootstrap.servers=192.168.2.222:9092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="www.yangxingzhen.com";
2、基于Plain
# /usr/local/kafka/config/client_sasl_plain.properties
bootstrap.servers=192.168.2.222:9092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="www.yangxingzhen.com";
若文章图片、下载链接等信息出错,请在评论区留言反馈,博主将第一时间更新!如本文“对您有用”,欢迎随意打赏,谢谢!
评论