grpc client和sever之间的数据传输容器,可以在多个rpc函数中共享数据
先跟进MapBuilderContext类
template <class SubmapType>
class MapBuilderContext : public MapBuilderContextInterface {
public:
MapBuilderContext(MapBuilderServer* map_builder_server);
mapping::MapBuilderInterface& map_builder() override;
common::BlockingQueue<std::unique_ptr<MapBuilderContextInterface::Data>>&
sensor_data_queue() override;
mapping::TrajectoryBuilderInterface::LocalSlamResultCallback
GetLocalSlamResultCallbackForSubscriptions() override;
void AddSensorDataToTrajectory(const Data& sensor_data) override;
MapBuilderContextInterface::LocalSlamSubscriptionId SubscribeLocalSlamResults(
int trajectory_id, LocalSlamSubscriptionCallback callback) override;
void UnsubscribeLocalSlamResults(
const LocalSlamSubscriptionId& subscription_id) override;
int SubscribeGlobalSlamOptimizations(
GlobalSlamOptimizationCallback callback) override;
void UnsubscribeGlobalSlamOptimizations(int subscription_index) override;
void NotifyFinishTrajectory(int trajectory_id) override;
LocalTrajectoryUploaderInterface* local_trajectory_uploader() override;
void EnqueueSensorData(int trajectory_id,
std::unique_ptr<sensor::Data> data) override;
void EnqueueLocalSlamResultData(int trajectory_id,
const std::string& sensor_id,
const mapping::proto::LocalSlamResultData&
local_slam_result_data) override;
void RegisterClientIdForTrajectory(const std::string& client_id,
int trajectory_id) override;
bool CheckClientIdForTrajectory(const std::string& client_id,
int trajectory_id) override;
private:
MapBuilderServer* map_builder_server_;
mapping::SubmapController<SubmapType> submap_controller_;
std::map</*trajectory_id=*/int, /*client_id=*/std::string> client_ids_;
};
再跟进MapBuilderContextInterface
class MapBuilderContextInterface : public async_grpc::ExecutionContext {
public:
struct LocalSlamResult {
int trajectory_id;
common::Time time;
transform::Rigid3d local_pose;
std::shared_ptr<const sensor::RangeData> range_data;
std::unique_ptr<const mapping::TrajectoryBuilderInterface::InsertionResult>
insertion_result;
};
// Calling with 'nullptr' signals subscribers that the subscription has ended,
// e.g. this happens when the corresponding trajectory was finished and hence
// no more local SLAM updates will occur.
// The callback can return 'false' to indicate that the client is not
// interested in more local SLAM updates and 'MapBuilderServer' will end the
// subscription.
using LocalSlamSubscriptionCallback =
std::function<bool(std::unique_ptr<LocalSlamResult>)>;
// The callback can return 'false' to indicate that the client is not
// interested in more global SLAM runs and 'MapBuilderServer' will end the
// subscription.
using GlobalSlamOptimizationCallback = std::function<bool(
const std::map<int /* trajectory_id */, mapping::SubmapId>&,
const std::map<int /* trajectory_id */, mapping::NodeId>&)>;
struct Data {
int trajectory_id;
std::unique_ptr<sensor::Data> data;
};
struct LocalSlamSubscriptionId {
const int trajectory_id;
const int subscription_index;
};
MapBuilderContextInterface() = default;
~MapBuilderContextInterface() = default;
MapBuilderContextInterface(const MapBuilderContextInterface&) = delete;
MapBuilderContextInterface& operator=(const MapBuilderContextInterface&) =
delete;
virtual mapping::MapBuilderInterface& map_builder() = 0;
virtual common::BlockingQueue<std::unique_ptr<Data>>& sensor_data_queue() = 0;
virtual mapping::TrajectoryBuilderInterface::LocalSlamResultCallback
GetLocalSlamResultCallbackForSubscriptions() = 0;
virtual void AddSensorDataToTrajectory(const Data& sensor_data) = 0;
virtual LocalSlamSubscriptionId SubscribeLocalSlamResults(
int trajectory_id, LocalSlamSubscriptionCallback callback) = 0;
virtual void UnsubscribeLocalSlamResults(
const LocalSlamSubscriptionId& subscription_id) = 0;
virtual int SubscribeGlobalSlamOptimizations(
GlobalSlamOptimizationCallback callback) = 0;
virtual void UnsubscribeGlobalSlamOptimizations(int subscription_index) = 0;
virtual void NotifyFinishTrajectory(int trajectory_id) = 0;
virtual LocalTrajectoryUploaderInterface* local_trajectory_uploader() = 0;
virtual void EnqueueSensorData(int trajectory_id,
std::unique_ptr<sensor::Data> data) = 0;
virtual void EnqueueLocalSlamResultData(
int trajectory_id, const std::string& sensor_id,
const mapping::proto::LocalSlamResultData& local_slam_result_data) = 0;
virtual void RegisterClientIdForTrajectory(const std::string& client_id,
int trajectory_id) = 0;
virtual bool CheckClientIdForTrajectory(const std::string& client_id,
int trajectory_id) = 0;
};
再跟进ExecutionContext
class ExecutionContext {
public:
// Automatically locks an ExecutionContext for shared use by RPC handlers.
// This non-movable, non-copyable class is used to broker access from various
// RPC handlers to the shared 'ExecutionContext'.
template <typename ContextType>
class Synchronized {
public:
ContextType* operator->() {
return static_cast<ContextType*>(execution_context_);
}
Synchronized(common::Mutex* lock, ExecutionContext* execution_context)
: locker_(lock), execution_context_(execution_context) {}
Synchronized(const Synchronized&) = delete;
Synchronized(Synchronized&&) = delete;
private:
common::MutexLocker locker_;
ExecutionContext* execution_context_;
};
ExecutionContext() = default;
virtual ~ExecutionContext() = default;
ExecutionContext(const ExecutionContext&) = delete;
ExecutionContext& operator=(const ExecutionContext&) = delete;
common::Mutex* lock() { return &lock_; }
private:
common::Mutex lock_;
};
这么一长串想要干嘛?我们之前说过了MapBuilderContext是在各个rpc函数中共享数据,那这些数据是从哪拿来的呢?我们一步一步看
首先是map_builder_server_main.cc,在Run函数中构造了MapBuilder,我们之前说这个是cartographer表演的指挥家,但是这里它只是MapBuilderServer的一个成员变量,可见MapBuilderServer才是真正的幕后大佬。
void Run(const std::string& configuration_directory,
const std::string& configuration_basename) {
#if USE_PROMETHEUS
metrics::prometheus::FamilyFactory registry;
::cartographer::metrics::RegisterAllMetrics(®istry);
RegisterMapBuilderServerMetrics(®istry);
::prometheus::Exposer exposer("0.0.0.0:9100");
exposer.RegisterCollectable(registry.GetCollectable());
LOG(INFO) << "Exposing metrics at http://localhost:9100/metrics";
#endif
proto::MapBuilderServerOptions map_builder_server_options =
LoadMapBuilderServerOptions(configuration_directory,
configuration_basename);
auto map_builder = absl::make_unique<mapping::MapBuilder>(
map_builder_server_options.map_builder_options());
std::unique_ptr<MapBuilderServerInterface> map_builder_server =
CreateMapBuilderServer(map_builder_server_options,
std::move(map_builder));
map_builder_server->Start();
map_builder_server->WaitForShutdown();
}
我们接着看MapBuilderServer的构造函数
MapBuilderServer::MapBuilderServer(
const proto::MapBuilderServerOptions& map_builder_server_options,
std::unique_ptr<mapping::MapBuilderInterface> map_builder)
: map_builder_(std::move(map_builder)) {
async_grpc::Server::Builder server_builder;
server_builder.SetServerAddress(map_builder_server_options.server_address());
server_builder.SetNumGrpcThreads(
map_builder_server_options.num_grpc_threads());
server_builder.SetNumEventThreads(
map_builder_server_options.num_event_threads());
server_builder.SetMaxSendMessageSize(kMaxMessageSize);
server_builder.SetMaxReceiveMessageSize(kMaxMessageSize);
if (!map_builder_server_options.uplink_server_address().empty()) {
local_trajectory_uploader_ = CreateLocalTrajectoryUploader(
map_builder_server_options.uplink_server_address(),
map_builder_server_options.upload_batch_size(),
map_builder_server_options.enable_ssl_encryption(),
map_builder_server_options.enable_google_auth());
}
server_builder.RegisterHandler<handlers::AddTrajectoryHandler>();
server_builder.RegisterHandler<handlers::AddOdometryDataHandler>();
server_builder.RegisterHandler<handlers::AddImuDataHandler>();
server_builder.RegisterHandler<handlers::AddRangefinderDataHandler>();
server_builder.RegisterHandler<handlers::AddFixedFramePoseDataHandler>();
server_builder.RegisterHandler<handlers::AddLandmarkDataHandler>();
server_builder.RegisterHandler<handlers::AddSensorDataBatchHandler>();
server_builder.RegisterHandler<handlers::FinishTrajectoryHandler>();
server_builder.RegisterHandler<handlers::DeleteTrajectoryHandler>();
server_builder.RegisterHandler<handlers::ReceiveGlobalSlamOptimizationsHandler>();
server_builder.RegisterHandler<handlers::ReceiveLocalSlamResultsHandler>();
server_builder.RegisterHandler<handlers::GetSubmapHandler>();
server_builder.RegisterHandler<handlers::GetTrajectoryNodePosesHandler>();
server_builder.RegisterHandler<handlers::GetTrajectoryStatesHandler>();
server_builder.RegisterHandler<handlers::GetLandmarkPosesHandler>();
server_builder.RegisterHandler<handlers::GetAllSubmapPosesHandler>();
server_builder.RegisterHandler<handlers::GetLocalToGlobalTransformHandler>();
server_builder.RegisterHandler<handlers::GetConstraintsHandler>();
server_builder.RegisterHandler<handlers::IsTrajectoryFinishedHandler>();
server_builder.RegisterHandler<handlers::IsTrajectoryFrozenHandler>();
server_builder.RegisterHandler<handlers::LoadStateHandler>();
server_builder.RegisterHandler<handlers::LoadStateFromFileHandler>();
server_builder.RegisterHandler<handlers::RunFinalOptimizationHandler>();
server_builder.RegisterHandler<handlers::WriteStateHandler>();
server_builder.RegisterHandler<handlers::WriteStateToFileHandler>();
server_builder.RegisterHandler<handlers::SetLandmarkPoseHandler>();
grpc_server_ = server_builder.Build();
if (map_builder_server_options.map_builder_options()
.use_trajectory_builder_2d()) {
grpc_server_->SetExecutionContext(
absl::make_unique<MapBuilderContext<mapping::Submap2D>>(this));
} else if (map_builder_server_options.map_builder_options()
.use_trajectory_builder_3d()) {
grpc_server_->SetExecutionContext(
absl::make_unique<MapBuilderContext<mapping::Submap3D>>(this));
} else {
LOG(FATAL)
<< "Set either use_trajectory_builder_2d or use_trajectory_builder_3d";
}
map_builder_->pose_graph()->SetGlobalSlamOptimizationCallback(
[this](const std::map<int, mapping::SubmapId>& last_optimized_submap_ids,
const std::map<int, mapping::NodeId>& last_optimized_node_ids) {
OnGlobalSlamOptimizations(last_optimized_submap_ids,
last_optimized_node_ids);
});
}
构造函数很长,主要是设置server_builder,有一句很重要
grpc_server_->SetExecutionContext(
absl::make_unique<MapBuilderContext<mapping::Submap2D>>(this));
这句话把直接构造MapBuilderContext后传递给了grpc_server_。this是MapBuilderServer,它把自己传递给了MapBuilderContext。
我们举例MapBuilderContext是怎样获取cartographer的数据,并在grpc中传递的。
构造函数,很简单,主要赋值给map_builder_server_,后面的很多函数就是直接从map_builder_server_拿数据,为什么?因为map_builder_server_有MapBuilder啊,手握指挥家,想要啥来啥。
template <class SubmapType>
MapBuilderContext<SubmapType>::MapBuilderContext(
MapBuilderServer* map_builder_server)
: map_builder_server_(map_builder_server) {}
template <class SubmapType>
void MapBuilderContext<SubmapType>::AddSensorDataToTrajectory(
const Data& sensor_data) {
sensor_data.data->AddToTrajectoryBuilder(
map_builder_server_->map_builder_->GetTrajectoryBuilder(
sensor_data.trajectory_id));
}
插一句:
这里我们发现一个奇怪的代码
template <>
void MapBuilderContext<mapping::Submap2D>::EnqueueLocalSlamResultData(
int trajectory_id, const std::string& sensor_id,
const mapping::proto::LocalSlamResultData& local_slam_result_data);
template <>
void MapBuilderContext<mapping::Submap3D>::EnqueueLocalSlamResultData(
int trajectory_id, const std::string& sensor_id,
const mapping::proto::LocalSlamResultData& local_slam_result_data);
它的具体实现是在这里
template <>
void MapBuilderContext<mapping::Submap2D>::EnqueueLocalSlamResultData(
int trajectory_id, const std::string& sensor_id,
const mapping::proto::LocalSlamResultData& local_slam_result_data) {
map_builder_server_->incoming_data_queue_.Push(absl::make_unique<Data>(
Data{trajectory_id,
absl::make_unique<mapping::LocalSlamResult2D>(
sensor_id, local_slam_result_data, &submap_controller_)}));
}
template <>
void MapBuilderContext<mapping::Submap3D>::EnqueueLocalSlamResultData(
int trajectory_id, const std::string& sensor_id,
const mapping::proto::LocalSlamResultData& local_slam_result_data) {
map_builder_server_->incoming_data_queue_.Push(absl::make_unique<Data>(
Data{trajectory_id,
absl::make_unique<mapping::LocalSlamResult3D>(
sensor_id, local_slam_result_data, &submap_controller_)}));
}
顺藤摸瓜,我们可以找到它是输入这种数据kLocalSlamResultData,,我们全局搜索,发现了这个类LocalTrajectoryUploader,这个类是干什么用的呢?
我们看它自己的解释
Uploads sensor data batches to uplink server. Gracefully handles interruptions of the connection.上传传感器数据集合到上行链路服务器。优雅地处理连接中断。
If there is an uplink server and a submap insertion happened, enqueue this local SLAM result for uploading.如果有一个上行链路服务器并且发生了submap插入,那么将这个本地SLAM结果排队以供上传。
应该是处理异常链接中断,通过其他服务器恢复数据用的。