当前位置: 首页 > 工具软件 > Mapbuilder > 使用案例 >

MapBuilder grpc内容传输接口MapBuilderContext

毕嘉
2023-12-01

grpc client和server之间的数据传输容器,可以在多个rpc函数中共享数据
先跟进MapBuilderContext类

// Concrete MapBuilderContextInterface implementation that brokers data
// between the gRPC handlers and the owning MapBuilderServer. Templated on
// the submap type (mapping::Submap2D or mapping::Submap3D) so that local
// SLAM results can be handled for either configuration.
template <class SubmapType>
class MapBuilderContext : public MapBuilderContextInterface {
 public:
  // 'map_builder_server' is stored as an unowned pointer and must outlive
  // this context.
  MapBuilderContext(MapBuilderServer* map_builder_server);
  // Accessor for the server-owned MapBuilderInterface.
  mapping::MapBuilderInterface& map_builder() override;
  // Queue of incoming sensor data shared with the server.
  common::BlockingQueue<std::unique_ptr<MapBuilderContextInterface::Data>>&
  sensor_data_queue() override;
  // Callback handed to trajectory builders for delivering local SLAM
  // results to registered subscriptions.
  mapping::TrajectoryBuilderInterface::LocalSlamResultCallback
  GetLocalSlamResultCallbackForSubscriptions() override;
  // Dispatches 'sensor_data' to the trajectory builder it belongs to.
  void AddSensorDataToTrajectory(const Data& sensor_data) override;
  // Registers 'callback' for local SLAM results of 'trajectory_id'; the
  // returned id is used to unsubscribe.
  MapBuilderContextInterface::LocalSlamSubscriptionId SubscribeLocalSlamResults(
      int trajectory_id, LocalSlamSubscriptionCallback callback) override;
  void UnsubscribeLocalSlamResults(
      const LocalSlamSubscriptionId& subscription_id) override;
  // Registers 'callback' for global SLAM optimization results; returns the
  // subscription index used for unsubscribing.
  int SubscribeGlobalSlamOptimizations(
      GlobalSlamOptimizationCallback callback) override;
  void UnsubscribeGlobalSlamOptimizations(int subscription_index) override;
  void NotifyFinishTrajectory(int trajectory_id) override;
  // May be nullptr when no uplink server is configured (see the
  // MapBuilderServer constructor).
  LocalTrajectoryUploaderInterface* local_trajectory_uploader() override;
  // Pushes raw sensor data for 'trajectory_id' into the data queue.
  void EnqueueSensorData(int trajectory_id,
                         std::unique_ptr<sensor::Data> data) override;
  // Pushes a deserialized local SLAM result into the data queue; defined
  // via explicit specializations for Submap2D and Submap3D.
  void EnqueueLocalSlamResultData(int trajectory_id,
                                  const std::string& sensor_id,
                                  const mapping::proto::LocalSlamResultData&
                                      local_slam_result_data) override;
  void RegisterClientIdForTrajectory(const std::string& client_id,
                                     int trajectory_id) override;
  // Returns whether 'client_id' is the one registered for 'trajectory_id'.
  bool CheckClientIdForTrajectory(const std::string& client_id,
                                  int trajectory_id) override;

 private:
  MapBuilderServer* map_builder_server_;  // Not owned.
  mapping::SubmapController<SubmapType> submap_controller_;
  // Maps trajectory_id to the client_id registered for that trajectory.
  std::map</*trajectory_id=*/int, /*client_id=*/std::string> client_ids_;
};

再跟进MapBuilderContextInterface

// Execution context shared by all RPC handlers of the map builder server.
// Handlers obtain it through the base-class locking mechanism to reach the
// SLAM state and subscription machinery.
class MapBuilderContextInterface : public async_grpc::ExecutionContext {
 public:
  // One local SLAM update produced for a single trajectory.
  struct LocalSlamResult {
    int trajectory_id;
    common::Time time;
    transform::Rigid3d local_pose;
    std::shared_ptr<const sensor::RangeData> range_data;
    std::unique_ptr<const mapping::TrajectoryBuilderInterface::InsertionResult>
        insertion_result;
  };
  // Calling with 'nullptr' signals subscribers that the subscription has ended,
  // e.g. this happens when the corresponding trajectory was finished and hence
  // no more local SLAM updates will occur.
  // The callback can return 'false' to indicate that the client is not
  // interested in more local SLAM updates and 'MapBuilderServer' will end the
  // subscription.
  using LocalSlamSubscriptionCallback =
      std::function<bool(std::unique_ptr<LocalSlamResult>)>;

  // The callback can return 'false' to indicate that the client is not
  // interested in more global SLAM runs and 'MapBuilderServer' will end the
  // subscription.
  using GlobalSlamOptimizationCallback = std::function<bool(
      const std::map<int /* trajectory_id */, mapping::SubmapId>&,
      const std::map<int /* trajectory_id */, mapping::NodeId>&)>;

  // A queued sensor data item together with the trajectory it belongs to.
  struct Data {
    int trajectory_id;
    std::unique_ptr<sensor::Data> data;
  };
  // Identifies one local SLAM subscription: the trajectory plus the index
  // assigned when subscribing.
  struct LocalSlamSubscriptionId {
    const int trajectory_id;
    const int subscription_index;
  };

  MapBuilderContextInterface() = default;
  // Implicitly virtual: the base class ExecutionContext declares a virtual
  // destructor.
  ~MapBuilderContextInterface() = default;

  MapBuilderContextInterface(const MapBuilderContextInterface&) = delete;
  MapBuilderContextInterface& operator=(const MapBuilderContextInterface&) =
      delete;

  virtual mapping::MapBuilderInterface& map_builder() = 0;
  virtual common::BlockingQueue<std::unique_ptr<Data>>& sensor_data_queue() = 0;
  virtual mapping::TrajectoryBuilderInterface::LocalSlamResultCallback
  GetLocalSlamResultCallbackForSubscriptions() = 0;
  virtual void AddSensorDataToTrajectory(const Data& sensor_data) = 0;
  virtual LocalSlamSubscriptionId SubscribeLocalSlamResults(
      int trajectory_id, LocalSlamSubscriptionCallback callback) = 0;
  virtual void UnsubscribeLocalSlamResults(
      const LocalSlamSubscriptionId& subscription_id) = 0;
  virtual int SubscribeGlobalSlamOptimizations(
      GlobalSlamOptimizationCallback callback) = 0;
  virtual void UnsubscribeGlobalSlamOptimizations(int subscription_index) = 0;
  virtual void NotifyFinishTrajectory(int trajectory_id) = 0;
  virtual LocalTrajectoryUploaderInterface* local_trajectory_uploader() = 0;
  virtual void EnqueueSensorData(int trajectory_id,
                                 std::unique_ptr<sensor::Data> data) = 0;
  virtual void EnqueueLocalSlamResultData(
      int trajectory_id, const std::string& sensor_id,
      const mapping::proto::LocalSlamResultData& local_slam_result_data) = 0;
  virtual void RegisterClientIdForTrajectory(const std::string& client_id,
                                             int trajectory_id) = 0;
  virtual bool CheckClientIdForTrajectory(const std::string& client_id,
                                          int trajectory_id) = 0;
};

再跟进ExecutionContext

// Base class for state shared between RPC handlers of one gRPC server.
// Derived classes add the actual shared data; handlers access it through
// Synchronized, which holds the context's mutex for its lifetime.
class ExecutionContext {
 public:
  // Automatically locks an ExecutionContext for shared use by RPC handlers.
  // This non-movable, non-copyable class is used to broker access from various
  // RPC handlers to the shared 'ExecutionContext'.
  template <typename ContextType>
  class Synchronized {
   public:
    // Exposes the derived context; the caller must pass the correct
    // ContextType since the downcast is unchecked.
    ContextType* operator->() {
      return static_cast<ContextType*>(execution_context_);
    }
    // Acquires 'lock' (via the RAII locker member) for the lifetime of
    // this object; 'execution_context' is not owned.
    Synchronized(common::Mutex* lock, ExecutionContext* execution_context)
        : locker_(lock), execution_context_(execution_context) {}
    Synchronized(const Synchronized&) = delete;
    Synchronized(Synchronized&&) = delete;

   private:
    common::MutexLocker locker_;  // Holds the mutex while alive.
    ExecutionContext* execution_context_;  // Not owned.
  };
  ExecutionContext() = default;
  virtual ~ExecutionContext() = default;
  ExecutionContext(const ExecutionContext&) = delete;
  ExecutionContext& operator=(const ExecutionContext&) = delete;
  // Mutex guarding the shared context; passed to Synchronized.
  common::Mutex* lock() { return &lock_; }

 private:
  common::Mutex lock_;
};

这么一长串想要干嘛?我们之前说过了MapBuilderContext是在各个rpc函数中共享数据,那这些数据是从哪拿来的呢?我们一步一步看

首先是map_builder_server_main.cc,在Run函数中构造了MapBuilder,我们之前说这个是cartographer表演的指挥家,但是这里它只是MapBuilderServer的一个成员变量,可见MapBuilderServer才是真正的幕后大佬。

void Run(const std::string& configuration_directory,
         const std::string& configuration_basename) {
#if USE_PROMETHEUS
  metrics::prometheus::FamilyFactory registry;
  ::cartographer::metrics::RegisterAllMetrics(&registry);
  RegisterMapBuilderServerMetrics(&registry);
  ::prometheus::Exposer exposer("0.0.0.0:9100");
  exposer.RegisterCollectable(registry.GetCollectable());
  LOG(INFO) << "Exposing metrics at http://localhost:9100/metrics";
#endif

  proto::MapBuilderServerOptions map_builder_server_options =
      LoadMapBuilderServerOptions(configuration_directory,
                                  configuration_basename);
  auto map_builder = absl::make_unique<mapping::MapBuilder>(
      map_builder_server_options.map_builder_options());
  std::unique_ptr<MapBuilderServerInterface> map_builder_server =
      CreateMapBuilderServer(map_builder_server_options,
                             std::move(map_builder));
  map_builder_server->Start();
  map_builder_server->WaitForShutdown();
}

我们接着看MapBuilderServer的构造函数

// Builds the gRPC server: configures addressing, threading and message-size
// limits, registers one handler per RPC, installs the shared
// MapBuilderContext, and hooks the global SLAM optimization callback.
// Takes ownership of 'map_builder'.
MapBuilderServer::MapBuilderServer(
    const proto::MapBuilderServerOptions& map_builder_server_options,
    std::unique_ptr<mapping::MapBuilderInterface> map_builder)
    : map_builder_(std::move(map_builder)) {
  async_grpc::Server::Builder server_builder;
  server_builder.SetServerAddress(map_builder_server_options.server_address());
  server_builder.SetNumGrpcThreads(
      map_builder_server_options.num_grpc_threads());
  server_builder.SetNumEventThreads(
      map_builder_server_options.num_event_threads());
  server_builder.SetMaxSendMessageSize(kMaxMessageSize);
  server_builder.SetMaxReceiveMessageSize(kMaxMessageSize);
  // The uplink is optional: only create the uploader when an uplink server
  // address is configured.
  if (!map_builder_server_options.uplink_server_address().empty()) {
    local_trajectory_uploader_ = CreateLocalTrajectoryUploader(
        map_builder_server_options.uplink_server_address(),
        map_builder_server_options.upload_batch_size(),
        map_builder_server_options.enable_ssl_encryption(),
        map_builder_server_options.enable_google_auth());
  }
  // Register one handler class per RPC method exposed by the server.
  server_builder.RegisterHandler<handlers::AddTrajectoryHandler>();
  server_builder.RegisterHandler<handlers::AddOdometryDataHandler>();
  server_builder.RegisterHandler<handlers::AddImuDataHandler>();
  server_builder.RegisterHandler<handlers::AddRangefinderDataHandler>();
  server_builder.RegisterHandler<handlers::AddFixedFramePoseDataHandler>();
  server_builder.RegisterHandler<handlers::AddLandmarkDataHandler>();
  server_builder.RegisterHandler<handlers::AddSensorDataBatchHandler>();
  server_builder.RegisterHandler<handlers::FinishTrajectoryHandler>();
  server_builder.RegisterHandler<handlers::DeleteTrajectoryHandler>();
  server_builder.RegisterHandler<handlers::ReceiveGlobalSlamOptimizationsHandler>();
  server_builder.RegisterHandler<handlers::ReceiveLocalSlamResultsHandler>();
  server_builder.RegisterHandler<handlers::GetSubmapHandler>();
  server_builder.RegisterHandler<handlers::GetTrajectoryNodePosesHandler>();
  server_builder.RegisterHandler<handlers::GetTrajectoryStatesHandler>();
  server_builder.RegisterHandler<handlers::GetLandmarkPosesHandler>();
  server_builder.RegisterHandler<handlers::GetAllSubmapPosesHandler>();
  server_builder.RegisterHandler<handlers::GetLocalToGlobalTransformHandler>();
  server_builder.RegisterHandler<handlers::GetConstraintsHandler>();
  server_builder.RegisterHandler<handlers::IsTrajectoryFinishedHandler>();
  server_builder.RegisterHandler<handlers::IsTrajectoryFrozenHandler>();
  server_builder.RegisterHandler<handlers::LoadStateHandler>();
  server_builder.RegisterHandler<handlers::LoadStateFromFileHandler>();
  server_builder.RegisterHandler<handlers::RunFinalOptimizationHandler>();
  server_builder.RegisterHandler<handlers::WriteStateHandler>();
  server_builder.RegisterHandler<handlers::WriteStateToFileHandler>();
  server_builder.RegisterHandler<handlers::SetLandmarkPoseHandler>();
  grpc_server_ = server_builder.Build();
  // The execution context is shared by all handlers; its submap type must
  // match the configured trajectory builder dimensionality. 'this' outlives
  // grpc_server_, so handing out the raw pointer is safe.
  if (map_builder_server_options.map_builder_options()
          .use_trajectory_builder_2d()) {
    grpc_server_->SetExecutionContext(
        absl::make_unique<MapBuilderContext<mapping::Submap2D>>(this));
  } else if (map_builder_server_options.map_builder_options()
                 .use_trajectory_builder_3d()) {
    grpc_server_->SetExecutionContext(
        absl::make_unique<MapBuilderContext<mapping::Submap3D>>(this));
  } else {
    LOG(FATAL)
        << "Set either use_trajectory_builder_2d or use_trajectory_builder_3d";
  }
  // Forward global SLAM optimization results to this server so it can
  // notify its subscribers.
  map_builder_->pose_graph()->SetGlobalSlamOptimizationCallback(
      [this](const std::map<int, mapping::SubmapId>& last_optimized_submap_ids,
             const std::map<int, mapping::NodeId>& last_optimized_node_ids) {
        OnGlobalSlamOptimizations(last_optimized_submap_ids,
                                  last_optimized_node_ids);
      });
}

构造函数很长,主要是设置server_builder,有一句很重要

grpc_server_->SetExecutionContext(
        absl::make_unique<MapBuilderContext<mapping::Submap2D>>(this));

这句话直接构造了MapBuilderContext,并把它传递给grpc_server_。其中的this就是MapBuilderServer自身,也就是说它把自己传递给了MapBuilderContext。

我们举例MapBuilderContext是怎样获取cartographer的数据,并在grpc中传递的。

构造函数,很简单,主要赋值给map_builder_server_,后面的很多函数就是直接从map_builder_server_拿数据,为什么?因为map_builder_server_有MapBuilder啊,手握指挥家,想要啥来啥。

// Stores an unowned pointer to the owning server; 'map_builder_server'
// must outlive this context.
template <class SubmapType>
MapBuilderContext<SubmapType>::MapBuilderContext(
    MapBuilderServer* map_builder_server)
    : map_builder_server_(map_builder_server) {}
// Routes 'sensor_data' to the trajectory builder owning its trajectory id.
template <class SubmapType>
void MapBuilderContext<SubmapType>::AddSensorDataToTrajectory(
    const Data& sensor_data) {
  auto* trajectory_builder =
      map_builder_server_->map_builder_->GetTrajectoryBuilder(
          sensor_data.trajectory_id);
  sensor_data.data->AddToTrajectoryBuilder(trajectory_builder);
}

插一句:
这里我们发现一个奇怪的代码

// Declarations of the explicit specializations: the 2D and 3D cases wrap
// the proto into different LocalSlamResult types, so there is no generic
// definition of this member.
template <>
void MapBuilderContext<mapping::Submap2D>::EnqueueLocalSlamResultData(
    int trajectory_id, const std::string& sensor_id,
    const mapping::proto::LocalSlamResultData& local_slam_result_data);
// 3D counterpart of the specialization above.
template <>
void MapBuilderContext<mapping::Submap3D>::EnqueueLocalSlamResultData(
    int trajectory_id, const std::string& sensor_id,
    const mapping::proto::LocalSlamResultData& local_slam_result_data);

它的具体实现是在这里

// 2D specialization: wraps the serialized local SLAM result in a
// LocalSlamResult2D and enqueues it on the server's incoming data queue.
template <>
void MapBuilderContext<mapping::Submap2D>::EnqueueLocalSlamResultData(
    int trajectory_id, const std::string& sensor_id,
    const mapping::proto::LocalSlamResultData& local_slam_result_data) {
  auto local_slam_result = absl::make_unique<mapping::LocalSlamResult2D>(
      sensor_id, local_slam_result_data, &submap_controller_);
  map_builder_server_->incoming_data_queue_.Push(absl::make_unique<Data>(
      Data{trajectory_id, std::move(local_slam_result)}));
}

// 3D specialization: wraps the serialized local SLAM result in a
// LocalSlamResult3D and enqueues it on the server's incoming data queue.
template <>
void MapBuilderContext<mapping::Submap3D>::EnqueueLocalSlamResultData(
    int trajectory_id, const std::string& sensor_id,
    const mapping::proto::LocalSlamResultData& local_slam_result_data) {
  auto local_slam_result = absl::make_unique<mapping::LocalSlamResult3D>(
      sensor_id, local_slam_result_data, &submap_controller_);
  map_builder_server_->incoming_data_queue_.Push(absl::make_unique<Data>(
      Data{trajectory_id, std::move(local_slam_result)}));
}

顺藤摸瓜,我们可以找到它输入的是kLocalSlamResultData这种数据。我们全局搜索,发现了LocalTrajectoryUploader这个类,这个类是干什么用的呢?
我们看它自己的解释
Uploads sensor data batches to uplink server. Gracefully handles interruptions of the connection.上传传感器数据集合到上行链路服务器。优雅地处理连接中断。
If there is an uplink server and a submap insertion happened, enqueue this local SLAM result for uploading.如果有一个上行链路服务器并且发生了submap插入,那么将这个本地SLAM结果排队以供上传。

应该是处理异常链接中断,通过其他服务器恢复数据用的。

 类似资料: