新网创想网站建设,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章给大家介绍kafka-Storm中如何将日志文件打印到local,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
创新互联专业为企业提供雨花台网站建设、雨花台做网站、雨花台网站设计、雨花台网站制作等企业网站建设、网页设计与制作、雨花台企业网站模板建站服务,十多年雨花台做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。
阅读前提:
1 : 您可能需要对 logback 日志系统有所了解
2 :您可能需要对于 kafka 有初步的了解
3:请代码查看之前,请您仔细参考系统的业务图解
由于kafka本身自带了和『Hadoop』的接口,如果需要将kafka中的文件直接迁移到HDFS,请参看本ID的另外一篇博文:
业务系统-kafka-Storm【日志本地化】 - 2 :直接通过kafka将日志传递到HDFS
1: 一个正式环境系统的系统设计图解:
通过kafka集群,在2个相同的topic之下,通过kafka-storm, he kafka-hadoop,2 个Consumer,针对同样的一份数据,我们分流了2个管道:
其一: 实时通道
其二:离线通道
在日志本地化的过程之中,前期,由于日志的清洗,过滤的工作是放在Storm集群之中,也就是说,留存到本地locla的日志。是我们在Storm集群之中进行了清洗的数据。
也就是:
如下图所示:
在kafka之中,通常而言,有如下的 代码 用来处理:
在这里我们针对了2种日志,有两个Consumer用来处理
package com.mixbox.kafka.consumer; public class logSave { public static void main(String[] args) throws Exception { Consumer_Thread visitlog = new Consumer_Thread(KafkaProperties.visit); visitlog.start(); Consumer_Thread orderlog = new Consumer_Thread(KafkaProperties.order); orderlog.start(); } }
在这里,我们依据不同的原始字段,将不同的数据保存到不同的文件之中。
package com.mixbox.kafka.consumer; import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; /** * @author Yin Shuai */ public class Consumer_Thread extends Thread { // 在事实上我们会依据传递的topic名称,来生成不桐的记录机器 // private Logger _log_order = LoggerFactory.getLogger("order"); // private Logger _log_visit = LoggerFactory.getLogger("visit"); private Logger _log = null; private final ConsumerConnector _consumer; private final String _topic; public Consumer_Thread(String topic) { _consumer = kafka.consumer.Consumer .createJavaConsumerConnector(createConsumerConfig()); this._topic = topic; _log = LoggerFactory.getLogger(_topic); System.err.println("log的名称" + _topic); } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", KafkaProperties.zkConnect); // 在这里我们的组ID为logSave props.put("group.id", KafkaProperties.logSave); props.put("zookeeper.session.timeout.ms", "100000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public void run() { MaptopicCountMap = new HashMap (); topicCountMap.put(_topic, new Integer(1)); Map >> consumerMap = _consumer .createMessageStreams(topicCountMap); for (KafkaStream kafkaStream : consumerMap.get(_topic)) { ConsumerIterator iterator = kafkaStream.iterator(); while (iterator.hasNext()) { MessageAndMetadata next = iterator.next(); try { // 在这里我们分拆了一个Consumer 来处理visit日志 logFile(next); System.out.println("message:" + new String(next.message(), "utf-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } } private void logFile(MessageAndMetadata next) throws UnsupportedEncodingException { _log.info(new String(next.message(), "utf-8")); } }
一个简单的小tips:
logback.xml ,提醒您注意,这里的配置文件太过粗浅。如有需要,请自行填充。
f:/opt/log/test.%d{yyyy-MM-dd}.log %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}-%msg%n e:/logs/error/error.log ERROR ACCEPT DENY e:/logs/yuanshi-%d{yyyy-MM-dd}.log 10 %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}-%msg%n E:\logs\file\file.log INFO ACCEPT DENY e:/logs/venality-%d{yyyy-MM-dd}.log 10 %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}-%msg%n E:\logs\visitlog\visit.log %msg%n INFO E:\logs\visit.log.%d{yyyy-MM-dd} E:\logs\orderlog\order.log %msg%n INFO E:\logs\order.log.%d{yyyy-MM-dd}
关于kafka-Storm中如何将日志文件打印到local就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。