新网创想网站建设,新征程启航

为企业提供网站建设、域名注册、服务器等服务

如何使用KafkaAPI-ProducerAPI

这篇文章主要讲解了“如何使用KafkaAPI-ProducerAPI”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何使用KafkaAPI-ProducerAPI”吧!

十载专注成都网站制作,成都定制网页设计,个人网站制作服务,为大家分享网站制作知识、方案,网站设计流程、步骤,成功服务上千家企业。为您提供网站建设,网站制作,网页设计及定制高端网站建设服务,专注于成都定制网页设计,高端网页制作,对搬家公司等多个领域,拥有丰富的网站营销经验。

1.消息发送流程

    Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。main 线程将消息发送给 RecordAccumulator, Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。

如何使用KafkaAPI-ProducerAPI

相关参数:
batch.size: 只有数据积累到 batch.size 之后, sender 才会发送数据。
linger.ms: 如果数据迟迟未达到 batch.size, sender 等待 linger.time 之后就会发送数据。

2.异步发送API

1)导入依赖
 

        
        
            org.apache.kafka
            kafka-clients
            2.7.0
        

2)编写代码

需要用到的类:
KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象

2.1). 不带回调函数的API
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        //生产者配置信息可以从ProducerConfig中取Key
         //1.创建kafka生产者的配置信息
        Properties properties=new Properties();
        //2.指定连接的kafka集群
        //properties.put("bootstrap.servers","192.168.1.106:9091");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.106:9091");
        //3.ACK应答级别
        //properties.put("acks","all");
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        //4.重试次数
        //properties.put("retries",3);
        properties.put(ProducerConfig.RETRIES_CONFIG,3);
        //5.批次大小 16k
        //properties.put("batch.size",16384);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //6.等待时间
        //properties.put("linger.ms",1);
        properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
        //7.RecordAccumulator 缓冲区大小 32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        //properties.put("buffer.memory",33554432);
        //8.Key,Value 的序列化类
        //properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //9.创建生产者对象
        KafkaProducer producer = new KafkaProducer<>(properties);
        //10.发送数据
        for (int i = 0; i < 10; i++) {
            ProducerRecord producerRecord = new ProducerRecord<>("first","atguigu--"+i);
            producer.send(producerRecord);
        }
        //11.关闭资源
        producer.close();
    }
}
2.2) 带回调函数的API

    回调函数会在 producer 收到 ack 时调用,为异步调用, 该方法有两个参数,分别是RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。
    注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class CallBackProducer {
    public static void main(String[] args) {
        //生产者配置信息可以从ProducerConfig中取Key
        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.106:9091,192.168.1.106:9092,192.168.1.106:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //创建生产者对象
        KafkaProducer producer = new KafkaProducer<>(properties);
        /*创建topic
        /opt/kafka/kafka03/bin/kafka-topics.sh  --create --zookeeper 192.168.1.106:2181,192.168.1.106:2182,192.168.1.106:2183  --replication-factor 3  --partitions 2 --topic aaa
        * */

        //发送数据
        for (int i = 0; i < 10; i++) {
            ProducerRecord producerRecord = new ProducerRecord<>("bbb","d","bbb-atguigu++"+i);
            producer.send(producerRecord, (recordMetadata, e) -> {
                if (e==null){
                    System.out.println("aaa  "+recordMetadata.partition()+ "--"+recordMetadata.offset());
                }else {
                    e.printStackTrace();
                }
            });
        }
        //11.关闭资源
        producer.close();
    }
}

3.    同步发送API

    同步发送的意思就是,一条消息发送之后,会阻塞当前线程, 直至返回 ack。由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方发即可、

        //10.发送数据
        for (int i = 0; i < 10; i++) {
            ProducerRecord producerRecord = new ProducerRecord<>("first","atguigu--"+i);
            producer.send(producerRecord).get();
        }

4.自定义分区器

默认分区策略源码:

org.apache.kafka.clients.producer.internals.DefaultPartitioner

1.1.    自定义分区器代码:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /*自定义分区规则*/
        List partitionInfos = cluster.availablePartitionsForTopic(topic);
        Integer integer =partitionInfos.size();
        return key.toString().hashCode()%integer;
        /*指定分区*/
       /* return 1;*/
    }
    @Override
    public void close() {
    }

    @Override
    public void configure(Map map) {
    }
}

1.2.    生产者使用自定义分区器

//配置方法
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zhl.kafkademo.partitioner.MyPartitioner");

完整代码:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionProducer {
    public static void main(String[] args) {
        //生产者配置信息可以从ProducerConfig中取Key
        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.106:9091,192.168.1.106:9092,192.168.1.106:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //配置分区器的全类名 partitioner.class
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zhl.kafkademo.partitioner.MyPartitioner");
        //创建生产者对象
        KafkaProducer producer = new KafkaProducer<>(properties);

        //发送数据
        for (int i = 0; i < 10; i++) {
            ProducerRecord producerRecord = new ProducerRecord<>("bbb","d","bbb-atguigu++"+i);
            producer.send(producerRecord, (recordMetadata, e) -> {
                if (e==null){
                    System.out.println(recordMetadata.topic()+"--"+ recordMetadata.partition()+ "--"+recordMetadata.offset());
                }else {
                    e.printStackTrace();
                }
            });
        }
        //11.关闭资源
        producer.close();
    }
}

感谢各位的阅读,以上就是“如何使用KafkaAPI-ProducerAPI”的内容了,经过本文的学习后,相信大家对如何使用KafkaAPI-ProducerAPI这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!


分享题目:如何使用KafkaAPI-ProducerAPI
URL标题:http://wjwzjz.com/article/geigph.html
在线咨询
服务热线
服务热线:028-86922220
TOP