mongo源码版本 4.4.6
service_entry_point_common.cpp 中有
DbResponse ServiceEntryPointCommon::handleRequest(
OperationContext* opCtx,
const Message& m,
const Hooks& behaviors)
//捕获到报文传来的message
erl的mongo驱动中,操作使用的是 query,操作的集合是$CMD,于是逻辑会命中
if (op == dbMsg || (op == dbQuery && isCommand))
{
dbresponse = receivedCommands(opCtx, m, behaviors);
}
接着看 receivedCommands的实现:
首先有:
request = rpc::opMsgRequestFromAnyProtocol(message);
//当操作类型为 dbQuery时,实际执行为:opMsgRequestFromLegacyRequest(message)
接着执行到:
command = CommandHelpers::findCommand(request.getCommandName())
%%command是通过 报文的body.firstElementFieldName(),查询获得。
%%报文操作类型为query,操作集合为$CMD,query字段第一个值为update 或 insert 或 delete时
%%command为write_commands.cpp文件下的对应实现。
%%以 insert 为例,此时command为 CmdInsert类
接着执行到:
execCommandDatabase(opCtx, command, request, replyBuilder.get(), behaviors);
接着看execCommandDatabase的实现:
首先有:
std::shared_ptr<CommandInvocation> invocation = command->parse(opCtx, request);
CommandInvocation::set(opCtx, invocation); //标记1
举例:command为 CmdInsert类,
std::unique_ptr<CommandInvocation> parse(OperationContext*, const OpMsgRequest& request) override {
return std::make_unique<Invocation>(this, request);
}
//即返回使用 this指针和request 构造的 Invocation类指针, Invocation为嵌套类定义在CmdInsert类中
接着执行到:
runCommandImpl(opCtx,invocation.get(),request,replyBuilder,startOperationTime,...)
注:invocation.get() 的处理
std::shared_ptr<CommandInvocation> CommandInvocation::get(OperationContext* opCtx) {
return invocationForOpCtx(opCtx);
}
//该值是被 CommandInvocation::set 设置,为 set函数的invocation传参
//即在 标记1 处被设置
再看runCommandlmpl的实现:
其核心调用为:
CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder);
CommandHelpers::runCommandInvocation逻辑中核心调用为:
invocation->run(opCtx, response);
//invocation 为定义在CmdInsert类中的 Invocation嵌套类
其run函数会调用 CmdInsert类中的 runImpl 函数
void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override {
auto reply = performInserts(opCtx, _batch);
//执行插入操作
//同理可在write_commands.cpp中找到update和delete操作核心函数:
//performUpdates 和 performDeletes
serializeReply(opCtx, ReplyStyle::kNotUpdate, !_batch.getWriteCommandBase().getOrdered(), _batch.getDocuments().size(), std::move(reply), &result);
}
_batch 在类初始化时,通过 InsertOp::parse(request) 生成
其实现在write_ops_parsers.cpp中:
write_ops::Insert InsertOp::parse(const OpMsgRequest& request) {
auto insertOp = Insert::parse(IDLParserErrorContext("insert"), request);
validateInsertOp(insertOp);
return insertOp;
}
源码中并没有Insert::parse的直接的实现,该实现的文件是在mongo源码编译时通过python代码自动生成的。可以在编译mongo源码后的build目录下找到 write_ops_gen.cpp(基于write_ops.idl生成),其中有Insert::parse的实现逻辑,其余指令同理。
于是mongo对报文的处理和执行逻辑都可以根据上述内容找到相关c++实现,便于对erl的mongo驱动(https://github.com/comtihon/mongodb-erlang)进行优化改造。
如:增加批量upsert操作,在中间数据报错的情况下不影响后续的数据操作。
mc_worker_api.erl增加函数
update_many(Connection, Coll, SelectorDocs) ->
command(Connection, {<<"update">>, Coll, <<"updates">>,
[#{<<"q">> => Selector, <<"u">> => prepare(Doc, fun(D) -> D end), <<"upsert">> => true, <<"multi">> => false}
|| {Selector, Doc} <- SelectorDocs], <<"ordered">>, false}).
mongo_api.erl 增加函数
update_many(Topology, Collection, SelectorDocs) ->
mongoc:transaction(Topology,
fun(#{pool := Worker}) ->
mc_worker_api:update_many(Worker, Collection, SelectorDocs)
end, Opts).
使用示例:
Command = #{<<"$set">> => #{
<<"quantity">> => 500,
<<"details">> => #{<<"model">> => "14Q3", <<"make">> => "xyz"},
<<"tags">> => ["coats", "outerwear", "clothing"]
}},
mc_worker_api:update(Connection, Collection, [{#{<<"_id">> => 100}, Command}]).