枫叶居

桃李春风一杯酒,江湖夜雨十年灯

0%

Kafka常见操作

Kafka常见操作命令行集锦

导出变量定义: $ZK_HOSTS定义在~/.bashrc中的导出变量,为Zookeeper集群地址

查看Kafka Cluster中的topic

1
./bin/kafka-topics.sh --list --zookeeper $ZK_HOSTS

查看Kafka某个topic的信息

1
./bin/kafka-topics.sh --describe --topic benchmark --zookeeper $ZK_HOSTS

新建一个1个partition的topic readbench

1
2
3
4
5
6
./bin/kafka-topics.sh --create --topic readbench --partitions 1 --replication-factor 1 --zookeeper $ZK_HOSTS```

> 新建一个1分区,复制因子为1的topic

```bash
./bin/kafka-topics.sh --create --topic name --partitions 1 --replication-factor 1 --zookeeper $ZK_HOSTS

新建一个5分区,复制因子为1的topic

1
./bin/kafka-topics.sh --create --topic name --partitions 5 --replication-factor 1 --zookeeper $ZK_HOSTS

以group_name为消费组消费名称为name的topic中的数据

1
./bin/kafka-console-consumer.sh --topic name --partition 0 --consumer-property group.id=group_name --from-beginning --bootstrap-server 10.95.134.86:9092,10.95.134.86:9093,10.95.134.86:9094

往topic name中生产数据

1
bin/kafka-console-producer.sh --broker-list 10.95.134.86:9092,10.95.134.86:9093,10.95.134.86:9094 --sync --topic name

物理删除kafka中的一个topic(kafka默认是逻辑删除)

1
./bin/kafka-topics.sh --delete --topic name --zookeeper $ZK_HOSTS
  • 删除掉Zookeeper集群中name标识的topic
1
./bin/zkCli.sh rmr /brokers/topics/name

Kafka中为topic增加partition(不能够删除partition,仅支持增加)

1
./bin/kafka-topics.sh --alter --topic name --zookeeper $ZK_HOSTS --partitions 3

Kafka中为topic更新配置参数

1
./bin/kafka-topics.sh --alter --topic readbench --zookeeper $ZK_HOSTS --config cleanup.policy=delete

Kafka集群中添加新的broker时,需要将一些topic的存储压力分散到新的broker上去,这时需要kafka reassign工具,分三步操作

  • 1、指定待迁移的topic,以json格式存放在topic-to-move.json文件中。
1
2
3
4
5
6
7
8
9
cat >> topics-to-move.json << EOF
{
"topics": [
{"topic": "name1"},
{"topic": "name2"},
],
"version": 1
}
EOF
  • 2、生成移动脚本
1
./bin/kafka-reassign-partitions.sh --zookeeper $ZK_HOSTS --topics-to-move-json-file topic-to-move.json --broker-list "1,2" --generate

将输出信息中的Proposed partition reassignment configuration一栏下的json字符串保存为reassign.json文件。

1
echo '{"version":1,"partitions":[{"topic":"readbench","partition":0,"replicas":[3]}]}' >> reassign.json
  • 3、执行迁移脚本
1
./bin/kafka-reassign-partitions.sh --zookeeper $ZK_HOSTS --reassignment-json-file reassign.json --execute
  • 4、查看迁移进度
1
/bin/kafka-reassign-partitions.sh --zookeeper $ZK_HOSTS --reassignment-json-file reassign.json --verify
坚持原创技术分享,您的支持将鼓励我继续创作!