新网创想网站建设,新征程启航

为企业提供网站建设、域名注册、服务器等服务

KafkaJavaProducer代码实例的详细解析

这篇文章主要讲解了Kafka Java Producer代码实例的详细解析,内容清晰明了,对此有兴趣的小伙伴可以学习一下,相信大家阅读完之后会有帮助。

网站设计制作过程拒绝使用模板建站;使用PHP+MYSQL原生开发可交付网站源代码;符合网站优化排名的后台管理系统;网站制作、网站建设收费合理;免费进行网站备案等企业网站建设一条龙服务.我们是一家持续稳定运营了10多年的创新互联网站建设公司。

根据业务需要可以使用Kafka提供的Java Producer API进行产生数据,并将产生的数据发送到Kafka对应Topic的对应分区中,入口类为:Producer

Kafka的Producer API主要提供下列三个方法:

  •   public void send(KeyedMessage message) 发送单条数据到Kafka集群
  •   public void send(List> messages) 发送多条数据(数据集)到Kafka集群
  •   public void close() 关闭Kafka连接资源

一、JavaKafkaProducerPartitioner:自定义的数据分区器,功能是:决定输入的key/value键值对的message发送到Topic的那个分区中,返回分区id,范围:[0,分区数量); 这里的实现比较简单,根据key中的数字决定分区的值。具体代码如下:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Created by gerry on 12/21.
 */
public class JavaKafkaProducerPartitioner implements Partitioner {

  /**
   * 无参构造函数
   */
  public JavaKafkaProducerPartitioner() {
    this(new VerifiableProperties());
  }

  /**
   * 构造函数,必须给定
   *
   * @param properties 上下文
   */
  public JavaKafkaProducerPartitioner(VerifiableProperties properties) {
    // nothings
  }

  @Override
  public int partition(Object key, int numPartitions) {
    int num = Integer.valueOf(((String) key).replaceAll("key_", "").trim());
    return num % numPartitions;
  }
}

二、 JavaKafkaProducer:通过Kafka提供的API进行数据产生操作的测试类;具体代码如下:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.log4j.Logger;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Created by gerry on 12/21.
 */
public class JavaKafkaProducer {
  private Logger logger = Logger.getLogger(JavaKafkaProducer.class);
  public static final String TOPIC_NAME = "test";
  public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();
  public static final int chartsLength = charts.length;


  public static void main(String[] args) {
    String brokerList = "192.168.187.149:9092";
    brokerList = "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095";
    brokerList = "192.168.187.146:9092";
    Properties props = new Properties();
    props.put("metadata.broker.list", brokerList);
    /**
     * 0表示不等待结果返回
* 1表示等待至少有一个服务器返回数据接收标识
* -1表示必须接收到所有的服务器返回标识,及同步写入
* */ props.put("request.required.acks", "0"); /** * 内部发送数据是异步还是同步 * sync:同步, 默认 * async:异步 */ props.put("producer.type", "async"); /** * 设置序列化的类 * 可选:kafka.serializer.StringEncoder * 默认:kafka.serializer.DefaultEncoder */ props.put("serializer.class", "kafka.serializer.StringEncoder"); /** * 设置分区类 * 根据key进行数据分区 * 默认是:kafka.producer.DefaultPartitioner ==> 安装key的hash进行分区 * 可选:kafka.serializer.ByteArrayPartitioner ==> 转换为字节数组后进行hash分区 */ props.put("partitioner.class", "JavaKafkaProducerPartitioner"); // 重试次数 props.put("message.send.max.retries", "3"); // 异步提交的时候(async),并发提交的记录数 props.put("batch.num.messages", "200"); // 设置缓冲区大小,默认10KB props.put("send.buffer.bytes", "102400"); // 2. 构建Kafka Producer Configuration上下文 ProducerConfig config = new ProducerConfig(props); // 3. 构建Producer对象 final Producer producer = new Producer(config); // 4. 发送数据到服务器,并发线程发送 final AtomicBoolean flag = new AtomicBoolean(true); int numThreads = 50; ExecutorService pool = Executors.newFixedThreadPool(numThreads); for (int i = 0; i < 5; i++) { pool.submit(new Thread(new Runnable() { @Override public void run() { while (flag.get()) { // 发送数据 KeyedMessage message = generateKeyedMessage(); producer.send(message); System.out.println("发送数据:" + message); // 休眠一下 try { int least = 10; int bound = 100; Thread.sleep(ThreadLocalRandom.current().nextInt(least, bound)); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + " shutdown...."); } }, "Thread-" + i)); } // 5. 等待执行完成 long sleepMillis = 600000; try { Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } flag.set(false); // 6. 关闭资源 pool.shutdown(); try { pool.awaitTermination(6, TimeUnit.SECONDS); } catch (InterruptedException e) { } finally { producer.close(); // 最后之后调用 } } /** * 产生一个消息 * * @return */ private static KeyedMessage generateKeyedMessage() { String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99); StringBuilder sb = new StringBuilder(); int num = ThreadLocalRandom.current().nextInt(1, 5); for (int i = 0; i < num; i++) { sb.append(generateStringMessage(ThreadLocalRandom.current().nextInt(3, 20))).append(" "); } String message = sb.toString().trim(); return new KeyedMessage(TOPIC_NAME, key, message); } /** * 产生一个给定长度的字符串 * * @param numItems * @return */ private static String generateStringMessage(int numItems) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < numItems; i++) { sb.append(charts[ThreadLocalRandom.current().nextInt(chartsLength)]); } return sb.toString(); } }

三、Pom.xml依赖配置如下


  0.8.2.1



  
    org.apache.kafka
    kafka_2.10
    ${kafka.version}
  

看完上述内容,是不是对Kafka Java Producer代码实例的详细解析有进一步的了解,如果还想学习更多内容,欢迎关注创新互联行业资讯频道。


网站栏目:KafkaJavaProducer代码实例的详细解析
当前路径:http://wjwzjz.com/article/jipiei.html
在线咨询
服务热线
服务热线:028-86922220
TOP