本文将以TimestampInterceptor为例来分析1下flume中拦截器的工作原理
首先来看下改拦截器的实现结构
1、实现了Interceptor接口
该接口的方法定义以下:
public void initialize();
public Event intercept(Event event);
public List<Event> intercept(List<Event> events);
public void close();
/** Builder implementations MUST have a no-arg constructor */
public interface Builder extends Configurable {
public Interceptor build();
}
2、接口中定义了1个内部接口Builder
该接口又继承自Configurable接口【接口只能继承接口,不能实现接口】
该接口的方法定义以下:
public void configure(Context context);
该方法很容易理解,就是用来读取flume的配置文件内容的。
下面来看TimestampInterceptor的具体实现
public static class Builder implements Interceptor.Builder {
private boolean preserveExisting = PRESERVE_DFLT;
@Override
public Interceptor build() {
return new TimestampInterceptor(preserveExisting);
}
@Override
public void configure(Context context) {
preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
}
}
该内部类实现了Interceptor的接口Builder,必须得有1个无参的构造方法,通过该构造方法就实例化了1个拦截器对象
并且在configure方法中读取了preserveExisting属性,默许值为false
该配置的作用表明
This interceptor can preserve an existing timestamp if it is already present in the configuration.
再来看intercept方法
批量的
@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}
简单的循环调用了intercept对event逐1处理
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
if (preserveExisting && headers.containsKey(TIMESTAMP)) {
// we must preserve the existing timestamp
} else {
long now = System.currentTimeMillis();
headers.put(TIMESTAMP, Long.toString(now));
}
return event;
}
该方法即拦截器的核心内容
1、如果拿到的event的header中本身包括timestamp这个key并且预留保存属性为true,我们就直接返回该event就好了
2、否则的话,我们生成1个时间戳,并将这个时间戳放到event的header中,作为1个属性保存,再返回给event
综上所述:
Flume中拦截器的作用就是对event中header的部份可以按需塞入1些属性,固然你如果想要处理event的body内容,也是可以的,但是event的body内容是系统下游阶段真正处理的内容,如果让Flume来修饰body的内容的话,那就是强耦合了,这就背背了当初使用Flume来解耦的初衷了。
做法可以有,但是合不适合是另外一回事了!!!!balance1直是1个世界性困难!!!