大家在启动flume的时候,输入的命令就能够看出flume的启动入口了
[root@com21 apache-flume⑴.5.2-bin]# sh bin/flume-ng agent -c conf -f conf/server.conf -n a1
Info: Sourcing environment configuration script /home/flume/apache-flume⑴.5.2-bin/conf/flume-env.sh
+ exec /home/flume/jdk1.7.0_71/bin/java -server -Xms2048m -Xmx2048m -Xss256K -XX:PermSize=32M -XX:MaxPermSize=512M -XX:+UseConcMarkSweepGC -XX:+DisableExplicitGC -XX:+UseParNewGC -XX:+CMSClassUnloadingEnabled -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+UseCompressedOops -XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError -XX:SurvivorRatio=8 -cp '/home/flume/apache-flume⑴.5.2-bin/conf:/home/flume/apache-flume⑴.5.2-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/server.conf -n a1
从这里可以看出flume的启动入口是:org.apache.flume.node.Application
下面我们就来看该入口程序是如何来运行的
找到main函数
附:flume每次启动都会先判断有无与当前配置的3大组件同名的组件存在,存在的话先停掉该组件,顺序为source,sink,channel
其次是启动所有当前配置的组件,启动顺序为channel,sink,source
通过这个启动停止的顺序可以看出flume也是对数据1致性做了保证的。
if(reload) {
EventBus eventBus = new EventBus(agentName + "-event-bus");
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(agentName,
configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components);
eventBus.register(application);
} else {
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName,
configurationFile);
application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
这个if的作用就是是不是30秒读1下配置,判断是不是有更新
主要看1下对配置内容的处理,两个分支虽然从代码上看不1样,但是处理的逻辑是1样的
我们看else分支的代码吧:
看configurationProvider.getConfiguration()
public MaterializedConfiguration getConfiguration() {
MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
FlumeConfiguration fconfig = getFlumeConfiguration();
AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
if (agentConf != null) {
Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
try {
loadChannels(agentConf, channelComponentMap);
loadSources(agentConf, channelComponentMap, sourceRunnerMap);
loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
Set<String> channelNames =
new HashSet<String>(channelComponentMap.keySet());
for(String channelName : channelNames) {
ChannelComponent channelComponent = channelComponentMap.
get(channelName);
if(channelComponent.components.isEmpty()) {
LOGGER.warn(String.format("Channel %s has no components connected" +
" and has been removed.", channelName));
channelComponentMap.remove(channelName);
Map<String, Channel> nameChannelMap = channelCache.
get(channelComponent.channel.getClass());
if(nameChannelMap != null) {
nameChannelMap.remove(channelName);
}
} else {
LOGGER.info(String.format("Channel %s connected to %s",
channelName, channelComponent.components.toString()));
conf.addChannel(channelName, channelComponent.channel);
}
}
for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
conf.addSourceRunner(entry.getKey(), entry.getValue());
}
for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
conf.addSinkRunner(entry.getKey(), entry.getValue());
}
} catch (InstantiationException ex) {
LOGGER.error("Failed to instantiate component", ex);
} finally {
channelComponentMap.clear();
sourceRunnerMap.clear();
sinkRunnerMap.clear();
}
} else {
LOGGER.warn("No configuration found for this host:{}", getAgentName());
}
return conf;
}
看loadXXX方法
我们看在载入source组件的时候有个方法: SourceRunner.forSource(source)
public static SourceRunner forSource(Source source) {
SourceRunner runner = null;
if (source instanceof PollableSource) {
runner = new PollableSourceRunner();
((PollableSourceRunner) runner).setSource((PollableSource) source);
} else if (source instanceof EventDrivenSource) {
runner = new EventDrivenSourceRunner();
((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
} else {
throw new IllegalArgumentException("No known runner type for source "
+ source);
}
return runner;
}
这个方法里面通过对source的类型判断来选择使用哪一种SourceRunner,
并且给sourcerunner中的对象source赋值了setter
我们来看1个具体例子吧AvroSource,它是事件驱动类型的source――EventDrivenSourceRunner
public void start() {
Source source = getSource();
ChannelProcessor cp = source.getChannelProcessor();
cp.initialize();
source.start();
lifecycleState = LifecycleState.START;
}
该方法
1、获得前面赋值的source对象
2、setter通道处理器,那末通道处理器是在哪里赋值的呢?
在loadSources方法中
source.setChannelProcessor(channelProcessor);
这个channelProcessor对象是前面两行
ChannelProcessor channelProcessor = new ChannelProcessor(selector);
来初始化的,初始化时用到的selector对象
ChannelSelector selector = ChannelSelectorFactory.create(
sourceChannels, selectorConfig);
通过这行来实例化的,再往上就是触及到读取配置了,请自行查看。
3、再来看第3行cp.initialize()这个是初始化,进去看看:
public void initialize() {
interceptorChain.initialize();
}
4、再进去
public void initialize() {
Iterator<Interceptor> iter = interceptors.iterator();
while (iter.hasNext()) {
Interceptor interceptor = iter.next();
interceptor.initialize();
}
}
看到这里,我们知道了,拦截器是在这里开始起作用的,它是处在source和channel中间的1个环节
再后来就是source.start()了,启动source组件了,这就调用到具体某个source的start方法了
最后将该组件的生命周期状态标识为START。
这个方法LifecycleAware类会来调的
那末甚么时候来调呢?1旦调用这个方法,source与channel的交互就开始了
switch (supervisoree.status.desiredState) {
case START:
try {
lifecycleAware.start();
上面的代码出现在LifecycleSupervisor类中的内部静态类MonitorRunnable的run方法中,再来看这个线程类谁来调用?
MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;
supervisedProcesses.put(lifecycleAware, process);
ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);
在LifecycleSupervisor类中supervise方法
至于这个monitorRunnable干了甚么,我们来看它的run方法就好了,由于它是1个线程嘛
public void run() {
logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
supervisoree);
long now = System.currentTimeMillis();
try {
if (supervisoree.status.firstSeen == null) {
logger.debug("first time seeing {}", lifecycleAware);
supervisoree.status.firstSeen = now;
}
supervisoree.status.lastSeen = now;
synchronized (lifecycleAware) {
if (supervisoree.status.discard) {
// Unsupervise has already been called on this.
logger.info("Component has already been stopped {}", lifecycleAware);
return;
} else if (supervisoree.status.error) {
logger.info("Component {} is in error state, and Flume will not"
+ "attempt to change its state", lifecycleAware);
return;
}
supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
if (!lifecycleAware.getLifecycleState().equals(
supervisoree.status.desiredState)) {
logger.debug("Want to transition {} from {} to {} (failures:{})",
new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
supervisoree.status.desiredState,
supervisoree.status.failures });
switch (supervisoree.status.desiredState) {
case START:
try {
lifecycleAware.start();
} catch (Throwable e) {
logger.error("Unable to start " + lifecycleAware
+ " - Exception follows.", e);
if (e instanceof Error) {
// This component can never recover, shut it down.
supervisoree.status.desiredState = LifecycleState.STOP;
try {
lifecycleAware.stop();
logger.warn("Component {} stopped, since it could not be"
+ "successfully started due to missing dependencies",
lifecycleAware);
} catch (Throwable e1) {
logger.error("Unsuccessful attempt to "
+ "shutdown component: {} due to missing dependencies."
+ " Please shutdown the agent"
+ "or disable this component, or the agent will be"
+ "in an undefined state.", e1);
supervisoree.status.error = true;
if (e1 instanceof Error) {
throw (Error) e1;
}
// Set the state to stop, so that the conf poller can
// proceed.
}
}
supervisoree.status.failures++;
}
break;
case STOP:
try {
lifecycleAware.stop();
} catch (Throwable e) {
logger.error("Unable to stop " + lifecycleAware
+ " - Exception follows.", e);
if (e instanceof Error) {
throw (Error) e;
}
supervisoree.status.failures++;
}
break;
default:
logger.warn("I refuse to acknowledge {} as a desired state",
supervisoree.status.desiredState);
}
if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
logger.error(
"Policy {} of {} has been violated - supervisor should exit!",
supervisoree.policy, lifecycleAware);
}
}
}
} catch(Throwable t) {
logger.error("Unexpected error", t);
}
logger.debug("Status check complete");
}
我们可以看到里面的核心就是通过判断生命周期的状态是不是是START,如果是,就履行lifecycleAware的start方法,而start方法的具体实现就是flume具体组件的start方法,一样我们以AvroSource为例来看
server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
socketChannelFactory, pipelineFactory, null);
connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
server.start();
从这段代码来看,start方法,启动了1个NettyServer服务端,这个服务端启动完以后就阻塞在这,1旦客户端有数据过来,就处理了,也就是事件驱动了。
从这里我们终究看到核心中的核心了,也就是每隔3秒,source会和channel交互1次,条件是状态START
这面的monitor线程对象会被方法supervise调用
而supervise方法又会被Application入口的 start方法调用
public synchronized void start() {
for(LifecycleAware component : components) {
supervisor.supervise(component,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
}
}
这样的话,全部链就串起来了
所以从这里看出来source和channel的交互频率是3秒
看完source和channel的交互,再来看sink和channel的交互
到这里再看sink就很简单了,由于flume中3大组件都实现自接口LifecycleAware
所以从flume的入口Application来看,从start开始终究都是到LifecycleSupervisor类的supervise方法,而该方法一样:
MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;
supervisedProcesses.put(lifecycleAware, process);
ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);
这串逻辑,不分具体的source,sink,一样是3秒履行1次。
至此,flume中3大组件的交互和交互频率就说完了,望各位网友不吝指教!!