新网创想网站建设,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章将为大家详细讲解有关flume中hdfssink如何自定义EventSerializer序列化类,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
创新互联公司坚持“要么做到,要么别承诺”的工作理念,服务领域包括:网站建设、网站设计、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的巴彦网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!
因为之前做了hbasesink的序列化类,觉得写hdfs的应该会很简单,可是没想到竟然不一样。hdfs并没有直接配置序列化类的选项需要根据fileType来选择对相应序列化类,我们使用的datastream的类型,对应的类是HDFSDataStream,这个类默认的序列化类TEXT(这是个枚举类型)
serializerType = context.getString("serializer", "TEXT");
枚举的类如下:
public enum EventSerializerType { TEXT(BodyTextEventSerializer.Builder.class), HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class), AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class), CUSTOM(CUSTOMEventSerializer.Builder.class),//自定义的序列化类 OTHER(null); private final Class extends EventSerializer.Builder> builderClass; EventSerializerType(Class extends EventSerializer.Builder> builderClass) { this.builderClass = builderClass; } public Class extends EventSerializer.Builder> getBuilderClass() { return builderClass; } }
在里面加了自定义的类型和枚举,在配置agent的时候配置好filetype和serializer即可,同样需要编译上传。
自定义的序列化类如下:
public class CUSTOMEventSerializer implements EventSerializer { private final static Logger logger = LoggerFactory.getLogger(CUSTOMEventSerializer.class); private final String SPLITCHAR = "\001";//列分隔符 // for legacy reasons, by default, append a newline to each event written // out private final String APPEND_NEWLINE = "appendNewline"; private final boolean APPEND_NEWLINE_DFLT = true; private final OutputStream out; private final boolean appendNewline; private CUSTOMEventSerializer(OutputStream out, Context ctx) { this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT); this.out = out; } @Override public boolean supportsReopen() { return true; } @Override public void afterCreate() { // noop } @Override public void afterReopen() { // noop } @Override public void beforeClose() { // noop } @Override public void write(Event e) throws IOException { // 获取日志信息 String log = new String(e.getBody(), StandardCharsets.UTF_8); logger.info("-----------logs-------" + log); // headers包含日志中项目编号和host信息 Mapheaders = e.getHeaders(); String parsedLog = parseJson2Value(log, headers); out.write(parsedLog.getBytes()); logger.info("-----------values-------" + parsedLog); logger.info("-----------valueSSSSSS-------" + parsedLog.getBytes()); out.write('\n'); } /** * * @Title: parseJson2Value * @Description: 解析出json日志中的value。 * @param log json格式日志 * @param headers event头信息 * @return * @return String 解析后的日志 * @throws */ private String parseJson2Value(String log, Map headers) { log.replace("\\", "/"); String time = ""; String path = ""; Object value = ""; StringBuilder values = new StringBuilder(); ObjectMapper objectMapper = new ObjectMapper(); try { Map m = objectMapper.readValue(log, Map.class); for(String key:m.keySet()){ value = m.get(key); if (key.equals("uri")){ //解析访问路径 path = pasreUriToPath(value.toString()); } if(key.equals("time")){ time = value.toString().substring(10); } values.append(value).append(this.SPLITCHAR); } } catch (JsonParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (JsonMappingException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } // 解析headers中的项目编号和服务host String pcode = headers.get("pcode"); String host = headers.get("host"); values.append(path).append(this.SPLITCHAR). append(pcode).append(this.SPLITCHAR). append(host).append(this.SPLITCHAR). append(time).append(this.SPLITCHAR); //value字符串 return values.toString(); } @Override public void flush() throws IOException { // noop } public static class Builder implements EventSerializer.Builder { @Override public EventSerializer build(Context context, OutputStream out) { CUSTOMEventSerializer s = new CUSTOMEventSerializer(out, context); return s; } } /** * 把请求uri转换成具体的访问路径 * * @param uri 请求uri * @return 访问路径 */ protected String pasreUriToPath(String uri){ if(uri == null || "".equals(uri.trim())){ return uri; } int index = uri.indexOf("/"); if(index > -1){ uri = uri.substring(index); } index = uri.indexOf("?"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf(";"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf(" HTTP/1.1"); if(index > -1){ uri = uri.substring(0, index); } index = uri.indexOf("HTTP/1.1"); if(index > -1){ uri = uri.substring(0, index); } return uri; } }
关于“flume中hdfssink如何自定义EventSerializer序列化类”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。