上1篇文章主要论述了HDFS Cache缓存方面的知识,本文继续带领大家了解HDFS内存存储相干的内容.在HDFS中,CacheAdmin设置的目标文件缓存是会寄存于DataNode的内存中,但是另外1种情况也能够将数据寄存在DataNode的内存里.就是之前HDFS异构存储中提到的内存存储策略,LAZY_PERSIST.换句话说,本文也是对HDFS内存存储策略的1个更细致的分析.斟酌到LAZY_PERSIST内存存储与其他存储策略类型的不同的地方,做这样的1个分析还是比较成心义的.
对内存存储,可能很多人会存有这么几种看法,
仔细来看以上这2种观点,其实都有不小的瑕疵.
首先第1个观点,服务1旦停止,内存数据全丢,这个是没法接受的,我们可以忍耐内存中少许的数据丢失,但是全丢就不是特别好的处理方式了.而且这个也有点不公道,内存的存储空间是有限的,如果不及时存储1部份数据,内存空间早晚会耗尽.
然后是第2个观点,第2个方案种是在服务停止退出的时候做持久化操作,但是他一样会面临上面提到的内存空间的限制问题.而且假定机器的内存是足够大的,那末最后写入磁盘的那个阶段想必也不会那末快,由于数据可能会很多.
所以1般的通用的比较好的做法是异步的做持久化,甚么意思呢
内存存储新数据的同时,持久化距离当前时刻最远(存储时间最早)的数据
换1个通俗的解释,好比我有个内存数据块队列,在队列头部不断有新增的数据块插入,就是待存储的块,由于资源有限 ,我要把队列尾部的块,也就是早些时间点的块持久化到磁盘中,然后才有空间腾出来存新的块.然后构成这样的1个循环,新的块加入,老的块移除,保证了整体数据的更新.
HDFS的LAZY_PERSIST内存存储策略用的就是这套方法.下面是1张原理图:
上文描写的原理在图中的表示实际上是4,6,的步骤.写数据的RAM,然后异步的写到Disk.前面几个步骤是如何设置StorageType的操作,这个在下文种会具体提到.所以上图所示的大体步骤可以归纳为以下:
内存的异步持久化存储,就是明显不同于其他介质存储数据的地方.这应当也是LAZY_PERSIST的名称的源由吧,数据不是马上落盘,而是”lazy persisit”怠惰的方式,延时的处理.
这里需要了解1个额外的知识点,Linux 虚拟内存盘.之前我也是1直有个疑惑,内存也能够当作1个块盘使用?内存不就是临时存数据用的吗?因而在学习此模块知识之前,特地查了相干的资料.其实在Linux中,可以用将内存摹拟为1个块盘的技术,叫RAM disk.这是1种摹拟的盘,实质数据都是寄存在内存中的.RAM disk虚拟内存盘可以在某些特定的内存式存储文件系统下结合使用,比如tmpfs,ramfs.关于tmpfsd百度百科链接点此.通过此项技术,我们就能够将机器内存利用起来,作为1个独立的虚拟盘供DataNode使用了.
下面论述的将是本文的核心内容,就是HDFS内存存储的主要进程操作.不要小视这仅仅是1个单1的StoragePolicy,里面的进程可其实不简单,在下面的进程种,我会给出比较多的进程图的展现,帮助大家理解.
要想让文件数据存储到内存中,1开始你要做的操作就是设置此文件的存储策略,就是上面提到的LAZY_PERSIST,而不是使用默许的StoragePolicy.DEFAULT,默许策略的存储介质是DISK类型的.设置存储策略的方法目前有2种:
hdfs storagepolicies -setStoragePolicy -path <path> -policy LAZY_PERSIST
方便,快速.
FSDataOutputStream fos =
fs.create(
path,
FsPermission.getFileDefault(),
EnumSet.of(CreateFlag.CREATE, CreateFlag.LAZY_PERSIST),
bufferLength,
replicationFactor,
blockSize,
null);
上述方式终究调用的是DFSClient的create同名方法,以下:
/**
* Call {@link #create(String, FsPermission, EnumSet, boolean, short,
* long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
* set to true.
*/
public DFSOutputStream create(String src, FsPermission permission,
EnumSet<CreateFlag> flag, short replication, long blockSize,
Progressable progress, int buffersize, ChecksumOpt checksumOpt)
throws IOException {
return create(src, permission, flag, true,
replication, blockSize, progress, buffersize, checksumOpt, null);
}
方法经过RPC层层调用,经过FSNamesystem,终究会到FSDirWriteFileOp的startFile方法,在此方法内部,会有设置的动作
static HdfsFileStatus startFile(
FSNamesystem fsn, FSPermissionChecker pc, String src,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize,
EncryptionKeyInfo ezInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
boolean logRetryEntry)
throws IOException {
assert fsn.hasWriteLock();
boolean create = flag.contains(CreateFlag.CREATE);
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
// 判断CreateFlag是不是带有LAZY_PERSIST标识,来判断是不是是内存存储策略的
boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
...
// 然后在此设置策略
setNewINodeStoragePolicy(fsd.getBlockManager(), newNode, iip,
isLazyPersist);
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
src + " inode " + newNode.getId() + " " + holder);
}
return FSDirStatAndListingOp.getFileInfo(fsd, src, false, isRawPath);
}
所以这部份的进程调用图以下:
OK,以上就是前期存储策略的设置进程了,这1部份还是非常的直接明了的.
这里直接跳到DataNode如何进行内存式存储,当我们设置了文件为LAZY_PERSIST的存储方式以后.我在下面会进行分模块,分角色的介绍.
在之前的篇幅中已提到过,数据存储的同时会有另外1批数据会被异步的持久化,所以这里1定会触及到多个服务对象的合作.这些服务对象的指挥者是FsDatasetImpl.他是1个掌管DataNode所有磁盘读写数据的管家.
在FsDatasetImpl中,与内存存储相干的服务对象有以下的3个.
下面来1个个介绍:
LazyWriter:lazyWriter是1个线程服务,此线程会不断的循环着从数据块列表中取出数据块,加入到异步持久化线程池RamDiskAsyncLazyPersistService中去履行.
RamDiskAsyncLazyPersistService:此对象就是异步持久化线程服务,里面针对每个磁盘块设置1个对应的线程池,然后需要持久化到给定的磁盘块的数据块会被提交到对应的线程池中去.每一个线程池的最大线程数为1.
RamDiskReplicaLruTracker:副本块跟踪类,此类种保护了所有已持久化,未持久化的副本和总副本数据信息.所以当1个副本被终究存储到内存种后,相应的会有副本所属队列信息的变更.其次当节点内存不足的时候,部份距离最近最久没有被访问的副本块会在此类中被移除.
综合了以上3者的紧密合作,终究实现了HDFS的内存存储.下面是具体的角色介绍.
在以上3者中,RamDiskReplicaLruTracker的角色起到了1个中间人的角色.由于他内部保护了多个关系的数据块信息.主要的就是以下3类.
public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
...
/**
* Map of blockpool ID to <map of blockID to ReplicaInfo>.
*/
Map<String, Map<Long, RamDiskReplicaLru>> replicaMaps;
/**
* Queue of replicas that need to be written to disk.
* Stale entries are GC'd by dequeueNextReplicaToPersist.
*/
Queue<RamDiskReplicaLru> replicasNotPersisted;
/**
* Map of persisted replicas ordered by their last use times.
*/
TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;
...
这里的Queue就是待内存存储队列.以上3个变量之间的关系图以下
RamDiskReplicaLruTracker中的方法操作绝大多数与这3个变量的增删改动相干,所以逻辑其实不复杂,我们只需要了解这些方法有甚么作用便可.我对此分成了2类:
第1类,异步持久化操作相干方法.如图:
当节点重启或有新的文件被设置了LAZY_PERSIST策略后,就会有新的副本块被存储到内存中,同时会加入到replicaNotPersisted队列中.然后经过中间的dequeueNextReplicaToPersist取出下1个将被持久化的副本块,进行写磁盘的操作.recordStartLazyPersist,recordEndLazyPersist这2个方法会在持久化的进程中被调用,标志着持久化状态的变更.
另外一类,异步持久化操作无直接关联方法.如图:
有下面3个方法:
这里反复提到1个名词,LRU,他的全称是Least Recently Used,意为最近最少使用算法,相干链接点此,getNextCandidateForEviction采取此算法的好处是保证了现有副本块的1个活跃度,把最近很久没有访问过的给移除掉.对这个操作,我们有必要了解其中的细节.
先是touch会更新最近访问的时间
synchronized void touch(final String bpid,
final long blockId) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
...
// Reinsert the replica with its new timestamp.
// 更新最近访问时间戳,并重新插入数据
if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) {
ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
}
}
然后是第2步获得候选移除块
synchronized RamDiskReplicaLru getNextCandidateForEviction() {
// 获得replicasPersisted迭代器进行遍历
final Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator();
while (it.hasNext()) {
// 由于replicasPersisted已根据时间排好序了,所以取出当前的块进行移除便可
final RamDiskReplicaLru ramDiskReplicaLru = it.next();
it.remove();
Map<Long, RamDiskReplicaLru> replicaMap =
replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
return ramDiskReplicaLru;
}
// The replica no longer exists, look for the next one.
}
return null;
}
这里比较成心思的是,根据已持久化的块的访问时间来进行挑选移除,而不是直接是内存中的块.最后是在内存中移除与候选块属于同1副本信息的块并释放内存空间.
/**
* Attempt to evict one or more transient block replicas until we
* have at least bytesNeeded bytes free.
*/
public void evictBlocks(long bytesNeeded) throws IOException {
int iterations = 0;
final long cacheCapacity = cacheManager.getCacheCapacity();
// 当检测到内存空间不满足外界需要的大小时
while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
(cacheCapacity - cacheManager.getCacheUsed()) < bytesNeeded) {
// 获得待移除副本信息
RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
if (replicaState == null) {
break;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Evicting block " + replicaState);
}
...
// 移除内存中的相干块并释放空间
// Delete the block+meta files from RAM disk and release locked
// memory.
removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
blockFileUsed, metaFileUsed, bpid);
}
}
}
LazyWriter是1个线程服务,他是1个发动机,循环不断的从队列中取出待持久化的数据块,提交到异步持久化服务中去.直接来看主要的run方法.
public void run() {
int numSuccessiveFailures = 0;
while (fsRunning && shouldRun) {
try {
// 取出新的副本块并提交到异步服务中,返回是不是提交成功布尔值
numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
// Sleep if we have no more work to do or if it looks like we are not
// making any forward progress. This is to ensure that if all persist
// operations are failing we don't keep retrying them in a tight loop.
if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicasNotPersisted()) {
Thread.sleep(checkpointerInterval * 1000);
numSuccessiveFailures = 0;
}
} catch (InterruptedException e) {
LOG.info("LazyWriter was interrupted, exiting");
break;
} catch (Exception e) {
LOG.warn("Ignoring exception in LazyWriter:", e);
}
}
}
进入saveNextReplica方法的处理
private boolean saveNextReplica() {
RamDiskReplica block = null;
FsVolumeReference targetReference;
FsVolumeImpl targetVolume;
ReplicaInfo replicaInfo;
boolean succeeded = false;
try {
// 从队列种取出新的待持久化的块
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) {
synchronized (FsDatasetImpl.this) {
...
// 提交到异步服务中去
asyncLazyPersistService.submitLazyPersistTask(
block.getBlockPoolId(), block.getBlockId(),
replicaInfo.getGenerationStamp(), block.getCreationTime(),
replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
targetReference);
}
}
}
succeeded = true;
} catch(IOException ioe) {
LOG.warn("Exception saving replica " + block, ioe);
} finally {
if (!succeeded && block != null) {
LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
onFailLazyPersist(block.getBlockPoolId(), block.getBlockId());
}
}
return succeeded;
}
所以LazyWriter线程服务的流程图可以归纳为以下所示:
然后我们结合LazyWriter和RamDiskReplicaTracker跟踪服务,就能够得到下面1个完全的流程(暂且不斟酌RamDiskAsyncLazyPersistService的内部履行逻辑).
最后1部份的异步服务的内容相对就比较简单1些了,主要围绕着Volume磁盘和Executor线程池这2部份的内容.秉承着下面1个原则
1个磁盘服务对应1个线程池,并且1个线程池的最大线程数也只有1个.
线程池列表定义以下
class RamDiskAsyncLazyPersistService {
...
private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
...
这里的File代表的是1个独立的磁盘所在目录,个人认为这里完全可以用String字符串替换.既可以减少存储空间,又直观明了.所以在这里就能够看出是1对1的关系了.
当服务启动的时候,就会有新的磁盘目录加入.
synchronized void addVolume(File volume) {
if (executors == null) {
throw new RuntimeException("AsyncLazyPersistService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
// 如果当前已存在此磁盘目录对应的线程池,则跑异常
if (executor != null) {
throw new RuntimeException("Volume " + volume + " is already existed.");
}
// 否则进行添加
addExecutorForVolume(volume);
}
进入addExecutorForVolume方法
private void addExecutorForVolume(final File volume) {
...
// 新建线程池,最大线程履行数为
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
// This can reduce the number of running threads
executor.allowCoreThreadTimeOut(true);
// 加入到executors中,以为volume作为key
executors.put(volume, executor);
}
还有1个需要注意的是提交履行方法submitLazyPersistTask.
void submitLazyPersistTask(String bpId, long blockId,
long genStamp, long creationTime,
File metaFile, File blockFile,
FsVolumeReference target) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: "
+ bpId + " block id: " + blockId);
}
// 获得需要持久化到目标磁盘实例
FsVolumeImpl volume = (FsVolumeImpl)target.getVolume();
File lazyPersistDir = volume.getLazyPersistDir(bpId);
if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
throw new IOException("LazyWriter fail to find or create lazy persist dir: "
+ lazyPersistDir.toString());
}
// 新建此服务Task
ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
bpId, blockId, genStamp, creationTime, blockFile, metaFile,
target, lazyPersistDir);
// 提交到对应volume的线程池中履行
execute(volume.getCurrentDir(), lazyPersistTask);
}
如果在上述履行的进程中产生失败,会调用失败处理的方法,并会重新将此副本块插入到replicateNotPersisted队列等待下1次的持久化.
public void onFailLazyPersist(String bpId, long blockId) {
RamDiskReplica block = null;
block = ramDiskReplicaTracker.getReplica(bpId, blockId);
if (block != null) {
LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
// 重新插入队列操作
ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
}
}
其他的removeVolume等方法实现比较简单,这里不做过量的介绍.下面是RamDiskAsyncLazyPersistService总的结构图:
综合以上3部份的内容论述,主要描写了LAZT_PERSIST下的FIFO先进先出的队列式内存数据块持久化的顺序,异步持久化服务的内部运行逻辑和LRU算法移除数据副本块来预留内存空间.
介绍完以上原理部份的内容以后,最后补充具体的配置使用了.
首先要使用LAZY_PERSIST内存存储策略,需要有对应的存储介质,内存存储介质对应的类型是RAM_DISK.
所以第1步,需要将机器中已完成好的RAM disk虚拟内存盘配置到配置项dfs.datanode.data.dir中,其次还要带上,RAM_DISK的标签.以下:
<property>
<name>dfs.datanode.data.dir</name>
<value>/grid/0,/grid/1,/grid/2,[RAM_DISK]/mnt/dn-tmpfs</value>
</property>
注意,这个标签是必须要打上的,否则HDFS都默许的是DISK.
第2步就是设置具体的文件的策略类型了,上文中已提到过了.
然后附带2个注意事项:
1.http://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/MemoryStorage.html
2.百度百科.tmpfs
3.百度百科.RAM disk
4.百度百科.LRU算法