emqttd随笔

结构

emq-relx文件组织结构

+_rel/                         # 编译发布之后的目标文件. 
-bin/                         
cuttlefish
emqenv
emqttd
emqttd.cmd
emqttd_ctl
emqttd_ctl.cmd
install_upgrade_escript
nodetool
-data/                         
loaded_plugins               # 需要加载的插件放在这里.
+deps/                         # 所有依赖的app.
-rel/                          # 与发布相关的配置文件, 这些文件编译的时候大部分会被作为模板拷贝到目标文件中.
-conf/
-plugins/                  ## 插件的配置文件.
emq_dashboard.conf
acl.conf
emq.conf
-schema/
emq.schema
emq_dashboard.schema
erlang_vm.schema
.gitignore
LICENSE
Makefile                       # 和erlang.mk一起使用来编译和发布.
README.md
erlang.mk
relx                           # erlang release的程序.
relx.config                    # relx的配置文件.
vars.config                    # relx用于覆盖替换模板的文件.

两条线

编译和发布

从执行make开始进行了什么操作?

Makefile中首先定义了一些变量,这些变量包括PROJECT,PROJECT_DESCRIPTION,PROJECT_VERSION, DEPS和以"dep_"开头的依赖项下载方式, 这些变量的作用是指定依赖项。在随后的include erlang.mk中会使用这些变量,并一一下载并。

此外还定义了一个plugins目标,并将其作为app的先决条件。那什么时候执行app这个目标呢?我们看到Makefile中有一个include erlang.mk, 这个很重要,我们在执行make的时候就是因为有include才会执行erlang.mk的,而app是erlang.mk里面已经有定义的,所以会先执行plugins, 然后 在执行app。

我们运行make的时候,没有执行具体的目标的时候,会默认执行make all。我们在erlang.mk中看到all: deps app rel,也就是all包括三部分, 其中deps负责下载Makefile中指定的依赖app,app则是在编译的时候生成相应的bin文件,rel负责生成发布。

  • relx发布工具

    主要时relx.config的配置问题。配置都是使用tuple格式, 每一项结尾用点号结尾。

    {release, {appname, vsn}, [apps]}.

    apps通过tuple格式指定的依赖app版本和启动方式。主要说明一下启动方式:

    参考:http://erlang.org/doc/man/rel.html, 有permanent|transient|temporary|load|none,具体解释:

    Start type of an application included in the release.
    
    If Type = permanent | transient | temporary, the application is loaded and started in the corresponding way, see application(3).
    
    If Type = load, the application is only loaded.
    
    If Type = none, the application is not loaded and not started, although the code for its modules is loaded.
    
    Defaults to permanent
    

emqttd的执行

从执行bin/emqttd console开始进行了什么操作.

  • 配置信息

    从2.0.3开始,emqttd的配置方式进行了更改,使用cuttlefish库将*.conf(包括插件中的conf文件)中的每行Key=Value的配 置在启动emqttd的时候,临时转化为传统的app.config的格式, 启动的时候读取的配置信息其实还是传统的app.config的格 式。如果插件的配置文件也是*.conf, 那么就会在emqttd启动的时候一起把配置文件加载到系统中。

    同时也兼容传统的配置方式,这种方式的配置文件是以*.config的命令在配置文件目录中的,加载插件的时候通过emqttd_plugins:init/0, 将文件加载进到系统中,随后再emqttd_plugins:load/0将插件app启动。 注意,*.config类型的配置文件不会被cuttlefish 处理到app.config文件中,因此这类参数不会自动加载,需要再emqttd_plugins:init/0中加载。

    # 这里首先定义cuttlefish生成配置文件的命令前缀。
    CUTTLEFISH="on"
    if [ -z "$CUTTLEFISH" ]; then
        CUTTLEFISH_COMMAND_PREFIX=""
    else
        CUTTLEFISH_COMMAND_PREFIX="$ERTS_PATH/escript $RUNNER_ROOT_DIR/bin/cuttlefish -s $REL_DIR/schema -d $RUNNER_DATA_DIR/configs"
    fi
    

    接着定义生成配置文件的函数:

    generate_config() {
        if [ -z "$CUTTLEFISH" ]; then
            # Note: we have added a parameter '-vm_args' to this. It
            # appears redundant but it is not! the erlang vm allows us to
            # access all arguments to the erl command EXCEPT '-args_file',
            # so in order to get access to this file location from within
            # the vm, we need to pass it in twice.
            CONFIG_ARGS=" -config $RUNNER_ETC_DIR/app.config -args_file $RUNNER_ETC_DIR/vm.args -vm_args $RUNNER_ETC_DIR/vm.args "
        else
            # 将emqttd的配置文件和插件的配置文件merge在一起. 
            APPCONF=`relx_nodetool mergeconf $RUNNER_ETC_DIR/emq.conf $RUNNER_ETC_DIR/plugins $RUNNER_DATA_DIR/configs`
            if [ "$?" -ne 0 ]; then
                echoerr "Error merging configs!"
                exit 1
            fi
            replace_env_in_conf
            # 生成配置文件。 
            CONFIG_ARGS=`$CUTTLEFISH_COMMAND_PREFIX -c $APPCONF generate`
            if [ "$?" -ne 0 ]; then
                echoerr "Error generating config with cuttlefish"
                echoerr "  run \`$RUNNER_SCRIPT config generate -l debug\` for more information."
                exit 1
            fi
        fi
    
        if ! relx_nodetool chkconfig $CONFIG_ARGS; then
            echoerr "Error reading $CONFIG_ARGS"
            exit 1
        fi
    }
    

    最后在执行的时候调用上面的函数在data/configs/中生成app.config和vm.args。

  • 插件的加载

    前面说了插件使用两种方式进行配置的参数加载方式。值得注意的是,在 emqttd_plugins:init/0中加载配置参数的时候,只要在 etc/plugins中所有*.config的配置参数都会被加载到系统中,但是emqttd_plugins:load/0启动插件app的时候则是仅仅将loaded_plugins文件 中已有的app启动。 因此,系统中其实存在一堆没有用的参数。

关于虚拟机的配置参数

Flags

  • heart

    启动erlang runtime system的心跳监控。 这篇文章 解释了heart的原理。启动虚拟的的时候,如果指定-heart 参数,那么会通过模块 heart.erl 启动一个独立的外部进程,名为heart,该进程会监控虚拟机,虚拟机每个

表结构

先列出emq所有的ets和mnesia表:

Table Name ets/mnesia Owner Attribute Value Specification
mqttd_ctl_cmd ets ctl [ordered_set, named_table, protected] {{Seq, Cmd}, {Mod, Fun}, Opts} 按序号记录emq的控制命令。
mqtt_hook ets hook [set, protected, named_table, {keypos, #hook.name}] #hook{name, [#callback{tag, function, init_args, priority}]} 记录你使用到的hook,主要是插件中使用的。(callbacks按照priority排序)
mqtt_local_route ets router [set, named_table, protected] {Topic, node()} 本地订阅的路由表(全局路由表是保存在mnesia表mqtt_route中)
mqtt_topic mnesia mnesia_monitor/router [{ram_copies, [node()]}, {record_name, mqtt_topic}, {attributes, record_info(fields, mqtt_topic)}] #mqtt_topic{topic, flags=[]::[retained or static]} 记录被订阅的所有topic信息,内容是#mqtt_topic.
mqtt_route mnesia router [{type, bag}, {ram_copies, [node()]}, {record_name, mqtt_route}, {attributes, record_info(fields, mqtt_route)}] #mqtt_route{topic,node} 全局的路由表, 内容是#mqtt_route。
mqtt_trie mnesia mnesia_monitor/trie [{ram_copies, [node()]},{record_name, trie},{attributes, record_info(fields, trie)}] #trie{#trie_edge{node_id, word}, node_id} 仅针对含统配的topic。
mqtt_trie_node mnesia mnesia_monitor/trie_node [{ram_copies, [node()]},{record_name, trie_node},{attributes, record_info(fields, trie_node)}] #trie_node{node_id, edge_count, topic, flags} 仅针对含统配的topic。
mqtt_subproperty ets pubsub_sup [public, named_table, set, {read_concurrency, true}, {write_concurrency, true}] {{Topic, Subscriber}, [local, {share, Share} or {share, '$queue'}]} pubsub_sup创建,记录某个topic被某个client的订阅属性。
mqtt_subscriber ets pubsub_sup [public, named_table, duplicate_bag, {read_concurrency, true}, {write_concurrency, true}] {Topic or {local, Topic}, Subscriber or {Share, Subscriber}} 本地节点的订阅关系,key是topic,value是clientId。表示该topic被谁订阅。
mqtt_subscription ets pubsub_sup [public, named_table, bag, {read_concurrency, true}, {write_concurrency, true}]   本地节点的订阅关系,key是clientId,value是topic。表示哪个client订阅了哪个topic。
mqtt_stats ets stats [set, public, named_table, {write_concurrency, true}]   统计dashboard的stats内容。
mqtt_client_stats ets stats [set, public, named_table, {write_concurrency, true}]   client_enable_stats参数打开才会统计, 统计client_pid的资源占用情况, 下表下的解释。
mqtt_session_stats ets stats [set, public, named_table, {write_concurrency, true}]   session_enable_stats参数打开才会统计, 统计session_pid的资源占用情况, 下表下的解释。
mqtt_metric ets metrics [set, public, named_table, {write_concurrency, true}]   记录dashboard的metrics部分(包括packet,message,bytes个数)。
mqtt_client ets cm_sup [ordered_set, named_table, public, {keypos, 2}, {write_concurrency, true}]   cm_sup创建,用于本地的client的注册。内容是#mqtt_client.
mqtt_local_session ets sm_sup [public, ordered_set, named_table, {write_concurrency, true}] {ClientId, ClientPid, CleanSess, Properties} sm_sup创建并维护,记录本地节点上的session(session在本地注册)。
mqtt_session mnesia mnesia_monitor/sm [{type, set}, {ram_copies, [node()]}, {record_name, mqtt_session}, {attributes, record_info(fields, mqtt_session)}] #mqtt_session{client_id, sess_pid, clean_sess} 记录全局的路由信息。本地路由信息保存在ets表mqtt_local_session.
mqtt_access_control ets access_control [set, named_table, protected, {read_concurrency, true}] {auth_modules/acl_modules, [{Mod, ModState, Seq}...]} 包含两个条目auth_modules,acl_modules,将auth/acl规则注册在这里,auth/acl的时候从这个获取要执行的模块和相应的参数。
mqtt_acl_rule ets access_control [set, public, named_table, {read_concurrency, true}]   记录internal的acl规则, acl_modules中的emqttd_acl_internal的规则会保存在这里,acl会查询这里。
mqtt_broker ets broker [set, public, named_table]   记录broker的信息,目前暂时没有使用。
mqtt_retained mnesia mnesia_monitor/retained 可设置 #mqtt_retained{topic, msg, ts} 记录retain消息。
mqtt_admin mnesia mnesia_monitor/dashboard [{type, set}, {local_content, true}, {disc_copies, [node()]}, {record_name, mqtt_admin}, {attributes, record_info(fields, mqtt_admin)}] #mqtt_admin{username, password, tags} 记录dashoard的管理员。
  • mqtt_client_stat: {<clientId>, [{'$ts',1497948011}, {mailbox_len,0}, {heap_size,610}, {reductions,802}, {recv_pkt,1}, {recv_msg,0}, {send_pkt,0}, {send_msg,0}, {recv_oct,84}, {recv_cnt,1}, {send_oct,4}, {send_cnt,1}, {send_pend,0}]}
  • mqtt_session_stat: {<ClientId>, [{'$ts',1497948010}, {mailbox_len,0}, {heap_size,610}, {reductions,327}, {max_subscriptions,0}, {subscriptions,0}, {max_inflight,32}, {inflight_len,0}, {max_mqueue,0}, {mqueue_len,0}, {mqueue_dropped,0}, {max_awaiting_rel,100}, {awaiting_rel_len,0}, {deliver_msg,0}, {enqueue_msg,0}]}

注意,ets只能记录本地共享的信息。项目中主要是注册信息,统计信息(目前只记录本地的统计信息,不会记录全局的统计信息)。

关于代码

关于esockd_listener_sup

注意这个supervisor在启动的init中没有启动任何子进程,而且指定子进程的spec是:

init([]) ->
    {ok, {{rest_for_one, 10, 3600}, []}}.

connection_sup, acceptor_sup, listener三个子进程一次启动,而先启动的子进程就会作为参数给下一个启动的子进程。 因此,如果一个子进程挂了,那么后面启动的子进程也必须重新启动,否则前面启动的作为参数的就无效了。

关于esockd_connection_sup

esockd_connection_sup不是一个严格的supervisor。它只是一个gen_server。这是因为他的特殊性决定的。

start_link(Options, MFArgs, Logger) ->
    gen_server:start_link(?MODULE, [Options, MFArgs, Logger], []).

supervisor中的必须有重启策略,如果没有在spec中填写默认就是one_for_one。这几种重启策略都会重启子进程。 但是对于sockt连接,断了就是断了,不应该重启的。因此不需要什么重启策略。那么supervisor怎么也得有个监督关系啊, 需要的是当子进程挂了的时候,supervisor要收到消息。

那他是怎么启动子进程(socket连接)的呢?

在esockd_connection_sup.erl中,Conn:start_link(MFArgs)函数调用 emqttd_client:start_link/2 来创建client进程。

esockd_connection_sup.erl:

start_connection(Sup, Mod, Sock, SockFun) ->
    case call(Sup, {start_connection, Sock, SockFun}) of
        {ok, Pid, Conn} ->
                                                % transfer controlling from acceptor to connection
            Mod:controlling_process(Sock, Pid),
            Conn:go(Pid),
            {ok, Pid};
        {error, Error} ->
            {error, Error}
    end.

handle_call({start_connection, Sock, SockFun}, _From, 
            State = #state{conn_opts = ConnOpts, mfargs = MFArgs,
                           curr_clients = Count, access_rules = Rules}) ->
    case inet:peername(Sock) of
        {ok, {Addr, _Port}} ->
            case allowed(Addr, Rules) of
                true ->
                    Conn = esockd_connection:new(Sock, SockFun, ConnOpts),
                    case catch Conn:start_link(MFArgs) of
                        {ok, Pid} when is_pid(Pid) ->
                            ...

emqttd_client:start_link/2 调用 proc_lib:spawn_link/3 来启动进程:

emqttd_client.erl:

start_link(Conn, Env) ->
    {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}.

为什么这里要使用proc_link:spwan_link/3来启动连接进程呢?因为这个函数最终是调用erlang:spawn_link来启动,并自动创建link。 该函数和erlang:start_link的方式区别是spawn_link属于异步启动进程。一调用就会返回子进程ID。 他的用处在emqttd_client:init中看到:

init([Conn0, Env]) ->
    {ok, Conn} = Conn0:wait(),
    case Conn:peername() of
        {ok, Peername}    -> do_init(Conn, Env, Peername);
        {error, enotconn} -> Conn:fast_close(),
                             exit(normal);
        {error, Reason}   -> Conn:fast_close(),
                             exit({shutdown, Reason})
    end.

这里的 Conn0:wait():

esockd_connection.erl:

wait(Conn = ?CONN_MOD) ->
    receive {go, Conn} -> upgrade(Conn) end.

使用 receive 来接受消息{go, Conn}。如果emqttd_client:start_link中不使用spawn_link来启动进程,那么在 init 中就会卡死。 这样在 esockd_connection_sup:start_connection(Sup, Mod, Sock, SockFun) 中,Conn:go()就不会被执行。因此出现wait()一直 等待go()发出消息。

如果使用spawn_link就会直接返回,init中执行wait,go被执行后发出消息由wait收到,然后才执行do_init(Conn, Env, Peername) 函数。

另外,esockd_connection_sup 中和子进程link之后,相互都会收到对方 exit 的消息,这样可能 esockd_connection_sup 可能会因为 子进程挂掉而挂掉,为了避免这种情况,esockd_connection_sup 启动的时候在init中设置 process_flag(trap_exit, true), 这样 可以将子进程发送的 exit 消息转化为消息{'EXIT', Pid, Reason},从而避免 esockd_connection_sup 被牵连而挂掉。

handle_info({'EXIT', Pid, Reason}, State = #state{curr_clients = Count, logger = Logger}) ->
    ...

总结一下:只要理解了spawn_link的异步方式就可以理解wait和go了。

关于emqttd_client

emqttd_client进程启动

上一节中已经讲到 connection 进程是esockd_connection_sup调用 proc_lib:spawn_link 启动的。这个进程我们希望他是符合otp的通用服务器, 因此,在do_init中使用了:

gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout,
                       {backoff, 2000, 2000, 20000}).

我们看看 gen_server 中 enter_loop 的官方文档(gen_server2基本一样):

enter_loop(Module, Options, State)
enter_loop(Module, Options, State, ServerName)
enter_loop(Module, Options, State, Timeout)
enter_loop(Module, Options, State, ServerName, Timeout)

Makes  an existing process into a gen_server process. Does not
return, instead the  calling  process  enters  the  gen_server
process  receive  loop  and  becomes a gen_server process. The
process must have been started using one of  the  start  functions 
in proc_lib(3). The user is responsible for any initialization 
of the process, including registering a name for it.

This function is useful when  a  more  complex  initialization
procedure  is needed than the gen_server process behavior provides.
  • 也就是说让一个已经存在的进程变成 gen_server 通用服务器进程。该函数不会返回,而是变成通用服务器进入 循环接收消息的状态。
  • 但是有个要求是这个进程必须由proc_lib中的启动函数启动,并由用户负责进程的所有初始化,包括注册进程名。 这样后面我们就可以完全把他当做gen_server/gen_server2的进程来看待了。
  • 这个函数用在进程初始化比gen_server提供的初始化更复杂的情况下。

接收消息

首先,在esockd_listener.erl中创建监听socket的时候:

init({Protocol, ListenOn, Options, AcceptorSup, Logger}) ->
    Port = port(ListenOn),
    process_flag(trap_exit, true),
    %%Don't active the socket...
    SockOpts = merge_addr(ListenOn, proplists:get_value(sockopts, Options, [{reuseaddr, true}])),
    case esockd_transport:listen(Port, [{active, false} | proplists:delete(active, SockOpts)]) of
        {ok, LSock} ->
            ...

我们发现监听 socket 是 {active,false} 的被动socket,因此,在接收数据的时候每次都要通过执行:

Conn:async_recv(0, infinity),

来启动下一次数据的接收。(这和gen_tcp中的被动模式每次都要使用gen_tcp:recv()来接收数据一样), 这样做可以便于流控

下面我们看看接收消息的具体步骤, 在emqttd_client中,接收消息有几个state中的参数控制:conn_state, await_recv(初始时候=false)。

handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
    Size = iolist_size(Data),
    ?LOG(debug, "RECV ~p", [Data], State),
    emqttd_metrics:inc('bytes/received', Size),
    received(Data, rate_limit(Size, State#client_state{await_recv = false}));

...

rate_limit(_Size, State = #client_state{rate_limit = undefined}) ->
    run_socket(State);
rate_limit(Size, State = #client_state{rate_limit = Rl}) ->
    case Rl:check(Size) of
        {0, Rl1} ->
            run_socket(State#client_state{conn_state = running, rate_limit = Rl1});
        {Pause, Rl1} ->
            ?LOG(warning, "Rate limiter pause for ~p", [Pause], State),
            erlang:send_after(Pause, self(), activate_sock),
            State#client_state{conn_state = blocked, rate_limit = Rl1}
    end.

run_socket(State = #client_state{conn_state = blocked}) ->
    State;
run_socket(State = #client_state{await_recv = true}) ->
    State;
run_socket(State = #client_state{connection = Conn}) ->
    Conn:async_recv(0, infinity),
    State#client_state{await_recv = true}.

当conn_state=blocked(流控的时候可能将其置为blocked)的时候不能接收消息,当 await_recv=true 的时候也不能接收消息, 当conn_state=/=blocked and await_recv=true的时候才可以通过 Conn:async_recv(0, infinity) 来接收消息。

通过 Conn:async_recv(0, infinity) 打开接收消息之后立刻讲await_recv=true,等待handle_info中接收完这一次消息之后,再 将await_recv=false,再次接收消息。

emqttd_trie.

如果topic中含有通配符,需要插入topic的字典树中。

-type(trie_node_id() :: binary() | atom()).

%% 表示所有的节点,以及该节点的所有属性.
-record(trie_node,
        { node_id         :: trie_node_id(),
          edge_count = 0  :: non_neg_integer(),       %% edge_count表示这个节点是其他多少个节点的path。
          topic           :: binary() | undefined,    %% 该节点对应的完成topic.
          flags           :: [retained | static]
        }).
%% 表示一个node
-record(trie_edge,
        { node_id :: trie_node_id(),
          word    :: binary() | atom()
        }).

-record(trie,
        { edge    :: #trie_edge{},
          node_id :: trie_node_id()
        }).

triples 依次获取路径,尾点,node_id(节点).

> emqttd_topic:triples(<<"v2/a/+/c">>).
[{root,<<"v2">>,<<"v2">>},
 {<<"v2">>,<<"a">>,<<"v2/a">>},
 {<<"v2/a">>,'+',<<"v2/a/+">>},
 {<<"v2/a/+">>,<<"c">>,<<"v2/a/+/c">>}]

相关模块

  • emqttd_app: 服务启动的总入口。
  • esockd_connection_sup: 监控client。创建 client 进程。
  • emqttd_client: mqtt tcp 的客户端连接进程。
  • emqttd_protocol: 处理mqtt协议相关的逻辑。也就是处理各种type的数据包的逻辑。
  • emqttd_parser: mqtt协议包解析。
  • emqttd_serializer: mqtt协议包序列化。
  • emqttd_session_sup: 监控session。创建session进程。
  • emqttd_sm: 负责session的管理。包括调用session_sup创建session;重建,复用,销毁session(mnesia的mqtt_session);注册,销毁session(ets的mqtt_local_session) 消息到本地的分发(查询mqtt_local_session)
  • emqttd_cm: 负责client的管理。client的session 进程创建成功后会将这个进程注册到ets中,并建立client和cm之间的monitor关系, 由 cm 监控 client 的生死存亡; 统计client的数量。

消息流

subscribe

  • session进程内部使用一个map(subscriptions)来保存所有订阅的topic,key为topic,value为qos。 当一个sub发生时,带着TopicTable [{topic, SubOpts}] 到session进程中,
  • 在 emqttd_server 中,要更新 mqtt_subproperty 表 (key={topic, subscriber}, val=SubOpts),记录这个subscriber对这个topic的订阅属性。 更新 ets 表 mqtt_subscription (bag, key=Subscriber, val=Topic or {Share, Topic}),记录用户的每次订阅。
  • 全局路由表保存在mnesia中,便于同步;本地路由表在ets中,不需要同步。订阅时topic前面带"$local/"的就是本地订阅。

subscribe.png

Erlang设设计相关

  • 1. 使用Pool, Pool, Pool... 推荐GProc库: https://github.com/uwiger/gproc
  • 2. 异步,异步,异步消息...连接层到路由层异步消息,同步请求用于负载保护
  • 3. 避免进程Mailbox累积消息,负载高的进程可以使用gen_server2
  • 4. 消息流经的Socket连接、会话进程必须Hibernate,主动回收binary句柄
  • 5. 多使用Binary数据,避免进程间内存复制
  • 6. 使用ETS, ETS, ETS...Message Passing Vs ETS
  • 7. 避免ETS表非键值字段select, match
  • 8. 避免大量数据ETS读写, 每次ETS读写会复制内存,可使用lookup_element, update_counter
  • 9. 适当开启ETS表{write_concurrency, true}
  • 10. 保护Mnesia数据库事务,尽量减少事务数量,避免事务过载(overload)
  • 11. 避免Mnesia数据表索引,和非键值字段match, select

Comments

comments powered by Disqus