emqttd随笔
结构
emq-relx文件组织结构
+_rel/ # 编译发布之后的目标文件. -bin/ cuttlefish emqenv emqttd emqttd.cmd emqttd_ctl emqttd_ctl.cmd install_upgrade_escript nodetool -data/ loaded_plugins # 需要加载的插件放在这里. +deps/ # 所有依赖的app. -rel/ # 与发布相关的配置文件, 这些文件编译的时候大部分会被作为模板拷贝到目标文件中. -conf/ -plugins/ ## 插件的配置文件. emq_dashboard.conf acl.conf emq.conf -schema/ emq.schema emq_dashboard.schema erlang_vm.schema .gitignore LICENSE Makefile # 和erlang.mk一起使用来编译和发布. README.md erlang.mk relx # erlang release的程序. relx.config # relx的配置文件. vars.config # relx用于覆盖替换模板的文件.
两条线
编译和发布
从执行make开始进行了什么操作?
Makefile中首先定义了一些变量,这些变量包括PROJECT,PROJECT_DESCRIPTION,PROJECT_VERSION, DEPS和以"dep_"开头的依赖项下载方式, 这些变量的作用是指定依赖项。在随后的include erlang.mk中会使用这些变量,并一一下载并。
此外还定义了一个plugins目标,并将其作为app的先决条件。那什么时候执行app这个目标呢?我们看到Makefile中有一个include erlang.mk, 这个很重要,我们在执行make的时候就是因为有include才会执行erlang.mk的,而app是erlang.mk里面已经有定义的,所以会先执行plugins, 然后 在执行app。
我们运行make的时候,没有执行具体的目标的时候,会默认执行make all。我们在erlang.mk中看到all: deps app rel,也就是all包括三部分, 其中deps负责下载Makefile中指定的依赖app,app则是在编译的时候生成相应的bin文件,rel负责生成发布。
- relx发布工具
主要时relx.config的配置问题。配置都是使用tuple格式, 每一项结尾用点号结尾。
{release, {appname, vsn}, [apps]}.
apps通过tuple格式指定的依赖app版本和启动方式。主要说明一下启动方式:
参考:http://erlang.org/doc/man/rel.html, 有permanent|transient|temporary|load|none,具体解释:
Start type of an application included in the release. If Type = permanent | transient | temporary, the application is loaded and started in the corresponding way, see application(3). If Type = load, the application is only loaded. If Type = none, the application is not loaded and not started, although the code for its modules is loaded. Defaults to permanent
emqttd的执行
从执行bin/emqttd console开始进行了什么操作.
- 配置信息
从2.0.3开始,emqttd的配置方式进行了更改,使用cuttlefish库将*.conf(包括插件中的conf文件)中的每行Key=Value的配 置在启动emqttd的时候,临时转化为传统的app.config的格式, 启动的时候读取的配置信息其实还是传统的app.config的格 式。如果插件的配置文件也是*.conf, 那么就会在emqttd启动的时候一起把配置文件加载到系统中。
同时也兼容传统的配置方式,这种方式的配置文件是以*.config的命令在配置文件目录中的,加载插件的时候通过emqttd_plugins:init/0, 将文件加载进到系统中,随后再emqttd_plugins:load/0将插件app启动。 注意,*.config类型的配置文件不会被cuttlefish 处理到app.config文件中,因此这类参数不会自动加载,需要再emqttd_plugins:init/0中加载。
# 这里首先定义cuttlefish生成配置文件的命令前缀。 CUTTLEFISH="on" if [ -z "$CUTTLEFISH" ]; then CUTTLEFISH_COMMAND_PREFIX="" else CUTTLEFISH_COMMAND_PREFIX="$ERTS_PATH/escript $RUNNER_ROOT_DIR/bin/cuttlefish -s $REL_DIR/schema -d $RUNNER_DATA_DIR/configs" fi
接着定义生成配置文件的函数:
generate_config() { if [ -z "$CUTTLEFISH" ]; then # Note: we have added a parameter '-vm_args' to this. It # appears redundant but it is not! the erlang vm allows us to # access all arguments to the erl command EXCEPT '-args_file', # so in order to get access to this file location from within # the vm, we need to pass it in twice. CONFIG_ARGS=" -config $RUNNER_ETC_DIR/app.config -args_file $RUNNER_ETC_DIR/vm.args -vm_args $RUNNER_ETC_DIR/vm.args " else # 将emqttd的配置文件和插件的配置文件merge在一起. APPCONF=`relx_nodetool mergeconf $RUNNER_ETC_DIR/emq.conf $RUNNER_ETC_DIR/plugins $RUNNER_DATA_DIR/configs` if [ "$?" -ne 0 ]; then echoerr "Error merging configs!" exit 1 fi replace_env_in_conf # 生成配置文件。 CONFIG_ARGS=`$CUTTLEFISH_COMMAND_PREFIX -c $APPCONF generate` if [ "$?" -ne 0 ]; then echoerr "Error generating config with cuttlefish" echoerr " run \`$RUNNER_SCRIPT config generate -l debug\` for more information." exit 1 fi fi if ! relx_nodetool chkconfig $CONFIG_ARGS; then echoerr "Error reading $CONFIG_ARGS" exit 1 fi }
最后在执行的时候调用上面的函数在data/configs/中生成app.config和vm.args。
- 插件的加载
前面说了插件使用两种方式进行配置的参数加载方式。值得注意的是,在 emqttd_plugins:init/0中加载配置参数的时候,只要在 etc/plugins中所有*.config的配置参数都会被加载到系统中,但是emqttd_plugins:load/0启动插件app的时候则是仅仅将loaded_plugins文件 中已有的app启动。 因此,系统中其实存在一堆没有用的参数。
关于虚拟机的配置参数
Flags
heart
启动erlang runtime system的心跳监控。 这篇文章 解释了heart的原理。启动虚拟的的时候,如果指定-heart 参数,那么会通过模块 heart.erl 启动一个独立的外部进程,名为heart,该进程会监控虚拟机,虚拟机每个
表结构
先列出emq所有的ets和mnesia表:
Table Name | ets/mnesia | Owner | Attribute | Value | Specification |
---|---|---|---|---|---|
mqttd_ctl_cmd | ets | ctl | [ordered_set, named_table, protected] | {{Seq, Cmd}, {Mod, Fun}, Opts} | 按序号记录emq的控制命令。 |
mqtt_hook | ets | hook | [set, protected, named_table, {keypos, #hook.name}] | #hook{name, [#callback{tag, function, init_args, priority}]} | 记录你使用到的hook,主要是插件中使用的。(callbacks按照priority排序) |
mqtt_local_route | ets | router | [set, named_table, protected] | {Topic, node()} | 本地订阅的路由表(全局路由表是保存在mnesia表mqtt_route中) |
mqtt_topic | mnesia | mnesia_monitor/router | [{ram_copies, [node()]}, {record_name, mqtt_topic}, {attributes, record_info(fields, mqtt_topic)}] | #mqtt_topic{topic, flags=[]::[retained or static]} | 记录被订阅的所有topic信息,内容是#mqtt_topic. |
mqtt_route | mnesia | router | [{type, bag}, {ram_copies, [node()]}, {record_name, mqtt_route}, {attributes, record_info(fields, mqtt_route)}] | #mqtt_route{topic,node} | 全局的路由表, 内容是#mqtt_route。 |
mqtt_trie | mnesia | mnesia_monitor/trie | [{ram_copies, [node()]},{record_name, trie},{attributes, record_info(fields, trie)}] | #trie{#trie_edge{node_id, word}, node_id} | 仅针对含统配的topic。 |
mqtt_trie_node | mnesia | mnesia_monitor/trie_node | [{ram_copies, [node()]},{record_name, trie_node},{attributes, record_info(fields, trie_node)}] | #trie_node{node_id, edge_count, topic, flags} | 仅针对含统配的topic。 |
mqtt_subproperty | ets | pubsub_sup | [public, named_table, set, {read_concurrency, true}, {write_concurrency, true}] | {{Topic, Subscriber}, [local, {share, Share} or {share, '$queue'}]} | pubsub_sup创建,记录某个topic被某个client的订阅属性。 |
mqtt_subscriber | ets | pubsub_sup | [public, named_table, duplicate_bag, {read_concurrency, true}, {write_concurrency, true}] | {Topic or {local, Topic}, Subscriber or {Share, Subscriber}} | 本地节点的订阅关系,key是topic,value是clientId。表示该topic被谁订阅。 |
mqtt_subscription | ets | pubsub_sup | [public, named_table, bag, {read_concurrency, true}, {write_concurrency, true}] | 本地节点的订阅关系,key是clientId,value是topic。表示哪个client订阅了哪个topic。 | |
mqtt_stats | ets | stats | [set, public, named_table, {write_concurrency, true}] | 统计dashboard的stats内容。 | |
mqtt_client_stats | ets | stats | [set, public, named_table, {write_concurrency, true}] | client_enable_stats参数打开才会统计, 统计client_pid的资源占用情况, 下表下的解释。 | |
mqtt_session_stats | ets | stats | [set, public, named_table, {write_concurrency, true}] | session_enable_stats参数打开才会统计, 统计session_pid的资源占用情况, 下表下的解释。 | |
mqtt_metric | ets | metrics | [set, public, named_table, {write_concurrency, true}] | 记录dashboard的metrics部分(包括packet,message,bytes个数)。 | |
mqtt_client | ets | cm_sup | [ordered_set, named_table, public, {keypos, 2}, {write_concurrency, true}] | cm_sup创建,用于本地的client的注册。内容是#mqtt_client. | |
mqtt_local_session | ets | sm_sup | [public, ordered_set, named_table, {write_concurrency, true}] | {ClientId, ClientPid, CleanSess, Properties} | sm_sup创建并维护,记录本地节点上的session(session在本地注册)。 |
mqtt_session | mnesia | mnesia_monitor/sm | [{type, set}, {ram_copies, [node()]}, {record_name, mqtt_session}, {attributes, record_info(fields, mqtt_session)}] | #mqtt_session{client_id, sess_pid, clean_sess} | 记录全局的路由信息。本地路由信息保存在ets表mqtt_local_session. |
mqtt_access_control | ets | access_control | [set, named_table, protected, {read_concurrency, true}] | {auth_modules/acl_modules, [{Mod, ModState, Seq}...]} | 包含两个条目auth_modules,acl_modules,将auth/acl规则注册在这里,auth/acl的时候从这个获取要执行的模块和相应的参数。 |
mqtt_acl_rule | ets | access_control | [set, public, named_table, {read_concurrency, true}] | 记录internal的acl规则, acl_modules中的emqttd_acl_internal的规则会保存在这里,acl会查询这里。 | |
mqtt_broker | ets | broker | [set, public, named_table] | 记录broker的信息,目前暂时没有使用。 | |
mqtt_retained | mnesia | mnesia_monitor/retained | 可设置 | #mqtt_retained{topic, msg, ts} | 记录retain消息。 |
mqtt_admin | mnesia | mnesia_monitor/dashboard | [{type, set}, {local_content, true}, {disc_copies, [node()]}, {record_name, mqtt_admin}, {attributes, record_info(fields, mqtt_admin)}] | #mqtt_admin{username, password, tags} | 记录dashoard的管理员。 |
- mqtt_client_stat: {<clientId>, [{'$ts',1497948011}, {mailbox_len,0}, {heap_size,610}, {reductions,802}, {recv_pkt,1}, {recv_msg,0}, {send_pkt,0}, {send_msg,0}, {recv_oct,84}, {recv_cnt,1}, {send_oct,4}, {send_cnt,1}, {send_pend,0}]}
- mqtt_session_stat: {<ClientId>, [{'$ts',1497948010}, {mailbox_len,0}, {heap_size,610}, {reductions,327}, {max_subscriptions,0}, {subscriptions,0}, {max_inflight,32}, {inflight_len,0}, {max_mqueue,0}, {mqueue_len,0}, {mqueue_dropped,0}, {max_awaiting_rel,100}, {awaiting_rel_len,0}, {deliver_msg,0}, {enqueue_msg,0}]}
注意,ets只能记录本地共享的信息。项目中主要是注册信息,统计信息(目前只记录本地的统计信息,不会记录全局的统计信息)。
关于代码
关于esockd_listener_sup
注意这个supervisor在启动的init中没有启动任何子进程,而且指定子进程的spec是:
init([]) -> {ok, {{rest_for_one, 10, 3600}, []}}.
connection_sup, acceptor_sup, listener三个子进程一次启动,而先启动的子进程就会作为参数给下一个启动的子进程。 因此,如果一个子进程挂了,那么后面启动的子进程也必须重新启动,否则前面启动的作为参数的就无效了。
关于esockd_connection_sup
esockd_connection_sup不是一个严格的supervisor。它只是一个gen_server。这是因为他的特殊性决定的。
start_link(Options, MFArgs, Logger) -> gen_server:start_link(?MODULE, [Options, MFArgs, Logger], []).
supervisor中的必须有重启策略,如果没有在spec中填写默认就是one_for_one。这几种重启策略都会重启子进程。 但是对于sockt连接,断了就是断了,不应该重启的。因此不需要什么重启策略。那么supervisor怎么也得有个监督关系啊, 需要的是当子进程挂了的时候,supervisor要收到消息。
那他是怎么启动子进程(socket连接)的呢?
在esockd_connection_sup.erl中,Conn:start_link(MFArgs)函数调用 emqttd_client:start_link/2 来创建client进程。
esockd_connection_sup.erl: start_connection(Sup, Mod, Sock, SockFun) -> case call(Sup, {start_connection, Sock, SockFun}) of {ok, Pid, Conn} -> % transfer controlling from acceptor to connection Mod:controlling_process(Sock, Pid), Conn:go(Pid), {ok, Pid}; {error, Error} -> {error, Error} end. handle_call({start_connection, Sock, SockFun}, _From, State = #state{conn_opts = ConnOpts, mfargs = MFArgs, curr_clients = Count, access_rules = Rules}) -> case inet:peername(Sock) of {ok, {Addr, _Port}} -> case allowed(Addr, Rules) of true -> Conn = esockd_connection:new(Sock, SockFun, ConnOpts), case catch Conn:start_link(MFArgs) of {ok, Pid} when is_pid(Pid) -> ...
emqttd_client:start_link/2 调用 proc_lib:spawn_link/3 来启动进程:
emqttd_client.erl: start_link(Conn, Env) -> {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}.
为什么这里要使用proc_link:spwan_link/3来启动连接进程呢?因为这个函数最终是调用erlang:spawn_link来启动,并自动创建link。 该函数和erlang:start_link的方式区别是spawn_link属于异步启动进程。一调用就会返回子进程ID。 他的用处在emqttd_client:init中看到:
init([Conn0, Env]) -> {ok, Conn} = Conn0:wait(), case Conn:peername() of {ok, Peername} -> do_init(Conn, Env, Peername); {error, enotconn} -> Conn:fast_close(), exit(normal); {error, Reason} -> Conn:fast_close(), exit({shutdown, Reason}) end.
这里的 Conn0:wait():
esockd_connection.erl: wait(Conn = ?CONN_MOD) -> receive {go, Conn} -> upgrade(Conn) end.
使用 receive 来接受消息{go, Conn}。如果emqttd_client:start_link中不使用spawn_link来启动进程,那么在 init 中就会卡死。 这样在 esockd_connection_sup:start_connection(Sup, Mod, Sock, SockFun) 中,Conn:go()就不会被执行。因此出现wait()一直 等待go()发出消息。
如果使用spawn_link就会直接返回,init中执行wait,go被执行后发出消息由wait收到,然后才执行do_init(Conn, Env, Peername) 函数。
另外,esockd_connection_sup 中和子进程link之后,相互都会收到对方 exit 的消息,这样可能 esockd_connection_sup 可能会因为 子进程挂掉而挂掉,为了避免这种情况,esockd_connection_sup 启动的时候在init中设置 process_flag(trap_exit, true), 这样 可以将子进程发送的 exit 消息转化为消息{'EXIT', Pid, Reason},从而避免 esockd_connection_sup 被牵连而挂掉。
handle_info({'EXIT', Pid, Reason}, State = #state{curr_clients = Count, logger = Logger}) -> ...
总结一下:只要理解了spawn_link的异步方式就可以理解wait和go了。
关于emqttd_client
emqttd_client进程启动
上一节中已经讲到 connection 进程是esockd_connection_sup调用 proc_lib:spawn_link 启动的。这个进程我们希望他是符合otp的通用服务器, 因此,在do_init中使用了:
gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout, {backoff, 2000, 2000, 20000}).
我们看看 gen_server 中 enter_loop 的官方文档(gen_server2基本一样):
enter_loop(Module, Options, State) enter_loop(Module, Options, State, ServerName) enter_loop(Module, Options, State, Timeout) enter_loop(Module, Options, State, ServerName, Timeout) Makes an existing process into a gen_server process. Does not return, instead the calling process enters the gen_server process receive loop and becomes a gen_server process. The process must have been started using one of the start functions in proc_lib(3). The user is responsible for any initialization of the process, including registering a name for it. This function is useful when a more complex initialization procedure is needed than the gen_server process behavior provides.
- 也就是说让一个已经存在的进程变成 gen_server 通用服务器进程。该函数不会返回,而是变成通用服务器进入 循环接收消息的状态。
- 但是有个要求是这个进程必须由proc_lib中的启动函数启动,并由用户负责进程的所有初始化,包括注册进程名。 这样后面我们就可以完全把他当做gen_server/gen_server2的进程来看待了。
- 这个函数用在进程初始化比gen_server提供的初始化更复杂的情况下。
接收消息
首先,在esockd_listener.erl中创建监听socket的时候:
init({Protocol, ListenOn, Options, AcceptorSup, Logger}) -> Port = port(ListenOn), process_flag(trap_exit, true), %%Don't active the socket... SockOpts = merge_addr(ListenOn, proplists:get_value(sockopts, Options, [{reuseaddr, true}])), case esockd_transport:listen(Port, [{active, false} | proplists:delete(active, SockOpts)]) of {ok, LSock} -> ...
我们发现监听 socket 是 {active,false} 的被动socket,因此,在接收数据的时候每次都要通过执行:
Conn:async_recv(0, infinity),
来启动下一次数据的接收。(这和gen_tcp中的被动模式每次都要使用gen_tcp:recv()来接收数据一样), 这样做可以便于流控 。
下面我们看看接收消息的具体步骤, 在emqttd_client中,接收消息有几个state中的参数控制:conn_state, await_recv(初始时候=false)。
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> Size = iolist_size(Data), ?LOG(debug, "RECV ~p", [Data], State), emqttd_metrics:inc('bytes/received', Size), received(Data, rate_limit(Size, State#client_state{await_recv = false})); ... rate_limit(_Size, State = #client_state{rate_limit = undefined}) -> run_socket(State); rate_limit(Size, State = #client_state{rate_limit = Rl}) -> case Rl:check(Size) of {0, Rl1} -> run_socket(State#client_state{conn_state = running, rate_limit = Rl1}); {Pause, Rl1} -> ?LOG(warning, "Rate limiter pause for ~p", [Pause], State), erlang:send_after(Pause, self(), activate_sock), State#client_state{conn_state = blocked, rate_limit = Rl1} end. run_socket(State = #client_state{conn_state = blocked}) -> State; run_socket(State = #client_state{await_recv = true}) -> State; run_socket(State = #client_state{connection = Conn}) -> Conn:async_recv(0, infinity), State#client_state{await_recv = true}.
当conn_state=blocked(流控的时候可能将其置为blocked)的时候不能接收消息,当 await_recv=true 的时候也不能接收消息, 当conn_state=/=blocked and await_recv=true的时候才可以通过 Conn:async_recv(0, infinity) 来接收消息。
通过 Conn:async_recv(0, infinity) 打开接收消息之后立刻讲await_recv=true,等待handle_info中接收完这一次消息之后,再 将await_recv=false,再次接收消息。
emqttd_trie.
如果topic中含有通配符,需要插入topic的字典树中。
-type(trie_node_id() :: binary() | atom()). %% 表示所有的节点,以及该节点的所有属性. -record(trie_node, { node_id :: trie_node_id(), edge_count = 0 :: non_neg_integer(), %% edge_count表示这个节点是其他多少个节点的path。 topic :: binary() | undefined, %% 该节点对应的完成topic. flags :: [retained | static] }). %% 表示一个node -record(trie_edge, { node_id :: trie_node_id(), word :: binary() | atom() }). -record(trie, { edge :: #trie_edge{}, node_id :: trie_node_id() }).
triples 依次获取路径,尾点,node_id(节点).
> emqttd_topic:triples(<<"v2/a/+/c">>). [{root,<<"v2">>,<<"v2">>}, {<<"v2">>,<<"a">>,<<"v2/a">>}, {<<"v2/a">>,'+',<<"v2/a/+">>}, {<<"v2/a/+">>,<<"c">>,<<"v2/a/+/c">>}]
相关模块
- emqttd_app: 服务启动的总入口。
- esockd_connection_sup: 监控client。创建 client 进程。
- emqttd_client: mqtt tcp 的客户端连接进程。
- emqttd_protocol: 处理mqtt协议相关的逻辑。也就是处理各种type的数据包的逻辑。
- emqttd_parser: mqtt协议包解析。
- emqttd_serializer: mqtt协议包序列化。
- emqttd_session_sup: 监控session。创建session进程。
- emqttd_sm: 负责session的管理。包括调用session_sup创建session;重建,复用,销毁session(mnesia的mqtt_session);注册,销毁session(ets的mqtt_local_session) 消息到本地的分发(查询mqtt_local_session)
- emqttd_cm: 负责client的管理。client的session 进程创建成功后会将这个进程注册到ets中,并建立client和cm之间的monitor关系, 由 cm 监控 client 的生死存亡; 统计client的数量。
消息流
subscribe
- session进程内部使用一个map(subscriptions)来保存所有订阅的topic,key为topic,value为qos。 当一个sub发生时,带着TopicTable [{topic, SubOpts}] 到session进程中,
- 在 emqttd_server 中,要更新 mqtt_subproperty 表 (key={topic, subscriber}, val=SubOpts),记录这个subscriber对这个topic的订阅属性。 更新 ets 表 mqtt_subscription (bag, key=Subscriber, val=Topic or {Share, Topic}),记录用户的每次订阅。
- 全局路由表保存在mnesia中,便于同步;本地路由表在ets中,不需要同步。订阅时topic前面带"$local/"的就是本地订阅。
Erlang设设计相关
- 1. 使用Pool, Pool, Pool... 推荐GProc库: https://github.com/uwiger/gproc
- 2. 异步,异步,异步消息...连接层到路由层异步消息,同步请求用于负载保护
- 3. 避免进程Mailbox累积消息,负载高的进程可以使用gen_server2
- 4. 消息流经的Socket连接、会话进程必须Hibernate,主动回收binary句柄
- 5. 多使用Binary数据,避免进程间内存复制
- 6. 使用ETS, ETS, ETS...Message Passing Vs ETS
- 7. 避免ETS表非键值字段select, match
- 8. 避免大量数据ETS读写, 每次ETS读写会复制内存,可使用lookup_element, update_counter
- 9. 适当开启ETS表{write_concurrency, true}
- 10. 保护Mnesia数据库事务,尽量减少事务数量,避免事务过载(overload)
- 11. 避免Mnesia数据表索引,和非键值字段match, select