程序员人生 网站导航

Android7.0 MessageQueue

栏目:综合技术时间:2016-12-01 15:00:57



public static void prepare() { prepare(true); } private static void prepare(boolean quitAllowed) { if (sThreadLocal.get() != null) { throw new RuntimeException("Only one Looper may be created per thread"); } //sThreadLocal为线程本地存储区;每一个线程唯一1个Looper sThreadLocal.set(new Looper(quitAllowed)); } private Looper(boolean quitAllowed) { //创建出MessageQueue mQueue = new MessageQueue(quitAllowed); mThread = Thread.currentThread(); }

1 NativeMessageQueue

MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; //mPtr的类型为long? mPtr = nativeInit(); }


static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { //MessageQueue的Native层实体 NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); ............ //这里应当类似与将指针转化成long类型,放在Java层保存;估计Java层使用时,会在native层将long变成指针,就能够操作队列了 return reinterpret_cast<jlong>(nativeMessageQueue); }


NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { //创建1个Native层的Looper,也是线程唯1的 mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false); Looper::setForThread(mLooper); } }


2 Native层的looper


Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false), mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { //此处创建了个fd mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); ....... rebuildEpollLocked(); }


void Looper::rebuildEpollLocked() { // Close old epoll instance if we have one. if (mEpollFd >= 0) { close(mEpollFd); } // Allocate the new epoll instance and register the wake pipe. mEpollFd = epoll_create(EPOLL_SIZE_HINT); ............ struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.data.fd = mWakeEventFd; //在mEpollFd上监听mWakeEventFd上是不是有数据到来 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); ........... for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem); //监听request对应fd上数据的到来 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); ............ } }


1 写入消息

1.1 Java层写入消息

boolean enqueueMessage(Message msg, long when) { if (msg.target == null) { throw new IllegalArgumentException("Message must have a target."); } if (msg.isInUse()) { throw new IllegalStateException(msg + " This message is already in use."); } synchronized (this) { if (mQuitting) { ..... return false; } msg.markInUse(); msg.when = when; Message p = mMessages; boolean needWake; if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; //在头部插入数据,如果之前MessageQueue是阻塞的,那末现在需要唤醒 needWake = mBlocked; } else { // Inserted within the middle of the queue. Usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } //不是第1个异步消息时,needWake置为false if (needWake && p.isAsynchronous()) { needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } // We can assume mPtr != 0 because mQuitting is false. if (needWake) { nativeWake(mPtr); } } return true; }



void NativeMessageQueue::wake() { mLooper->wake(); } void Looper::wake() { uint64_t inc = 1; //就是向mWakeEventFd写入数据 ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t))); ............. }


1.2 Native层写入消息

void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); sendMessageAtTime(now, handler, message); } void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, const Message& message) { size_t i = 0; { AutoMutex _l(mLock); //一样需要按时间插入 size_t messageCount = mMessageEnvelopes.size(); while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) { i += 1; } //将message包装成1个MessageEnvelope对象 MessageEnvelope messageEnvelope(uptime, handler, message); mMessageEnvelopes.insertAt(messageEnvelope, i, 1); // Optimization: If the Looper is currently sending a message, then we can skip // the call to wake() because the next thing the Looper will do after processing // messages is to decide when the next wakeup time should be. In fact, it does // not even matter whether this code is running on the Looper thread. if (mSendingMessage) { return; } } // Wake the poll loop only when we enqueue a new message at the head. if (i == 0) { //若插入在队列头部,一样利用wake函数触发epoll唤醒 wake(); } }



public static void loop() { final Looper me = myLooper(); ....... for (;;) { Message msg = queue.next(); // might block ....... try { //调用Message的处理函数进行处理 msg.target.dispatchMessage(msg); }........ } }


Message next() { //mPtr保存了NativeMessageQueue的指针 final long ptr = mPtr; ....... int pendingIdleHandlerCount = -1; // ⑴ only during first iteration int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { //会调用Native函数,终究调用IPCThread的talkWithDriver,将数据写入Binder驱动或读取1次数据 //不知道在此处进行这个操作的理由? Binder.flushPendingCommands(); } //处理native层的数据,此处会利用epoll进行blocked nativePollOnce(ptr, nextPollTimeoutMillis); synchronized (this) { final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; //下面其实就是找出下1个异步处理类型的消息;异步处理类型的消息,才含有对应的履行函数 if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; //完成next记录的存储 if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (DEBUG) Log.v(TAG, "Returning message: " + msg); msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; } // Process the quit message now that all pending messages have been handled. if (mQuitting) { dispose(); return null; } //MessageQueue中引入了IdleHandler接口,即当MessageQueue没有数据处理时,调用IdleHandler进行1些工作 //pendingIdleHandlerCount表示待处理的IdleHandler,初始为⑴ if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { //mIdleHandlers的size默许为0,调用接口addIdleHandler才能增加 pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } //将待处理的IdleHandler加入到PendingIdleHandlers中 if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } //调用ArrayList.toArray(T[])节省每次分配的开消;毕竟对Message.Next这样调用频率较高的函数,能省1点就是1点 mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { //履行实现类的queueIdle函数,返回值决定是不是继续保存 keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf(TAG, "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } pendingIdleHandlerCount = 0; nextPollTimeoutMillis = 0; } }


2.1 nativePollOnce

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { //果然Java层调用native层MessageQueue时,将long类型的ptr变成指针 NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); } void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) { mPollEnv = env; mPollObj = pollObj; //最后还是进入到Native层looper的pollOnce函数 mLooper->pollOnce(timeoutMillis); mPollObj = NULL; mPollEnv = NULL; if (mExceptionObj) { ......... } }


//timeoutMillis为超时等待时间。值为⑴时,表示无穷等待直到有事件到来;值为0时,表示无需等待 //outFd此时为null,含义是:存储产生事件的文件句柄 //outEvents此时为null,含义是:存储outFd上产生了哪些事件,包括可读、可写、毛病和中断 //outData此时为null,含义是:存储上下文数据,其实调用时传入的参数 int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { //处理response,目前我们先不关注response的内含 while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); int ident = response.request.ident; if (ident >= 0) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; if (outFd != NULL) *outFd = fd; if (outEvents != NULL) *outEvents = events; if (outData != NULL) *outData = data; return ident; } } //根据pollInner的结果,进行操作 if (result != 0) { if (outFd != NULL) *outFd = 0; if (outEvents != NULL) *outEvents = 0; if (outData != NULL) *outData = NULL; return result; } //主力还是靠pollInner result = pollInner(timeoutMillis); } }


int Looper::pollInner(int timeoutMillis) { // Adjust the timeout based on when the next message is due. //timeoutMillis是Java层事件等待事件 //native层保持了native message的等待时间 //此处其实就是选择最小的等待时间 if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime); if (messageTimeoutMillis >= 0 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) { timeoutMillis = messageTimeoutMillis; } } int result = POLL_WAKE; //pollInner初始就清空response mResponses.clear(); mResponseIndex = 0; // We are about to idle. mPolling = true; //利用epoll等待mEpollFd监控的句柄上事件到达 struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // No longer idling. mPolling = false; // Acquire lock. mLock.lock(); //重新调用rebuildEpollLocked时,将使得epoll句柄能够监听新加入request对应的fd if (mEpollRebuildRequired) { mEpollRebuildRequired = false; rebuildEpollLocked(); goto Done; } // Check for poll error. if (eventCount < 0) { if (errno == EINTR) { goto Done; } ...... result = POLL_ERROR; goto Done; } // Check for poll timeout. if (eventCount == 0) { result = POLL_TIMEOUT; goto Done; } for (int i = 0; i < eventCount; i++) { if (fd == mWakeEventFd) { if (epollEvents & EPOLLIN) { //前面已分析过,当java层或native层有数据写入队列时,将写mWakeEventFd,以触发epoll唤醒 //awoken将读取并清空mWakeEventFd上的数据 awoken(); } else { ......... } } else { //epoll一样监听的request对应的fd ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { int events = 0; if (epollEvents & EPOLLIN) events |= EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; //存储这个fd对应的response pushResponse(events, mRequests.valueAt(requestIndex)); } else { .......... } } } Done: // Invoke pending message callbacks. mNextMessageUptime = LLONG_MAX; //处理Native层的Message while (mMessageEnvelopes.size() != 0) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // Remove the envelope from the list. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. { sp<MessageHandler> handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock(); //处理Native Message handler->handleMessage(message); } mLock.lock(); mSendingMessage = false; result = POLL_CALLBACK; } else { // The last message left at the head of the queue determines the next wakeup time. mNextMessageUptime = messageEnvelope.uptime; break; } } // Release lock. mLock.unlock(); //处理带回调函数的response for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; //调用response的callback int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { removeFd(fd, response.request.seq); } response.request.callback.clear(); result = POLL_CALLBACK; } } return result; }

如上图所示,在nativePollOnce中利用epoll监听是不是有数据到来,然后处理native message、native response。


3 添加监控要求

//fd表示需要监听的句柄 //ident的含义还没有弄明白 //events表示需要监听的事件,例如EVENT_INPUT、EVENT_OUTPUT、EVENT_ERROR和EVENT_HANGUP中的1个或多个 //callback为事件产生后的回调函数 //data为回调函数对应的参数 int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) { return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data); }



int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) { ........ { AutoMutex _l(mLock); //利用参数构造1个request Request request; request.fd = fd; request.ident = ident; request.events = events; request.seq = mNextRequestSeq++; request.callback = callback; request.data = data; if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number ⑴ struct epoll_event eventItem; request.initEventItem(&eventItem); //判断之前是不是已利用该fd构造过Request ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex < 0) { //mEpollFd新增1个需监听fd int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem); ....... mRequests.add(fd, request); } else { //mEpollFd修改旧的fd对应的监听事件 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem); if (epollResult < 0) { if (errno == ENOENT) { // Tolerate ENOENT because it means that an older file descriptor was // closed before its callback was unregistered and meanwhile a new // file descriptor with the same number has been created and is now // being registered for the first time. epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem); ....... } //产生毛病重新加入时,安排EpollRebuildLocked,将让epollFd重新添加1次待监听的fd scheduleEpollRebuildLocked(); } mRequests.replaceValueAt(requestIndex, request); } } }





2、Native层的MessageQueue初始化后,将创建对应的Native Looper对象。Native对象初始化时,将创建对应epollFd和WakeEventFd。其中,epollFd将作为epoll的监听句柄,初始时epollFd仅监听WakeEventFd。

3.1、当Java层的Looper开始循环时,首先需要通过JNI函数调用Native Looper进行pollOnce的操作。

3.2、Native Looper开始运行后,需要等待epollFd被唤醒。当epollFd等待超时或监听的句柄有事件到来,Native Looper就能够开始处理事件了。

3.3、在Native层,Native Looper将先处理Native MessageQueue中的消息,再调用Response对应的回调函数。



利用looper:sendMessage,可以为Native MessageQueue增加消息;一样,要时将向Native层的WakeEventFd写入消息,以唤醒epollFd;
利用looper:addFd,可以向Native Looper注册监听要求,监听要求包括需监听的fd、监听的事件及对应的回调函数等,监听要求对应的fd将被成为epollFd监听的对象。当被监听的fd产生对应的事件后,将会唤醒epollFd,此时将生成对应response加入的response List中,等待处理。1旦response被处理,就会调用对应的回调函数。


