Kafka Broker 级存储目录迁移
此场景用于在不停机或最小化影响的情况下,为单个 Broker 增加或更换存储目录。
核心原理
Kafka 支持在 log.dirs 配置项中指定多个数据目录。从 Kafka 2.4.0 版本开始,通过 KIP-113 引入了动态日志目录迁移功能,允许在线将分区的日志从一个目录迁移到另一个,而无需重启 Broker。
实施步骤
准备新目录
在 Broker 服务器上创建新的数据目录,例如 /data/kafka-logs-2。
确保新目录的权限和所有者与 Kafka 进程匹配(例如 chown -R kafka:kafka /data/kafka-logs-2)。
更新 Broker 配置
编辑 Broker 的 server.properties 文件,在 log.dirs 配置项中添加新目录的路径,与旧目录用逗号分隔。
log.dirs=/data/kafka-logs-1,/data/kafka-logs-2
触发数据迁移
重启后,新创建的分区会随机分配到任一在线目录。要主动将旧目录的数据迁移到新目录,需要使用 Kafka 的管理工具。
使用 kafka-log-dirs.sh 工具可以查询数据目录分布。
Kafka 会在后台启动一个复制线程,将数据从旧目录复制到新目录,复制完成后自动切换并删除旧数据,此过程对业务读写基本无感。
清理旧数据
确认所有需要迁移的分区都已成功迁移到新目录,并且集群运行稳定后,可以从 log.dirs 配置中移除旧目录路径,并重启 Broker,最后删除旧目录下的数据文件。
注意事项:
版本要求:动态在线迁移功能需要 Kafka 2.4.0 及以上版本。
磁盘空间:在迁移过程中,新旧两份数据会同时存在,请确保目标磁盘有足够的空间。
执行过程
1、查看指定topic日志目录分布
./kafka-log-dirs.sh --bootstrap-server localhost:9092 \
--describe \
--broker-list 0 \
--topic-list topic-sensor-80\
--command-config client.properties
输出:
Querying brokers for log directories information
Received log directory information from brokers 0
{"brokers":[{"broker":0,"logDirs":[{"partitions":[],"error":null,"logDir":"/mnt/kafka-logs"},{"partitions":[{"partition":"topic-sensor-80-2","size":0,"offsetLag":0,"isFuture":false},{"partition":"topic-sensor-80-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"topic-sensor-80-0","size":0,"offsetLag":0,"isFuture":false}],"error":null,"logDir":"/opt/kafka_2.13-3.7.0/kafka-logs"}]}],"version":1}
2、执行日志目录迁移
./kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file reassignment.json \
--execute \
--throttle 50000000 \
--command-config client.properties
- --throttle 50000000: 将复制流量限制为每秒 50MB,避免挤占正常业务的带宽和 IO。
迁移Topic配置reassignment.json:
{
"version": 1,
"partitions": [
{
"topic": "topic-sensor-80",
"partition": 0,
"replicas": [0],
"log_dirs": ["/data/kafka-logs"]
},
{
"topic": "topic-sensor-80",
"partition": 1,
"replicas": [0],
"log_dirs": ["/data/kafka-logs"]
},
{
"topic": "topic-sensor-80",
"partition": 2,
"replicas": [0],
"log_dirs": ["/data/kafka-logs"]
}
]
}
参数说明:
- topic: 要迁移的 Topic 名称。
- partition: 要迁移的分区 ID。
- replicas: 目标 Broker 的 ID 列表。这里 [1] 表示目标 Broker ID 为 0。
- log_dirs: 目标目录列表,与 replicas 一一对应。这里 ["/data/kafka-logs"] 表示将分区移动到 Broker 0 上的 /data/kafka-logs 目录。
验证迁移进度
./kafka-reassign-partitions.sh --bootstrap-server 172.31.227.126:9092 \
--reassignment-json-file reassignment.json \
--verify \
--command-config client.properties
验证过程日志:
root@iZuf68kdp:/opt/kafka_2.13-3.7.0/bin# ./kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
> --reassignment-json-file reassignment.json \
> --verify \
> --command-config client.properties
Status of partition reassignment:
Reassignment of partition topic-sensor-900-0 is completed.
Reassignment of partition topic-sensor-900-1 is completed.
Reassignment of partition topic-sensor-900-2 is completed.
Reassignment of replica topic-sensor-900-0-0 completed successfully.
Reassignment of replica topic-sensor-900-1-0 completed successfully.
Reassignment of replica topic-sensor-900-2-0 completed successfully.
Clearing broker-level throttles on broker 0
Clearing topic-level throttles on topic topic-sensor-900
移除流量限制
高版本的kafka执行--verify命令时,如果检测到任务已经完成时,会自动移除了之前设置的 Broker 级别和 Topic 级别的流量限制。
低版本的kafka需手动移除流量限制
./kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file reassignment.json \
--execute
SASL认证配置
client.properties:
# 指定安全协议,必须与服务端 advertised.listeners 的前缀一致
security.protocol=SASL_PLAINTEXT
# 指定 SASL 机制 (通常是 PLAIN, 也有可能是 SCRAM-SHA-256/512,视服务端配置而定)
sasl.mechanism=SCRAM-SHA-512
# 配置 JAAS 登录模块 (替换 YOUR_USERNAME 和 YOUR_PASSWORD)
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin12356";