程序员人生 网站导航

【Flume】从入口Application来分析flume的source和sink是如何与channel交互的

栏目:互联网时间:2015-03-01 00:54:22

大家在启动flume的时候,输入的命令就能够看出flume的启动入口了

[root@com21 apache-flume⑴.5.2-bin]# sh bin/flume-ng agent -c conf -f conf/server.conf -n a1 Info: Sourcing environment configuration script /home/flume/apache-flume⑴.5.2-bin/conf/flume-env.sh + exec /home/flume/jdk1.7.0_71/bin/java -server -Xms2048m -Xmx2048m -Xss256K -XX:PermSize=32M -XX:MaxPermSize=512M -XX:+UseConcMarkSweepGC -XX:+DisableExplicitGC -XX:+UseParNewGC -XX:+CMSClassUnloadingEnabled -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+UseCompressedOops -XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError -XX:SurvivorRatio=8 -cp '/home/flume/apache-flume⑴.5.2-bin/conf:/home/flume/apache-flume⑴.5.2-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/server.conf -n a1

从这里可以看出flume的启动入口是:org.apache.flume.node.Application

下面我们就来看该入口程序是如何来运行的

找到main函数

附:flume每次启动都会先判断有无与当前配置的3大组件同名的组件存在,存在的话先停掉该组件,顺序为source,sink,channel

其次是启动所有当前配置的组件,启动顺序为channel,sink,source

通过这个启动停止的顺序可以看出flume也是对数据1致性做了保证的。

if(reload) { EventBus eventBus = new EventBus(agentName + "-event-bus"); PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); eventBus.register(application); } else { PropertiesFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider(agentName, configurationFile); application = new Application(); application.handleConfigurationEvent(configurationProvider.getConfiguration()); }
这个if的作用就是是不是30秒读1下配置,判断是不是有更新

主要看1下对配置内容的处理,两个分支虽然从代码上看不1样,但是处理的逻辑是1样的

我们看else分支的代码吧:

看configurationProvider.getConfiguration()

public MaterializedConfiguration getConfiguration() { MaterializedConfiguration conf = new SimpleMaterializedConfiguration(); FlumeConfiguration fconfig = getFlumeConfiguration(); AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName()); if (agentConf != null) { Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap(); Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap(); Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap(); try { loadChannels(agentConf, channelComponentMap); loadSources(agentConf, channelComponentMap, sourceRunnerMap); loadSinks(agentConf, channelComponentMap, sinkRunnerMap); Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet()); for(String channelName : channelNames) { ChannelComponent channelComponent = channelComponentMap. get(channelName); if(channelComponent.components.isEmpty()) { LOGGER.warn(String.format("Channel %s has no components connected" + " and has been removed.", channelName)); channelComponentMap.remove(channelName); Map<String, Channel> nameChannelMap = channelCache. get(channelComponent.channel.getClass()); if(nameChannelMap != null) { nameChannelMap.remove(channelName); } } else { LOGGER.info(String.format("Channel %s connected to %s", channelName, channelComponent.components.toString())); conf.addChannel(channelName, channelComponent.channel); } } for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) { conf.addSourceRunner(entry.getKey(), entry.getValue()); } for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) { conf.addSinkRunner(entry.getKey(), entry.getValue()); } } catch (InstantiationException ex) { LOGGER.error("Failed to instantiate component", ex); } finally { channelComponentMap.clear(); sourceRunnerMap.clear(); sinkRunnerMap.clear(); } } else { LOGGER.warn("No configuration found for this host:{}", getAgentName()); } return conf; }

看loadXXX方法 

我们看在载入source组件的时候有个方法: SourceRunner.forSource(source)

public static SourceRunner forSource(Source source) { SourceRunner runner = null; if (source instanceof PollableSource) { runner = new PollableSourceRunner(); ((PollableSourceRunner) runner).setSource((PollableSource) source); } else if (source instanceof EventDrivenSource) { runner = new EventDrivenSourceRunner(); ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source); } else { throw new IllegalArgumentException("No known runner type for source " + source); } return runner; }
这个方法里面通过对source的类型判断来选择使用哪一种SourceRunner,并且给sourcerunner中的对象source赋值了setter

我们来看1个具体例子吧AvroSource,它是事件驱动类型的source――EventDrivenSourceRunner

public void start() { Source source = getSource(); ChannelProcessor cp = source.getChannelProcessor(); cp.initialize(); source.start(); lifecycleState = LifecycleState.START; }

该方法

1、获得前面赋值的source对象

2、setter通道处理器,那末通道处理器是在哪里赋值的呢?

在loadSources方法中

source.setChannelProcessor(channelProcessor);
这个channelProcessor对象是前面两行

ChannelProcessor channelProcessor = new ChannelProcessor(selector);
来初始化的,初始化时用到的selector对象
ChannelSelector selector = ChannelSelectorFactory.create( sourceChannels, selectorConfig);
通过这行来实例化的,再往上就是触及到读取配置了,请自行查看。

3、再来看第3行cp.initialize()这个是初始化,进去看看:

public void initialize() { interceptorChain.initialize(); }

4、再进去

public void initialize() { Iterator<Interceptor> iter = interceptors.iterator(); while (iter.hasNext()) { Interceptor interceptor = iter.next(); interceptor.initialize(); } }
看到这里,我们知道了,拦截器是在这里开始起作用的,它是处在source和channel中间的1个环节
再后来就是source.start()了,启动source组件了,这就调用到具体某个source的start方法了

最后将该组件的生命周期状态标识为START。


这个方法LifecycleAware类会来调的

那末甚么时候来调呢?1旦调用这个方法,source与channel的交互就开始了

switch (supervisoree.status.desiredState) { case START: try { lifecycleAware.start();
上面的代码出现在LifecycleSupervisor类中的内部静态类MonitorRunnable的run方法中,再来看这个线程类谁来调用?

MonitorRunnable monitorRunnable = new MonitorRunnable(); monitorRunnable.lifecycleAware = lifecycleAware; monitorRunnable.supervisoree = process; monitorRunnable.monitorService = monitorService; supervisedProcesses.put(lifecycleAware, process); ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS); monitorFutures.put(lifecycleAware, future);

在LifecycleSupervisor类中supervise方法

至于这个monitorRunnable干了甚么,我们来看它的run方法就好了,由于它是1个线程嘛

public void run() { logger.debug("checking process:{} supervisoree:{}", lifecycleAware, supervisoree); long now = System.currentTimeMillis(); try { if (supervisoree.status.firstSeen == null) { logger.debug("first time seeing {}", lifecycleAware); supervisoree.status.firstSeen = now; } supervisoree.status.lastSeen = now; synchronized (lifecycleAware) { if (supervisoree.status.discard) { // Unsupervise has already been called on this. logger.info("Component has already been stopped {}", lifecycleAware); return; } else if (supervisoree.status.error) { logger.info("Component {} is in error state, and Flume will not" + "attempt to change its state", lifecycleAware); return; } supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState(); if (!lifecycleAware.getLifecycleState().equals( supervisoree.status.desiredState)) { logger.debug("Want to transition {} from {} to {} (failures:{})", new Object[] { lifecycleAware, supervisoree.status.lastSeenState, supervisoree.status.desiredState, supervisoree.status.failures }); switch (supervisoree.status.desiredState) { case START: try { lifecycleAware.start(); } catch (Throwable e) { logger.error("Unable to start " + lifecycleAware + " - Exception follows.", e); if (e instanceof Error) { // This component can never recover, shut it down. supervisoree.status.desiredState = LifecycleState.STOP; try { lifecycleAware.stop(); logger.warn("Component {} stopped, since it could not be" + "successfully started due to missing dependencies", lifecycleAware); } catch (Throwable e1) { logger.error("Unsuccessful attempt to " + "shutdown component: {} due to missing dependencies." + " Please shutdown the agent" + "or disable this component, or the agent will be" + "in an undefined state.", e1); supervisoree.status.error = true; if (e1 instanceof Error) { throw (Error) e1; } // Set the state to stop, so that the conf poller can // proceed. } } supervisoree.status.failures++; } break; case STOP: try { lifecycleAware.stop(); } catch (Throwable e) { logger.error("Unable to stop " + lifecycleAware + " - Exception follows.", e); if (e instanceof Error) { throw (Error) e; } supervisoree.status.failures++; } break; default: logger.warn("I refuse to acknowledge {} as a desired state", supervisoree.status.desiredState); } if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) { logger.error( "Policy {} of {} has been violated - supervisor should exit!", supervisoree.policy, lifecycleAware); } } } } catch(Throwable t) { logger.error("Unexpected error", t); } logger.debug("Status check complete"); }
我们可以看到里面的核心就是通过判断生命周期的状态是不是是START,如果是,就履行lifecycleAware的start方法,而start方法的具体实现就是flume具体组件的start方法,一样我们以AvroSource为例来看

server = new NettyServer(responder, new InetSocketAddress(bindAddress, port), socketChannelFactory, pipelineFactory, null); connectionCountUpdater = Executors.newSingleThreadScheduledExecutor(); server.start();
从这段代码来看,start方法,启动了1个NettyServer服务端,这个服务端启动完以后就阻塞在这,1旦客户端有数据过来,就处理了,也就是事件驱动了。

从这里我们终究看到核心中的核心了,也就是每隔3秒,source会和channel交互1次,条件是状态START

这面的monitor线程对象会被方法supervise调用

而supervise方法又会被Application入口的 start方法调用

public synchronized void start() { for(LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } }
这样的话,全部链就串起来了
所以从这里看出来source和channel的交互频率是3秒


看完source和channel的交互,再来看sink和channel的交互

到这里再看sink就很简单了,由于flume中3大组件都实现自接口LifecycleAware

所以从flume的入口Application来看,从start开始终究都是到LifecycleSupervisor类的supervise方法,而该方法一样:

MonitorRunnable monitorRunnable = new MonitorRunnable(); monitorRunnable.lifecycleAware = lifecycleAware; monitorRunnable.supervisoree = process; monitorRunnable.monitorService = monitorService; supervisedProcesses.put(lifecycleAware, process); ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS); monitorFutures.put(lifecycleAware, future);
这串逻辑,不分具体的source,sink,一样是3秒履行1次。


至此,flume中3大组件的交互和交互频率就说完了,望各位网友不吝指教!!





------分隔线----------------------------
------分隔线----------------------------

最新技术推荐