程序员人生 网站导航

探讨erlang消息选择性接收和改进

栏目:php教程时间:2015-03-16 11:02:20
从 rabbitMQ 代码中找到 gen_server2 , 对gen_server进行了1些优化。看到先辈写的博文也提到这个,引发了我的思考。见 gen_server2 - OTP gen_server优化版 。

gen_server2 引发的思考

正如 litaocheng 所说的:
gen_server 和 gen_server2 最大的不同是:
gen_server2 收到任何1条消息放到外部的队列中,当VM内部消息队列为空后,才进行消息处理,继续循环
gen_server 收到任何1条消息后,立即进行处理,处理完成后继续循环

其次,还有1个很重要的不同点:
gen_server2 使用的外部队列是带优先级排序的,功能模块本身可以定制消息优先级,乃至直接抛弃消息。(导出 prioritise_call/prioritise_cast/prioritise_info 几个函数实现定制,返回的数值越大优先级越高,返回drop就抛弃消息)
最高优先级是 infinity, 在处理 {system, _From, _Req} 和 {'EXIT', Parent, _R} 使用了这个优先级。

但在博文也援用了Joe' Bog 在merle 所做的测试,看了merle 的代码,没有用到 prioritise_XXX 函数,说明没有明显利用到 gen_server2 优先级控制的好处,那为何能取得不错的效果?(见下图)

讨论 gen_server2 的测试

通过浏览 Joe 在Github写的 merle 代码,很快发现问题:
可以看出,merle 在 handle_call 时都会调用 send_generic_cmd 、send_get_cmd 等类似函数。这些函数实现上都会阻塞进程直到接收到某些特定消息。
下面以 send_generic_cmd 为例做说明:
send_generic_cmd(Socket, Cmd) ->
    gen_tcp:send(Socket, <<Cmd/binary, " ">>),
    Reply = recv_simple_reply(),
    Reply.


recv_simple_reply() ->
   receive
      {tcp,_,Data} ->
           string:tokens(binary_to_list(Data), " ");
      {error, closed} ->
           connection_closed
      after ?TIMEOUT -> timeout
   end.
另外,gen_tcp:send 在实现上也 receive 等待某个特定信息,见 prim_inet:send(Socket, Packet, Opts)
send(S, Data, OptList) when is_port(S), is_list(OptList) ->
    try erlang:port_command(S, Data, OptList) of
    false -> % Port busy and nosuspend option passed
       {error,busy};
    true ->
       receive
          {inet_reply,S,Status} ->
             Status
      end
   catch
      error:_Error ->
         {error,einval}
   end.
也许可能有读者不明白,这里说的等待某个特定消息是指选择性接收。具体例子以下:
选择性接收:
   receive
         {ok, Result} ->
               Result
   end.

非选择性接收:
     receive
         Info ->
                Info
     end.
选择性接收只针对某类信件,会1直阻塞住直到找到该信件为止,或超时。非选择性接收的结果是每条消息都会被消费掉,方式为先进先出,不存在扫描信箱的问题。

前面提到,merle 在 handle_call 时都会 receive 住,等待某个特定消息。这个的代价就是每次receive住,erlang VM都要扫描进程全部信箱队列。特别像 Joe 在做此类测试时,消息处理速度远远低于消息投递速度,换句话说,gen_server进程信箱前面所有大部份的信件都是作者自己发的 gen_server:call 要求消息,然后每次 receive 住都要匹配这些消息。
比如, Joe 测试的是 merle:getkey 操作,那末信箱大部份消息就是 gen_server:call 投递的 getkey 消息,而 handle_call 在处理时就要扫描完前面的getkey消息,才能得到想要的 {tcp,_,Data} 消息。进程信箱消息队列以下所示:
getkey
getkey
getkey
...
getkey
{tcp,_,Data}
...
换成 gen_server2 的方式,gen_server2 会清空消息队列,那末进程信箱消息队列以下所示:
getkey
getkey
{tcp,_,Data}
...
前面还有2个getkey表示 gen_server2清空后在 handle_call 处理进程中 gen_server:call 又投递了新的 getkey 消息,数据量对照 gen_server来讲可以说是极少了,所以,消息匹配的次数就少了很多,这就会出现 Joe 测试的结果。

讨论erlang消息选择性接收

在讨论这个问题之前,先援用 learnyousomeerlang 对消息选择性接收的介绍(原文),很生动具体。
When there is no way to match a given message, it is put in a save queue and the next message is tried. If the second message matches, the first message is put back on top of the mailbox to be retried later.

就是说,erlang消息匹配不上,就会把消息放到 Save Queue 的队列中,当匹配到了后再把消息放回进程信箱。以上是形象化的说法,如果是这样就必定存在消息入列出列开消,那VM究竟是不是这样实现呢?

所以,对选择性接收,这里取3个问题出来说:
1、上面提到的,消息 Save Queue 是不是存在入列出列开消
2、当选择性接收时,新消息到来时会不会重复扫描信箱前面匹配不上的消息
3、假定第2点不存在重复扫描,那末如果消息已匹配到了,再匹配多1次这个消息,会不会重复扫描前面的消息

带着上面的疑问,下面以1个简单的例子做说明
-module(test).
-compile(export_all).
t() ->
   receive
      ok ->
          ok
   end.
保存为test.erl,然后编译,生成opcode
1> c(test).
{ok,test}
2> erts_debug:df(test).
ok
在目录下找到生成的 test.dis,t() 函数opcode以下:
04BE84B0: i_func_info_IaaI 0 test t 0 
04BE84C4: i_loop_rec_fr f(04BE84EC) x(0) 
04BE84CC: i_is_eq_exact_immed_frc f(04BE84E4) x(0) ok 
04BE84D8: remove_message 
04BE84DC: move_return_cr ok x(0) 
04BE84E4: loop_rec_end_f test:t/0 
04BE84EC: wait_locked_f test:t/0 
逐行解释这段代码:
i_loop_rec_fr
receive接收信息,如果有信息放到 x(0) 寄存器,继续下1条指令;没有消息就跳到地址 04BE84EC,即 wait_locked_f 
i_is_eq_exact_immed_frc 
匹配 x(0)寄存器的值和ok是不是相等,如果相等继续下1条指令;否则跳到04BE84E4,即 loop_rec_end_f 
remove_message 
移除进程消息队列中“当前”的信息(也就是上1行匹配到的信息)
move_return_cr 
将 ok 送到 x(0)寄存器并返回结果
loop_rec_end_f 
将“当前消息”指针指向下1个位置,如果指向位置有消息,则跳到test:t/0第1段代码地址继续履行,即 04BE84C4;否则继续履行下1条指令 04BE84EC,即 wait_locked_f 
wait_locked_f  
阻塞当前进程,等待下1次调度,再检查是不是有新的消息到达
以上, i_is_eq_exact_immed_frc 和 move_return_cr 在 beam_hot.h实现,其他在 beam_emu.c 实现,都可以找相干代码。

/* * beam_emu.c process_main() 线程入口函数,实现VM调度 * 以下截取 i_loop_rec_fr 处理进程 * 作用是从信箱取出1条消息放到 x(0) 寄存器;没消息则跳到 wait或 wait_timeout指令 */ OpCase(i_loop_rec_fr): { BeamInstr *next; ErlMessage* msgp; loop_rec__: PROCESS_MAIN_CHK_LOCKS(c_p); // 取出“当前位置”的消息 msgp = PEEK_MESSAGE(c_p); if (!msgp) { //如果消息不存在,尝试从SMP下public queue获得消息 #ifdef ERTS_SMP erts_smp_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); if (ERTS_PROC_PENDING_EXIT(c_p)) { // 如果进程准备退出,则不处理消息了 erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); SWAPOUT; goto do_schedule; // 等待下1次调度 } // SMP下把消息移到进程私有堆尾部(纯指针操作) ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p); // 再尝试取出“当前位置”的消息 msgp = PEEK_MESSAGE(c_p); if (msgp) erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); else #endif { // 信箱没消息则跳到 wait或 wait_timeout指令(实际上就是履行下1条履行) SET_I((BeamInstr *) Arg(0)); Goto(*I); } } // 解析散布式消息,把消息附加的数据复制到进程私有堆 ErtsMoveMsgAttachmentIntoProc(msgp, c_p, E, HTOP, FCALLS, { SWAPOUT; reg[0] = r(0); PROCESS_MAIN_CHK_LOCKS(c_p); }, { ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); PROCESS_MAIN_CHK_LOCKS(c_p); r(0) = reg[0]; SWAPIN; }); if (is_non_value(ERL_MESSAGE_TERM(msgp))) { /* * 如果消息破坏就移除(出现这类情况是散布式消息解码出现毛病) */ ASSERT(!msgp->data.attached); UNLINK_MESSAGE(c_p, msgp); // 移除消息,侧重将“当前”位置指向下1条消息 free_message(msgp); // 烧毁消息 goto loop_rec__; // 跳到上面继续 } PreFetch(1, next); // 标记下1条指令位置 r(0) = ERL_MESSAGE_TERM(msgp); NextPF(1, next); // 履行下1条指令 }
来看下这两个宏定义:
/* Get "current" message */ #define PEEK_MESSAGE(p) (*(p)->msg.save)
从字面上就知道这个宏是取"当前的"消息,取了 msg.save 的值
#define UNLINK_MESSAGE(p,msgp) do { ErlMessage* __mp = (msgp)->next; *(p)->msg.save = __mp; (p)->msg.len--; if (__mp == NULL) (p)->msg.last = (p)->msg.save; (p)->msg.mark = 0; } while(0)
这个宏就是移除消息操作,消息队列长度⑴,把 msg.save 指向了 msgp的下1条消息;如果 msgp->next 为 NULL,表示这是最后1条消息,就把 msg.last 等于了 msg.save
/* * beam_emu.c process_main() 线程入口函数,实现VM调度 * 以下截取 remove_message 处理进程(已删除没必要要的代码) * 作用是将消息从信箱队列中移除 */ OpCase(remove_message): { BeamInstr *next; ErlMessage* msgp; PROCESS_MAIN_CHK_LOCKS(c_p); PreFetch(0, next); msgp = PEEK_MESSAGE(c_p); // 取出当前的消息 if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) { save_calls(c_p, &exp_receive); } if (ERL_MESSAGE_TOKEN(msgp) == NIL) { SEQ_TRACE_TOKEN(c_p) = NIL; } else if (ERL_MESSAGE_TOKEN(msgp) != am_undefined) { // 追踪调试内容,可以疏忽 Eterm msg; SEQ_TRACE_TOKEN(c_p) = ERL_MESSAGE_TOKEN(msgp); c_p->seq_trace_lastcnt = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p)); if (c_p->seq_trace_clock < unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p))) { c_p->seq_trace_clock = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p)); } msg = ERL_MESSAGE_TERM(msgp); seq_trace_output(SEQ_TRACE_TOKEN(c_p), msg, SEQ_TRACE_RECEIVE, c_p->common.id, c_p); } UNLINK_MESSAGE(c_p, msgp); // 移除消息,侧重队列长度⑴ JOIN_MESSAGE(c_p); // 重置“当前”位置,指向了队列第1条消息 CANCEL_TIMER(c_p); free_message(msgp); // 烧毁消息 ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); PROCESS_MAIN_CHK_LOCKS(c_p); NextPF(0, next); // 履行下1条指令 }
所以,当消息匹配时,就会重新指向了信箱第1条消息,这样,第3个问题就有了答案,会重新扫描信箱。
再来看看这个宏:
/* Reset message save point (after receive match) */ #define JOIN_MESSAGE(p) (p)->msg.save = &(p)->msg.first
这个宏就是讲 msg.save 指向了 msg.first ,就是第1个消息
下面看下消息不匹配的情况就是 loop_rec_end_f 
/* * beam_emu.c process_main() 线程入口函数,实现VM调度 * 以下截取 loop_rec_end_f 处理进程 * 作用是继续取出最新的消息匹配 */ /* * Advance the save pointer to the next message (the current * message didn't match), then jump to the loop_rec instruction. */ OpCase(loop_rec_end_f): { SET_I((BeamInstr *) Arg(0)); SAVE_MESSAGE(c_p); // “当前”位置指向下1个位置 goto loop_rec__; // 继续取出消息匹配 }
这个opcode实现了不断取消息出来匹配的进程,直到失去调度机会,等待下1次调度。
也看下这个宏:
/* Save current message */ #define SAVE_MESSAGE(p) (p)->msg.save = &(*(p)->msg.save)->next
这个宏就是将 msg.save 指向了下1个位置。
到这里第1个问题和第2个问题都有答案了,前面说到的 Save Queue 只是“形象化”的队列,实际不存在,所以不存在消息入列出列的开消问题。然后第2个问题,消息选择性接收,当消息匹配不上,有新消息到来时不会重复扫描信箱前面匹配不上的消息。

总结

针对erlang选择性接收的问题,gen_server2给我们1个方向,通过外部队列减少了消息的匹配,而且控制优先级来控制消息的处理。
这里也说说 gen_server2 的副作用:
gen_server2会带来1种问题,erlang原来会利用进程信箱长度来抑制发送者进程(通过减少消息发送者进程的调度机会 Reduction,可以参考这篇文章《erlang send剖析及参数意义》)。但是,gen_server2 每次都会清空进程信箱的消息队列,没法利用到 VM 提供的抑制消息队列过快暴涨的保护机制。
针对这个问题,gen_server2 通过 prioritise_XXX 函数向外部模块暴露消息队列长度,使调用者可以根据消息队列长度控制是不是抛弃消息,以实现对消息的抑制。

实际上,gen_server 在我们的开发中就够用了,很少需要去斟酌erlang选择性接收的问题。rabbitMQ是针对消息队列的处理,必定有不计其数的消息量,那才正好需要 gen_server2 的作用。如果也到了这类消息量,那就建议使用 gen_server2

参考:http://blog.csdn.net/mycwq/article/details/44049749

------分隔线----------------------------
------分隔线----------------------------

最新技术推荐