mongo协议处理流程 && mongo-erl驱动批量upsert

桑坚
2023-12-01

mongo源码版本 4.4.6

service_entry_point_common.cpp 中有

DbResponse ServiceEntryPointCommon::handleRequest(
	OperationContext* opCtx,                                                   
	const Message& m,                                                   
	const Hooks& behaviors)   
//捕获到报文传来的message

erl的mongo驱动中,操作使用的是 query,操作的集合是$CMD,于是逻辑会命中

 if (op == dbMsg || (op == dbQuery && isCommand)) 
 {         
 	dbresponse = receivedCommands(opCtx, m, behaviors);
 }

接着看 receivedCommands的实现:

首先有:   
	request = rpc::opMsgRequestFromAnyProtocol(message);   
	//当操作类型为 dbQuery时,实际执行为:opMsgRequestFromLegacyRequest(message)     

接着执行到:     
 	command = CommandHelpers::findCommand(request.getCommandName())      
 	%%command是通过 报文的body.firstElementFieldName(),查询获得。      
 	%%报文操作类型为query,操作集合为$CMD,query字段第一个值为update 或 insert 或 delete时      
 	%%command为write_commands.cpp文件下的对应实现。 
 	%%以 insert 为例,此时command为 CmdInsert类 

接着执行到:      
	execCommandDatabase(opCtx, command, request, replyBuilder.get(), behaviors);

接着看execCommandDatabase的实现:

首先有:   
	std::shared_ptr<CommandInvocation> invocation = command->parse(opCtx, request); 
	CommandInvocation::set(opCtx, invocation); //标记1   
	
	举例:command为 CmdInsert类,     
		std::unique_ptr<CommandInvocation> parse(OperationContext*, const OpMsgRequest& request) override {   
			return std::make_unique<Invocation>(this, request); 
		}
		//即返回使用 this指针和request 构造的 Invocation类指针, Invocation为嵌套类定义在CmdInsert类中
	 	

接着执行到:   
	runCommandImpl(opCtx,invocation.get(),request,replyBuilder,startOperationTime,...) 
	注:invocation.get() 的处理 
	std::shared_ptr<CommandInvocation> CommandInvocation::get(OperationContext* opCtx) { 
		return invocationForOpCtx(opCtx);
	}
	//该值是被 CommandInvocation::set 设置,为 set函数的invocation传参 
	//即在 标记1 处被设置 

再看runCommandlmpl的实现:

其核心调用为: 
	CommandHelpers::runCommandInvocation(opCtx, request, invocation, replyBuilder); 
	CommandHelpers::runCommandInvocation逻辑中核心调用为: 
	invocation->run(opCtx, response); 
	//invocation 为定义在CmdInsert类中的 Invocation嵌套类 
其run函数会调用 CmdInsert类中的 runImpl 函数 
	void runImpl(OperationContext* opCtx, BSONObjBuilder& result) const override { 
		auto reply = performInserts(opCtx, _batch); 
		//执行插入操作 
		//同理可在write_commands.cpp中找到update和delete操作核心函数: 
		//performUpdates 和 performDeletes
		serializeReply(opCtx, ReplyStyle::kNotUpdate, !_batch.getWriteCommandBase().getOrdered(), _batch.getDocuments().size(), std::move(reply), &result); 
	} 

_batch 在类初始化时,通过 InsertOp::parse(request) 生成 
其实现在write_ops_parsers.cpp中: 
	write_ops::Insert InsertOp::parse(const OpMsgRequest& request) {
		auto insertOp = Insert::parse(IDLParserErrorContext("insert"), request); 
		validateInsertOp(insertOp); 
		return insertOp; 
	} 

源码中并没有Insert::parse的直接的实现,该实现的文件是在mongo源码编译时通过python代码自动生成的。可以在编译mongo源码后的build目录下找到 write_ops_gen.cpp(基于write_ops.idl生成),其中有Insert::parse的实现逻辑,其余指令同理。

于是mongo对报文的处理和执行逻辑都可以根据上述内容找到相关c++实现,便于对erl的mongo驱动(https://github.com/comtihon/mongodb-erlang)进行优化改造。

如:增加批量upsert操作,在中间数据报错的情况下不影响后续的数据操作。

mc_worker_api.erl增加函数 
update_many(Connection, Coll, SelectorDocs) ->
	command(Connection, {<<"update">>, Coll, <<"updates">>, 
	[#{<<"q">> => Selector, <<"u">> => prepare(Doc, fun(D) -> D end), <<"upsert">> => true, <<"multi">> => false} 
	|| {Selector, Doc} <- SelectorDocs], <<"ordered">>, false}). 

mongo_api.erl 增加函数 
update_many(Topology, Collection, SelectorDocs) -> 
	mongoc:transaction(Topology, 
		fun(#{pool := Worker}) -> 
			mc_worker_api:update_many(Worker, Collection, SelectorDocs) 
		end, Opts). 

使用示例: 
Command = #{<<"$set">> => #{ 
	<<"quantity">> => 500, 
	<<"details">> => #{<<"model">> => "14Q3", <<"make">> => "xyz"}, 
	<<"tags">> => ["coats", "outerwear", "clothing"] 
}}, 
mc_worker_api:update(Connection, Collection, [{#{<<"_id">> => 100}, Command}]).
 类似资料: