当前位置: 首页 > 工具软件 > Mnesia > 使用案例 >

mnesia存储过程(dump)

子车灿
2023-12-01

1.表类型为disc_copies和disc_only_copies的操作会记录到latest.log中

disc_copies类型的表在事务提交后,数据并不会直接落地,而是先写入latest.log中
disc_only_copies则直接写进dets落地,同时也会写入latest.log

2.dump操作会将latest.log的数据转存并落地

  1. disc_copies类型会转存到.DCD和.DCL文件(通过disk_log.erl实现)
  2. ram_copies和disc_only_copies类型不在dump中处理

3.dump的触发时机

1. 手动执行mnesia:dump_log/0

2. 通过mnesia_controller定时触发dump

mnesia_controller.erl

init([Parent]) ->
    process_flag(trap_exit, true),
    ...
    %% 发起定时器
    Ref = next_async_dump_log(),
    ...
    Empty = gb_trees:empty(),
    {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref,
		loader_queue = Empty,
		late_loader_queue = Empty}}.
		
next_async_dump_log() ->
    %% 时间间隔为dump_log_time_threshold, 默认为3分钟
    Interval = mnesia_monitor:get_env(dump_log_time_threshold),
    Msg = {next_async_dump_log, time_threshold},
    %% 向自身发消息
    Ref = erlang:send_after(Interval, self(), Msg),
    Ref.

%% 默认为3分钟
default_env(dump_log_time_threshold) ->
    timer:minutes(3);
    
%% 收到dump消息的处理
handle_info({next_async_dump_log, InitBy}, State) ->
    async_dump_log(InitBy),
    Ref = next_async_dump_log(),
    noreply(State#state{dump_log_timer_ref=Ref});

3. 执行mnesia_log:log/1时,累计执行次数等于dump_log_write_threshold时,触发dump

mnesia_dumper.erl

incr_log_writes() ->
    Left = mnesia_lib:incr_counter(trans_log_writes_left, -1),
    if
    %% 剩余次数为0
	Left =:= 0 ->
	    adjust_log_writes(true);
	true ->
	    ignore
    end.

adjust_log_writes(DoCast) ->
    Token = {mnesia_adjust_log_writes, self()},
    case global:set_lock(Token, [node()], 1) of
	false ->
	    ignore; %% Somebody else is sending a dump request
	true ->
	    case DoCast of
		false ->
		    ignore;
		true ->
		    %% 通知mnesia_controller执行dump
		    mnesia_controller:async_dump_log(write_threshold)
	    end,
	    %% dump_log_write_threshold默认为1000
	    Max = mnesia_monitor:get_env(dump_log_write_threshold),
	    Left = mnesia_lib:read_counter(trans_log_writes_left),
	    %% 重新设置次数
	    mnesia_lib:set_counter(trans_log_writes_left, Max),
	    Diff = Max - Left,
	    _ = mnesia_lib:incr_counter(trans_log_writes, Diff),
	    global:del_lock(Token, [node()])
    end.

4.dump的流程

dump通过调用函数mnesia_controller:async_dump_log/1执行

async_dump_log(InitBy) ->
    %% 向自己发消息
    ?SERVER_NAME ! {async_dump_log, InitBy},
    ok.

handle_info({async_dump_log, InitBy}, State) ->
    Worker = #dump_log{initiated_by = InitBy},
    State2 = add_worker(Worker, State),
    noreply(State2);
    
add_worker(Worker = #dump_log{}, State) ->
    InitBy = Worker#dump_log.initiated_by,
    Queue = State#state.dumper_queue,
    ...
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);
    
opt_start_worker(State) ->
    case State#state.dumper_queue of
	[Worker | _Rest] when State#state.dumper_pid == undefined ->
	    if
		is_record(Worker, dump_log) ->
		    Pid = spawn_link(?MODULE, dump_and_reply, [self()]), 
		   %% dump_and_reply函数会继续跳转到mnesia_dumper:opt_dump_log/1
		   %% 然后再进一步跳转到mnesia_dumper:perform_dump/2
		    ...
    ok.

dump_and_reply函数会继续跳转到mnesia_dumper:opt_dump_log/1,然后再进一步跳转到mnesia_dumper:perform_dump/2

mnesia_dumper.erl

perform_dump(InitBy, Regulator) ->
    ?eval_debug_fun({?MODULE, perform_dump}, [InitBy]),
    %% 这个函数将latest.log文件改名为previous.log
    LogState = mnesia_log:prepare_log_dump(InitBy),
    adjust_log_writes(false),
    case LogState of
	{needs_dump, Diff} ->
	    U = mnesia_monitor:get_env(dump_log_update_in_place),
	    %% 加载previous.log
	    Cont = mnesia_log:init_log_dump(),
	    mnesia_recover:sync(),
	    try do_perform_dump(Cont, U, InitBy, Regulator, undefined).
	    %% 之后会进一步调用insert_op -> insert_rec -> 
	    %% do_insert_rec ->  insert_ops -> open_files
	    %% open_file通过open_disc_copies打开日志,再调用disc_insert写入

open_disc_copies/2

此函数通过mnesia_log打开DCD或DCL文件,供之后的disc_insert写入。

注意,DCD或DCL文件通过disk_log.erl实现,并不是dets!

open_disc_copies(Tab, InitBy) ->
    %% 判断存储.DCL还是.DCD
    DumpEts = needs_dump_ets(Tab),
    if
	DumpEts == false; InitBy == startup ->
            DclF = mnesia_lib:tab2dcl(Tab),
	    mnesia_log:open_log({?MODULE,Tab},
				mnesia_log:dcl_log_header(),
				DclF,
				mnesia_lib:exists(DclF),
				mnesia_monitor:get_env(auto_repair),
				read_write),
	    put({?MODULE, Tab}, {opened_dumper, dcl}),
	    true;
	true ->
	    mnesia_log:ets2dcd(Tab),
	    put({?MODULE, Tab}, already_dumped),
	    false
    end.

%% 判断存储.DCL还是.DCD
needs_dump_ets(Tab) ->
    DclF = mnesia_lib:tab2dcl(Tab),
    case file:read_file_info(DclF) of
        {error, enoent} ->
            false;
        {ok, DclInfo} ->
            DcdF =  mnesia_lib:tab2dcd(Tab),
            case file:read_file_info(DcdF) of
                {error, Reason} ->
                    mnesia_lib:dbg_out("File ~tp info_error ~tp ~n",
                                       [DcdF, Reason]),
                    true;
                {ok, DcdInfo} ->
                    Mul = case ?catch_val(dc_dump_limit) of
                              {'EXIT', _} -> ?DumpToEtsMultiplier;
                              Val -> Val
                          end,
                    %% 判断公式
                    DcdInfo#file_info.size =< (DclInfo#file_info.size * Mul)
            end
    end.


disc_insert/8

disc_insert(_Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy) ->
    case open_files(Tab, Storage, InPlace, InitBy) of
	true ->
	    case Storage of
		disc_copies when Tab /= schema ->
		    %% 写入上一步打开的DCL或DCD文件
		    mnesia_log:append({?MODULE,Tab}, {{Tab, Key}, Val, Op}),
		    ok;
		_ ->
		    dets_insert(Op,Tab,Key,Val)
	    end;
	false ->
	    ignore
    end.

5.结语

  1. disc_copies类型会转存到.DCD和.DCL文件(通过disk_log.erl实现),并不是dets。但disk_log并不是实时写磁盘,且terminate函数中并没有做保存处理,所以个人认为有一定丢数据的风险。
  2. ram_copies在事务提交时通过ets存储
  3. disc_only_copies在事务提交时直接通过dets存储,个人也没搞懂为何也要写latest.log
 类似资料: