Kafka Broker 级存储目录迁移

此场景用于在不停机或最小化影响的情况下,为单个 Broker 增加或更换存储目录。

核心原理

Kafka 支持在 log.dirs 配置项中指定多个数据目录。从 Kafka 2.4.0 版本开始,通过 KIP-113 引入了动态日志目录迁移功能,允许在线将分区的日志从一个目录迁移到另一个,而无需重启 Broker。

实施步骤

准备新目录

在 Broker 服务器上创建新的数据目录,例如 /data/kafka-logs-2。
确保新目录的权限和所有者与 Kafka 进程匹配(例如 chown -R kafka:kafka /data/kafka-logs-2)。

更新 Broker 配置

编辑 Broker 的 server.properties 文件,在 log.dirs 配置项中添加新目录的路径,与旧目录用逗号分隔。

log.dirs=/data/kafka-logs-1,/data/kafka-logs-2

触发数据迁移

重启后,新创建的分区会随机分配到任一在线目录。要主动将旧目录的数据迁移到新目录,需要使用 Kafka 的管理工具。
使用 kafka-log-dirs.sh 工具可以查询数据目录分布。
Kafka 会在后台启动一个复制线程,将数据从旧目录复制到新目录,复制完成后自动切换并删除旧数据,此过程对业务读写基本无感。

清理旧数据

确认所有需要迁移的分区都已成功迁移到新目录,并且集群运行稳定后,可以从 log.dirs 配置中移除旧目录路径,并重启 Broker,最后删除旧目录下的数据文件。

注意事项:
版本要求:动态在线迁移功能需要 Kafka 2.4.0 及以上版本。
磁盘空间:在迁移过程中,新旧两份数据会同时存在,请确保目标磁盘有足够的空间。

执行过程

1、查看指定topic日志目录分布

./kafka-log-dirs.sh --bootstrap-server localhost:9092 \
  --describe \
  --broker-list 0 \
  --topic-list topic-sensor-80\
  --command-config client.properties

输出:
Querying brokers for log directories information
Received log directory information from brokers 0
{"brokers":[{"broker":0,"logDirs":[{"partitions":[],"error":null,"logDir":"/mnt/kafka-logs"},{"partitions":[{"partition":"topic-sensor-80-2","size":0,"offsetLag":0,"isFuture":false},{"partition":"topic-sensor-80-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"topic-sensor-80-0","size":0,"offsetLag":0,"isFuture":false}],"error":null,"logDir":"/opt/kafka_2.13-3.7.0/kafka-logs"}]}],"version":1}

2、执行日志目录迁移

./kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment.json \
  --execute \
  --throttle 50000000 \
  --command-config client.properties
  • --throttle 50000000: 将复制流量限制为每秒 50MB,避免挤占正常业务的带宽和 IO。

迁移Topic配置reassignment.json:

{
  "version": 1,
  "partitions": [
    {
      "topic": "topic-sensor-80",
      "partition": 0,
      "replicas": [0],
      "log_dirs": ["/data/kafka-logs"]
    },
        {
      "topic": "topic-sensor-80",
      "partition": 1,
      "replicas": [0],
      "log_dirs": ["/data/kafka-logs"]
    },
        {
      "topic": "topic-sensor-80",
      "partition": 2,
      "replicas": [0],
      "log_dirs": ["/data/kafka-logs"]
    }
  ]
}

参数说明:

  • topic: 要迁移的 Topic 名称。
  • partition: 要迁移的分区 ID。
  • replicas: 目标 Broker 的 ID 列表。这里 [1] 表示目标 Broker ID 为 0。
  • log_dirs: 目标目录列表,与 replicas 一一对应。这里 ["/data/kafka-logs"] 表示将分区移动到 Broker 0 上的 /data/kafka-logs 目录。

验证迁移进度

./kafka-reassign-partitions.sh --bootstrap-server 172.31.227.126:9092 \
  --reassignment-json-file reassignment.json \
  --verify \
  --command-config client.properties

验证过程日志:

root@iZuf68kdp:/opt/kafka_2.13-3.7.0/bin# ./kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
>   --reassignment-json-file reassignment.json \
>   --verify \
>   --command-config client.properties
Status of partition reassignment:
Reassignment of partition topic-sensor-900-0 is completed.
Reassignment of partition topic-sensor-900-1 is completed.
Reassignment of partition topic-sensor-900-2 is completed.
Reassignment of replica topic-sensor-900-0-0 completed successfully.
Reassignment of replica topic-sensor-900-1-0 completed successfully.
Reassignment of replica topic-sensor-900-2-0 completed successfully.
Clearing broker-level throttles on broker 0
Clearing topic-level throttles on topic topic-sensor-900

移除流量限制

高版本的kafka执行--verify命令时,如果检测到任务已经完成时,会自动移除了之前设置的 Broker 级别和 Topic 级别的流量限制。

低版本的kafka需手动移除流量限制

./kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassignment.json \
  --execute

SASL认证配置

client.properties:

# 指定安全协议,必须与服务端 advertised.listeners 的前缀一致
security.protocol=SASL_PLAINTEXT

# 指定 SASL 机制 (通常是 PLAIN, 也有可能是 SCRAM-SHA-256/512,视服务端配置而定)
sasl.mechanism=SCRAM-SHA-512

# 配置 JAAS 登录模块 (替换 YOUR_USERNAME 和 YOUR_PASSWORD)
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="admin12356";
最后修改于:2026年04月04日 13:07

添加新评论