新网创想网站建设,新征程启航

为企业提供网站建设、域名注册、服务器等服务

kafkahigh-levelconsumer多线程访

在使用kafka high-level的consumer,使用多线程消费数据时报错,简单分析一下原因下载 ,ConsumerIterator取不到消息时会阻塞,并且将内部状态置为FAILED,当其他线程访问时就会抛出异常。

公司主营业务:成都做网站、成都网站设计、移动网站开发等业务。帮助企业客户真正实现互联网宣传,提高企业的竞争能力。创新互联建站是一支青春激扬、勤奋敬业、活力青春激扬、勤奋敬业、活力澎湃、和谐高效的团队。公司秉承以“开放、自由、严谨、自律”为核心的企业文化,感谢他们对我们的高要求,感谢他们从不同领域给我们带来的挑战,让我们激情的团队有机会用头脑与智慧不断的给客户带来惊喜。创新互联建站推出太子河免费做网站回馈大家。

 

 

Java代码  kafka high-level consumer 多线程访

  1.  def hasNext(): Boolean = {  

  2.     if(state == FAILED)         //处于FAILED状态时,另外线程访问会直接异常  

  3.       throw new IllegalStateException("Iterator is in failed state")  

  4.     state match {  

  5.       case DONE => false  

  6.       case READY => true  

  7.       case _ => maybeComputeNext()  

  8.     }  

  9.   }  

  10.   

  11.   

  12.   def maybeComputeNext(): Boolean = {  

  13.     state = FAILED              //重置了状态  

  14.     nextItem = Some(makeNext())          

  15.     if(state == DONE) {  

  16.       false  

  17.     } else {  

  18.       state = READY  

  19.       true  

  20.     }  

  21.   }  

  22.   下载

  23.   

  24. protected def makeNext(): MessageAndMetadata[K, V] = {  

  25.     var currentDataChunk: FetchedDataChunk = null  

  26.     // if we don't have an iterator, get one  

  27.     var localCurrent = current.get()  

  28.     if(localCurrent == null || !localCurrent.hasNext) {  

  29.       if (consumerTimeoutMs < 0)  

  30.         currentDataChunk = channel.take             //channel是BlockingQueue这里会阻塞  

  31.   

  32.       else {  

  33.         currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)  

  34.         if (currentDataChunk == null) {  

  35.           // reset state to make the iterator re-iterable  

  36.           resetState()  

  37.           throw new ConsumerTimeoutException  

  38.         }  

  39.       }  

  40. //省略部分代码  

  41. }  


分享题目:kafkahigh-levelconsumer多线程访
链接URL:http://wjwzjz.com/article/gjcggo.html
在线咨询
服务热线
服务热线:028-86922220
TOP