新网创想网站建设,新征程启航

为企业提供网站建设、域名注册、服务器等服务

springboot与kafka集成的示例代码

新建spring boot项目

湖滨网站建设公司创新互联公司,湖滨网站设计制作,有大型网站制作公司丰富经验。已为湖滨上1000家提供企业网站建设服务。企业网站搭建\成都外贸网站制作要多少钱,请找那个售后服务好的湖滨做网站的公司定做!

这里使用intellij IDEA

spring boot 与kafka集成的示例代码

spring boot 与kafka集成的示例代码

spring boot 与kafka集成的示例代码

spring boot 与kafka集成的示例代码

添加kafka集成maven

<?xml version="1.0" encoding="UTF-8"?>

  4.0.0

  com.example
  demo
  0.0.1-SNAPSHOT
  jar

  demo
  Demo project for Spring Boot

  
    org.springframework.boot
    spring-boot-starter-parent
    1.5.8.RELEASE
     
  

  
    UTF-8
    UTF-8
    1.8
  

  
    
      org.springframework.boot
      spring-boot-starter-web
    

    
      org.springframework.kafka
      spring-kafka
    

    
      org.springframework.boot
      spring-boot-starter-test
      test
    
  

  
    
      
        org.springframework.boot
        spring-boot-maven-plugin
      
    
  

项目中application.properties 添加

spring.kafka.bootstrap-servers=vm208:9092,vm:9092,vm50:9092
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.group-id=local_test
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1

新建KafkaConsumer消费类

package com.example.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {

  private Logger logger = LoggerFactory.getLogger(this.getClass());

  @KafkaListener(topics = {"test"})
  public void listen(ConsumerRecord<?, ?> record) {
    System.out.printf("offset = %d,key =%s,value=%s\n", record.offset(), record.key(), record.value());
  }
}

启动spring-boot程序,在kafka集群,模拟发送topic,检验接收

复制代码 代码如下:
bin/kafka-console-producer.sh --broker-list    vm208:9092,vm210:9092,vm50:9092  --topic  test

编写producer代码

package com.example.demo.producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer {

  @Autowired
  private KafkaTemplate kafkaTemplate;
  String topic="test";
  public void sendMessage(String key,String data){
    kafkaTemplate.send(new ProducerRecord(topic,key,data));
  }
}

建立一个restful模拟发送( //http://localhost:8080/kafka/send.do?key=2&data=allen-test-message)

package com.example.demo.controller;
import com.example.demo.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProducerController {
  @Autowired
  private KafkaProducer kafkaProducer;
  @RequestMapping(value = "/kafka/send.do", method = RequestMethod.GET)
  public String sendMessage(@RequestParam(value = "key") String key, @RequestParam(value = "data") String data) {
    kafkaProducer.sendMessage(key, data);
    return "sucess";
  }
}

可以发现 spring-kafka大大减少了代码工作量.

官方文档: https://docs.spring.io/spring-kafka/docs/1.2.2.RELEASE/reference/html/

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持创新互联。


本文标题:springboot与kafka集成的示例代码
标题网址:http://wjwzjz.com/article/igggep.html
在线咨询
服务热线
服务热线:028-86922220
TOP