Springboot项目集成Kafka

Springboot整合Kafka具体步骤如下:

1、创建Springboot项目

创建Springboot项目,添加以下依赖:
Spring Web
Spring for Apache Kafka

如下图示:

springboot-kafka.pngspringboot-kafka.png

对应pom.xml配置如下:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2、新增生产者消费者配置;

如果配置文件为properties,则配置内容如下:

###########【Kafka集群】###########
spring.kafka.bootstrap-servers=localhost:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
​
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
​
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=kafka-group01
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50

如果配置文件为yaml,则配置内容如下:

spring:
  #重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
  kafka:
    bootstrap-servers: localhost:9092
    #https://kafka.apache.org/documentation/#producerconfigs
    producer:
      bootstrap-servers: localhost:9092
      retries: 1 #生产者发送消息失败重试次数
      batch-size: 16384 # 同一批次内存大小(默认16K)
      buffer-memory: 314572800 #生产者内存缓存区大小(300M = 300*1024*1024)
      #acks=0:无论成功还是失败,只发送一次。无需确认
      #acks=1:即只需要确认leader收到消息
      #acks=all或-1:ISR + Leader都确定收到
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #key的编解码方法
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #key的编解码方法
      #开启事务,但是要求ack为all,否则无法保证幂等性
      #transaction-id-prefix: "COLA_TX"
      #额外的,没有直接有properties对应的参数,将存放到下面这个Map对象中,一并初始化
      properties:
        #自定义拦截器,注意,这里结尾时classes(先于分区器,快递先贴了标签再指定地址)
        interceptor.classes: cn.com.controller.TimeInterceptor
        #自定义分区器
        #partitioner.class: com.alibaba.cola.kafka.test.customer.inteceptor.MyPartitioner
        #即使达不到batch-size设定的大小,只要超过这个毫秒的时间,一样会发送消息出去
        linger.ms: 1000
        #最大请求大小,200M = 200*1024*1024
        max.request.size: 209715200
        #Producer.send()方法的最大阻塞时间(115秒)
        max.block.ms: 115000
        #该配置控制客户端等待请求响应的最长时间。
        #如果超时之前仍未收到响应,则客户端将在必要时重新发送请求,如果重试次数(retries)已用尽,则会使请求失败。 
        #此值应大于replica.lag.time.max.ms(broker配置),以减少由于不必要的生产者重试而导致消息重复的可能性。
        request.timeout.ms: 115000
        #等待send回调的最大时间。常用语重试,如果一定要发送,retries则配Integer.MAX
        #如果超过该时间:TimeoutException: Expiring 1 record(s) .. has passed since batch creation
        delivery.timeout.ms: 120000


    #https://kafka.apache.org/documentation/#consumerconfigs
    consumer:
      bootstrap-servers: localhost:9092
      group-id: kafka-group01 #消费者组
      auto-offset-reset: earliest #消费方式: earliest:从头开始消费   latest:从最新的开始消费   默认latest
      enable-auto-commit: false #是否自动提交偏移量offset
      auto-commit-interval: 1S #前提是 enable-auto-commit=true。自动提交的频率
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 2
      properties:
        #如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
        session.timeout.ms: 120000
        #最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡
        max.poll.interval.ms: 300000
        #配置控制客户端等待请求响应的最长时间。 
        #如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
        #或者如果重试次数用尽,则请求失败。
        request.timeout.ms: 60000
        # 服务器返回的最大数据量,不能超过admin的message.max.bytes单条数据最大大小
        max.partition.fetch.bytes: 1048576
        #订阅或分配主题时,允许自动创建主题。0.11之前,必须设置false
        allow.auto.create.topics: true

    listener:
      #当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
      #manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual_immediate
      missing-topics-fatal: true #如果至少有一个topic不存在,true启动失败。false忽略
      #type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-records
      type: batch
      concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲

    template:
      default-topic: "test"

3、创建主题并新增主题定义类
打开cmd窗口,进入安装目录,输入以下命令执行创建topic命令:
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --topic test

主题定义类MQTopic内容如下:

public interface MQTopic {

    /**
     * test主题
     */
    String TEST = "test";
}

4、新增生产者发送消息
生产者类KafkaProducer内容如下:

@RestController
@RequestMapping("kafka/producer")
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @RequestMapping("/send")
    public String sendMessage(String message) {
        kafkaTemplate.send(MQTopic.TEST, message);
        return "ok";
    }

    @RequestMapping("/send/callback")
    public String sendMessageCallback(String message) {
        kafkaTemplate.send(MQTopic.TEST, message).addCallback(success -> {
            String topic = success.getRecordMetadata().topic();
            int partition = success.getRecordMetadata().partition();
            long offset = success.getRecordMetadata().offset();
            System.out.println(String.format("发送成功,Topic=%s, 分区=%s, offset=%s",
                    topic, partition, offset));
        }, failure -> {
            System.out.println("发送失败:" + failure.getMessage());
        });
        return "send callback ok";
    }

    @RequestMapping("/send/future/callback")
    public String sendMessageFutureCallback(String message) {
        kafkaTemplate.send(MQTopic.TEST, message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送失败:" + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                String topic = result.getRecordMetadata().topic();
                int partition = result.getRecordMetadata().partition();
                long offset = result.getRecordMetadata().offset();
                System.out.println(String.format("发送成功,Topic=%s, 分区=%s, offset=%s",
                        topic, partition, offset));
            }
        });
        return "send future callback ok";
    }
}

5、新增消费者处理类
消费者处理类KafkaConsumer内容如下:

@Component
public class KafkaConsumer {

    @KafkaListener(topics = {MQTopic.TEST})
    public void onMessage(ConsumerRecord<?, ?> record) {
        System.out.println(String.format("消费消息:Topic=%s, 分区=%s, 消息内容=%s",
                record.topic(), record.partition(), record.value()));
    }
}

6、测试

启动项目,测试kafka发送及接收消息;

(1)简单生产者测试:
postman请求接口:
http://localhost:8080/kafka/producer/send?message=100

控制台打印日志:

消费消息:Topic=test, 分区=0, 消息内容=100

(2)带回调方法的生产者测试:
postman请求接口:
http://localhost:8080/kafka/producer/send/callback?message=101

控制台打印日志:

发送成功,Topic=test, 分区=0, offset=26
消费消息:Topic=test, 分区=0, 消息内容=101

可以看到,生产者发送消息成功后,进入成功回调方法打印发送成功日志;

(3)全局回调测试

新增Kafka生产者全局回调处理类KafkaSendResultHandler,具体内容如下:

@Component
public class KafkaSendResultHandler implements ProducerListener {

    private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);


    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        log.info("Message send success: " + producerRecord.toString());
    }

    @Override
    public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
        log.info("Message send error: " + producerRecord.toString());
    }
}

新增kafka全局回调测试类:

@SpringBootTest
public class KafkaListenerTest {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Autowired
    private KafkaSendResultHandler producerListener;

    @Test
    public void sendListenerTest() throws InterruptedException {
        kafkaTemplate.setProducerListener(producerListener);
        kafkaTemplate.send("test", "data_102");
        Thread.sleep(1000L);
    }
}

运行测试类sendListenerTest方法:

测试类日志窗口日志:

...KafkaSendResultHandler: Message send success: ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=data_102, timestamp=null)

消费者窗口日志:

消费消息:Topic=test, 分区=0, 消息内容=data_102

(完)

添加新评论