Android P update_engine分析(七)--升级核心DeltaPerformer的分析

云鸿祯
2023-12-01

前面几篇分析了update_engine的启动、update_engine_client的使用、boot_control的AB切换,以及update_engine的4个Action,但始终没有看到核心的部分:payload.bin是如何下载到目标分区的。之前一直以为是在PostinstallRunnerAction里做的,后面发现错了,升级包里根本没有Postinstall脚本。重新分析之后发现,核心的升级操作都是在DownloadAction里做的,从而牵扯出一个核心的升级类DeltaPerformer。那我们还是从之前分析漏了的DownloadAction内容开始。

Writer的初始化和使用

在DownloadAction的StartDownloading中,已经初始化了delta_performer_和writer_。

// Excerpt (parts elided with "..."): selects the writer that will receive the
// downloaded bytes, then starts the transfer.
void DownloadAction::StartDownloading() {
...
  // A writer_ that is already set and is not our own delta_performer_ is kept
  // as is (the log message identifies this as the test path).
  if (writer_ && writer_ != delta_performer_.get()) {
    LOG(INFO) << "Using writer for test.";
  } else {
    // Create the DeltaPerformer and use it as writer_.
    delta_performer_.reset(new DeltaPerformer(prefs_,
                                              boot_control_,
                                              hardware_,
                                              delegate_,
                                              &install_plan_,
                                              payload_,
                                              is_interactive_));
    writer_ = delta_performer_.get();
  }
  ...
   // Start the transfer from the install plan's download URL.
   http_fetcher_->BeginTransfer(install_plan_.download_url);
}

// Excerpt (parts elided with "..."): picks the HttpFetcher implementation for
// |url| and hands it to a newly created DownloadAction.
void UpdateAttempterAndroid::BuildUpdateActions(const string& url) {
...
  // Choose the fetcher: FileFetcher when it supports |url|, otherwise
  // LibcurlHttpFetcher (a fatal error in _UE_SIDELOAD builds).
  HttpFetcher* download_fetcher = nullptr;
  if (FileFetcher::SupportedUrl(url)) {
    DLOG(INFO) << "Using FileFetcher for file URL.";
    download_fetcher = new FileFetcher();
  } else {
#ifdef _UE_SIDELOAD
    LOG(FATAL) << "Unsupported sideload URI: " << url;
#else
    LibcurlHttpFetcher* libcurl_fetcher =
        new LibcurlHttpFetcher(&proxy_resolver_, hardware_);
    libcurl_fetcher->set_server_to_check(ServerToCheck::kDownload);
    download_fetcher = libcurl_fetcher;
#endif  // _UE_SIDELOAD
  }
  // The DownloadAction receives the raw fetcher pointer chosen above.
  shared_ptr<DownloadAction> download_action(
      new DownloadAction(prefs_,
                         boot_control_,
                         hardware_,
                         nullptr,           // system_state, not used.
                         download_fetcher,  // passes ownership
                         true /* is_interactive */));
 ...
}
// Returns true when |url| begins with "file:///". The prefix comparison is
// ASCII case-insensitive.
bool FileFetcher::SupportedUrl(const string& url) {
  const bool has_file_prefix = base::StartsWith(
      url, "file:///", base::CompareCase::INSENSITIVE_ASCII);
  return has_file_prefix;
}

因为我们升级时更多的是先将升级包下载好,再使用U盘或SD卡安装,调用时传入的URL一般以file:///开头,所以download_fetcher使用的是FileFetcher类,同时download_action也是用这个FileFetcher初始化的。那我们继续分析FileFetcher::BeginTransfer(…)。

FileFetcher传输


// Begins the transfer, which must not have already been started.
void FileFetcher::BeginTransfer(const string& url) {
  CHECK(!transfer_in_progress_);
 //再次确认是不是以file:///开头的
  if (!SupportedUrl(url)) {
    LOG(ERROR) << "Unsupported file URL: " << url;
    // No HTTP error code when the URL is not supported.
    http_response_code_ = 0;
    CleanUp();
    if (delegate_)
      delegate_->TransferComplete(this, false);
    return;
  }
//从指定路径使用文件流以只读的方式打开
  string file_path = url.substr(strlen("file://"));
  stream_ =
      brillo::FileStream::Open(base::FilePath(file_path),
                               brillo::Stream::AccessMode::READ,
                               brillo::FileStream::Disposition::OPEN_EXISTING,
                               nullptr);

  if (!stream_) {
    LOG(ERROR) << "Couldn't open " << file_path;
    http_response_code_ = kHttpResponseNotFound;
    CleanUp();
    if (delegate_)
      delegate_->TransferComplete(this, false);
    return;
  }
  http_response_code_ = kHttpResponseOk;
//如果有设置offset,就从指定offset开始
  if (offset_)
    stream_->SetPosition(offset_, nullptr);
  bytes_copied_ = 0;
  transfer_in_progress_ = true;
  ScheduleRead();
}

// Schedules the next asynchronous read from stream_ into buffer_. Does nothing
// while the transfer is paused, not in progress, or a read is already pending.
void FileFetcher::ScheduleRead() {
  const bool ready_to_read =
      !transfer_paused_ && !ongoing_read_ && transfer_in_progress_;
  if (!ready_to_read) {
    return;
  }

  // Size the buffer for one read of kReadBufferSize bytes; when a data length
  // was requested, cap the read at the bytes still outstanding.
  buffer_.resize(kReadBufferSize);
  size_t chunk_size = buffer_.size();
  if (data_length_ >= 0) {
    chunk_size = std::min(static_cast<uint64_t>(chunk_size),
                          data_length_ - bytes_copied_);
  }

  // Nothing left to read: report a zero-byte read instead of scheduling one.
  if (chunk_size == 0) {
    OnReadDoneCallback(0);
    return;
  }

  // Kick off the read, registering the completion and error callbacks.
  auto on_read_done =
      base::Bind(&FileFetcher::OnReadDoneCallback, base::Unretained(this));
  auto on_read_error =
      base::Bind(&FileFetcher::OnReadErrorCallback, base::Unretained(this));
  ongoing_read_ = stream_->ReadAsync(
      buffer_.data(), chunk_size, on_read_done, on_read_error, nullptr);
  if (ongoing_read_) {
    return;
  }

  LOG(ERROR) << "Unable to schedule an asynchronous read from the stream.";
  CleanUp();
  if (delegate_) {
    delegate_->TransferComplete(this, false);
  }
}


使用文件流的方式打开升级包文件,使用buffer_作为接收缓存,然后开始读取(对FileFetcher来说,“下载”就是读文件),每次默认读取16K,读取完成之后就会回调OnReadDoneCallback。


// Completion callback for the asynchronous read scheduled by ScheduleRead();
// |bytes_read| is the number of bytes placed in buffer_.
void FileFetcher::OnReadDoneCallback(size_t bytes_read) {
  ongoing_read_ = false;

  if (bytes_read != 0) {
    // Forward the chunk to the delegate (when set), then queue the next read.
    bytes_copied_ += bytes_read;
    if (delegate_) {
      delegate_->ReceivedBytes(this, buffer_.data(), bytes_read);
    }
    ScheduleRead();
    return;
  }

  // A zero-byte read ends the transfer: clean up and report success.
  CleanUp();
  if (delegate_) {
    delegate_->TransferComplete(this, true);
  }
}
// Excerpt (rest elided with "..."): registers this DownloadAction as the
// delegate of http_fetcher_, so the fetcher's callbacks are delivered here.
void DownloadAction::PerformAction() {
  http_fetcher_->set_delegate(this);
  ...
}

在DownloadAction的PerformAction中,已经设置了delegate回调,将DownloadAction对象本身作为回调delegate设置给FileFetcher。上面的ReceivedBytes调用就是调用DownloadAction的ReceivedBytes。


// Excerpt (tail elided with "...."): fetcher callback invoked with each chunk
// of |length| bytes at |bytes|. Updates the byte counters, reports progress to
// delegate_, and passes the data on to writer_.
void DownloadAction::ReceivedBytes(HttpFetcher* fetcher,
                                   const void* bytes,
                                   size_t length) {
  // Note that bytes_received_ is the current offset.
  if (!p2p_file_id_.empty()) {
    WriteToP2PFile(bytes, length, bytes_received_);
  }

  bytes_received_ += length;
  uint64_t bytes_downloaded_total =
      bytes_received_previous_payloads_ + bytes_received_;
  // Report progress to this action's delegate_ while the download is active.
  // NOTE(review): the article states that delegate is UpdateAttempterAndroid;
  // the assignment is not visible in this excerpt.
  if (delegate_ && download_active_) {
    delegate_->BytesReceived(length, bytes_downloaded_total, bytes_total_);
  }
  // Key step: hand the bytes to writer_ (the DeltaPerformer, per the log
  // message below). A false return with a non-success code_ is logged.
  if (writer_ && !writer_->Write(bytes, length, &code_)) {
    if (code_ != ErrorCode::kSuccess) {
      LOG(ERROR) << "Error " << utils::ErrorCodeToString(code_) << " (" << code_
                 << ") in DeltaPerformer's Write method when "
                 << "processing the received payload -- Terminating processing";
    }
  ....
}
//DownloadAction 回调UpdateAttermpterAndroid的BytesReceived,上报进度
void UpdateAttempterAndroid::BytesReceived(uint64_t bytes_progressed,
                                           uint64_t bytes_received,
                                           uint64_t total) {
  double progress = 0;
  if (total)
    progress = static_cast<double>(bytes_received) / static_cast<double>(total);
  if (status_ != UpdateStatus::DOWNLOADING || bytes_received == total) {
    download_progress_ = progress;
    SetStatusAndNotify(UpdateStatus::DOWNLOADING);
  } else {
    //更新进度,同时将进度上报给上层调用者
    ProgressUpdate(progress);
  }

  // 更新进度到sharePerferences的文件中
  int64_t current_bytes_downloaded =
      metrics_utils::GetPersistedValue(kPrefsCurrentBytesDownloaded, prefs_);
  int64_t total_bytes_downloaded =
      metrics_utils::GetPersistedValue(kPrefsTotalBytesDownloaded, prefs_);
  prefs_->SetInt64(kPrefsCurrentBytesDownloaded,
                   current_bytes_downloaded + bytes_progressed);
  prefs_->SetInt64(kPrefsTotalBytesDownloaded,
                   total_bytes_downloaded + bytes_progressed);
}

DownloadAction的ReceivedBytes一方面回调UpdateAttempterAndroid的BytesReceived,将升级进度上报给上层调用者,另一方面将数据传给DeltaPerformer的Write。那这个函数是不是实际的升级动作呢?我们继续往下分析:


// Wrapper around write. Returns true if all requested bytes
// were written, or false on any error, regardless of progress
// and stores an action exit code in |error|.
//
// The incoming bytes are consumed in three stages, each pulling data into
// buffer_ via CopyDataToBuffer():
//   1. payload metadata, until the manifest has been parsed and validated;
//   2. the install operations, applied one by one to the target partitions;
//   3. the payload signature blob (kBrilloMajorPayloadVersion payloads only).
// A true return may also mean that more data is needed before the current
// stage can make progress.
bool DeltaPerformer::Write(const void* bytes, size_t count, ErrorCode *error) {
  *error = ErrorCode::kSuccess;
  const char* c_bytes = reinterpret_cast<const char*>(bytes);

  // Account for the newly received bytes and refresh the progress report.
  total_bytes_received_ += count;
  UpdateOverallProgress(false, "Completed ");

  // Stage 1: until the manifest is valid, keep accumulating and parsing the
  // payload metadata.
  while (!manifest_valid_) {
    // Read data up to the needed limit; this is either maximum payload header
    // size, or the full metadata size (once it becomes known).
    const bool do_read_header = !IsHeaderParsed();
    // Copy input into the buffer, up to that limit.
    CopyDataToBuffer(&c_bytes, &count,
                     (do_read_header ? kMaxPayloadHeaderSize :
                      metadata_size_ + metadata_signature_size_));
    // Parse what has been buffered so far; fail on a parse error.
    MetadataParseResult result = ParsePayloadMetadata(buffer_, error);
    if (result == MetadataParseResult::kError)
      return false;
    if (result == MetadataParseResult::kInsufficientData) {
      // If we just processed the header, make an attempt on the manifest.
      if (do_read_header && IsHeaderParsed())
        continue;

      return true;
    }

    // Check that the parsed manifest is acceptable.
    if ((*error = ValidateManifest()) != ErrorCode::kSuccess)
      return false;
    manifest_valid_ = true;

    // Discard buffered data now that the metadata has been processed.
    DiscardBuffer(false, metadata_size_);

    // This populates |partitions_| and the |install_plan.partitions| with the
    // list of partitions from the manifest.
    if (!ParseManifestPartitions(error))
      return false;

    // |install_plan.partitions| was filled in, nothing need to be done here if
    // the payload was already applied, returns false to terminate http fetcher,
    // but keep |error| as ErrorCode::kSuccess.
    if (payload_->already_applied)
      return false;

    // Build the running total of operations per partition:
    // acc_num_operations_[i] is the operation count of partitions 0..i.
    num_total_operations_ = 0;
    for (const auto& partition : partitions_) {
      num_total_operations_ += partition.operations_size();
      acc_num_operations_.push_back(num_total_operations_);
    }

    LOG_IF(WARNING, !prefs_->SetInt64(kPrefsManifestMetadataSize,
                                      metadata_size_))
        << "Unable to save the manifest metadata size.";
    LOG_IF(WARNING, !prefs_->SetInt64(kPrefsManifestSignatureSize,
                                      metadata_signature_size_))
        << "Unable to save the manifest signature size.";

    if (!PrimeUpdateState()) {
      *error = ErrorCode::kDownloadStateInitializationError;
      LOG(ERROR) << "Unable to prime the update state.";
      return false;
    }

    if (!OpenCurrentPartition()) {
      *error = ErrorCode::kInstallDeviceOpenError;
      return false;
    }

    // A non-zero next_operation_num_ at this point is reported as a resume.
    if (next_operation_num_ > 0)
      UpdateOverallProgress(true, "Resuming after ");
    LOG(INFO) << "Starting to apply update payload operations";
  }
  // Stage 2: apply the install operations as their data becomes available.
  while (next_operation_num_ < num_total_operations_) {
    // Stop if the download delegate asks to cancel.
    if (download_delegate_ && download_delegate_->ShouldCancel(error))
      return false;

    // Once every operation of the current partition has been applied, close it
    // and move on to the next partition.
    while (next_operation_num_ >= acc_num_operations_[current_partition_]) {
      CloseCurrentPartition();
      current_partition_++;
      if (!OpenCurrentPartition()) {
        *error = ErrorCode::kInstallDeviceOpenError;
        return false;
      }
    }
    // Index of the next operation relative to the current partition.
    const size_t partition_operation_num = next_operation_num_ - (
        current_partition_ ? acc_num_operations_[current_partition_ - 1] : 0);
    // The operation to perform next.
    const InstallOperation& op =
        partitions_[current_partition_].operations(partition_operation_num);
    // Buffer the input for this operation (limit: op.data_length()).
    CopyDataToBuffer(&c_bytes, &count, op.data_length());

    // Wait for more data if the operation cannot be performed yet.
    if (!CanPerformInstallOperation(op))
      return true;

    // The operation hash is only validated when the payload carries a
    // metadata signature.
    if (!payload_->metadata_signature.empty()) {
      // Note: Validate must be called only if CanPerformInstallOperation is
      // called. Otherwise, we might be failing operations before even if there
      // isn't sufficient data to compute the proper hash.
      *error = ValidateOperationHash(op);
      if (*error != ErrorCode::kSuccess) {
        if (install_plan_->hash_checks_mandatory) {
          LOG(ERROR) << "Mandatory operation hash check failed";
          return false;
        }

        // For non-mandatory cases, just send a UMA stat.
        LOG(WARNING) << "Ignoring operation validation errors";
        *error = ErrorCode::kSuccess;
      }
    }

    // Makes sure we unblock exit when this operation completes.
    ScopedTerminatorExitUnblocker exit_unblocker =
        ScopedTerminatorExitUnblocker();  // Avoids a compiler unused var bug.

    base::TimeTicks op_start_time = base::TimeTicks::Now();

    bool op_result;
    // Dispatch on the operation type; unknown types are treated as failures.
    switch (op.type()) {
      case InstallOperation::REPLACE:
      case InstallOperation::REPLACE_BZ:
      case InstallOperation::REPLACE_XZ:
        // Perform the replace operation.
        op_result = PerformReplaceOperation(op);
        OP_DURATION_HISTOGRAM("REPLACE", op_start_time);
        break;
      case InstallOperation::ZERO:
      case InstallOperation::DISCARD:
        op_result = PerformZeroOrDiscardOperation(op);
        OP_DURATION_HISTOGRAM("ZERO_OR_DISCARD", op_start_time);
        break;
      case InstallOperation::MOVE:
        op_result = PerformMoveOperation(op);
        OP_DURATION_HISTOGRAM("MOVE", op_start_time);
        break;
      case InstallOperation::BSDIFF:
        op_result = PerformBsdiffOperation(op);
        OP_DURATION_HISTOGRAM("BSDIFF", op_start_time);
        break;
      case InstallOperation::SOURCE_COPY:
        op_result = PerformSourceCopyOperation(op, error);
        OP_DURATION_HISTOGRAM("SOURCE_COPY", op_start_time);
        break;
      case InstallOperation::SOURCE_BSDIFF:
      case InstallOperation::BROTLI_BSDIFF:
        op_result = PerformSourceBsdiffOperation(op, error);
        OP_DURATION_HISTOGRAM("SOURCE_BSDIFF", op_start_time);
        break;
      case InstallOperation::PUFFDIFF:
        op_result = PerformPuffDiffOperation(op, error);
        OP_DURATION_HISTOGRAM("PUFFDIFF", op_start_time);
        break;
      default:
        op_result = false;
    }
    if (!HandleOpResult(op_result, InstallOperationTypeName(op.type()), error))
      return false;

    // Flush the target before counting the operation as done.
    if (!target_fd_->Flush()) {
      return false;
    }

    next_operation_num_++;
    UpdateOverallProgress(false, "Completed ");
    CheckpointUpdateProgress();
  }

  // Stage 3: for kBrilloMajorPayloadVersion payloads whose manifest declares a
  // signatures blob that has not been extracted yet, read it from the data
  // that follows the operations.
  if (major_payload_version_ == kBrilloMajorPayloadVersion &&
      manifest_.has_signatures_offset() && manifest_.has_signatures_size() &&
      signatures_message_data_.empty()) {
    // The signatures must start exactly at the current buffer offset.
    if (manifest_.signatures_offset() != buffer_offset_) {
      LOG(ERROR) << "Payload signatures offset points to blob offset "
                 << manifest_.signatures_offset()
                 << " but signatures are expected at offset "
                 << buffer_offset_;
      *error = ErrorCode::kDownloadPayloadVerificationError;
      return false;
    }
    CopyDataToBuffer(&c_bytes, &count, manifest_.signatures_size());
    // Needs more data to cover entire signature.
    if (buffer_.size() < manifest_.signatures_size())
      return true;
    if (!ExtractSignatureMessage()) {
      LOG(ERROR) << "Extract payload signature failed.";
      *error = ErrorCode::kDownloadPayloadVerificationError;
      return false;
    }
    DiscardBuffer(true, 0);
    // Since we extracted the SignatureMessage we need to advance the
    // checkpoint, otherwise we would reload the signature and try to extract
    // it again.
    CheckpointUpdateProgress();
  }

  return true;
}

Write函数很关键,首先要分清楚收到的是metadata数据、要写入分区的数据,还是payload末尾的签名数据。我们主要关注要写入分区的数据,它会再根据InstallOperation的type类型做对应的操作。我们先来看REPLACE对应的操作:


bool DeltaPerformer::PerformReplaceOperation(
    const InstallOperation& operation) {
  CHECK(operation.type() == InstallOperation::REPLACE ||
        operation.type() == InstallOperation::REPLACE_BZ ||
        operation.type() == InstallOperation::REPLACE_XZ);

  // Since we delete data off the beginning of the buffer as we use it,
  // the data we need should be exactly at the beginning of the buffer.
  TEST_AND_RETURN_FALSE(buffer_offset_ == operation.data_offset());
  TEST_AND_RETURN_FALSE(buffer_.size() >= operation.data_length());

  // 检查下提取签名信息是否在这个操作中
  if (ExtractSignatureMessageFromOperation(operation)) {
    // If this is dummy replace operation, we ignore it after extracting the
    // signature.
    DiscardBuffer(true, 0);
    return true;
  }

  //初始化ExtentWriter
  std::unique_ptr<ExtentWriter> writer = std::make_unique<ZeroPadExtentWriter>(
      std::make_unique<DirectExtentWriter>());
 //重置wirter
  if (operation.type() == InstallOperation::REPLACE_BZ) {
    writer.reset(new BzipExtentWriter(std::move(writer)));
  } else if (operation.type() == InstallOperation::REPLACE_XZ) {
    writer.reset(new XzExtentWriter(std::move(writer)));
  }
//初始化writer, 指定写入的目标
  TEST_AND_RETURN_FALSE(
      writer->Init(target_fd_, operation.dst_extents(), block_size_));
//将之前拷贝到buffer的数据,写入到目标区域
  TEST_AND_RETURN_FALSE(writer->Write(buffer_.data(), operation.data_length()));
  TEST_AND_RETURN_FALSE(writer->End());

  //重置buffer数据
  DiscardBuffer(true, buffer_.size());
  return true;
}

终于看到把下载的数据实际写入目标区域的操作了,其他几种type也是类似的,包括差分升级、原始拷贝、移动等,都是在Write函数的switch分支里分发处理的。
那到这里,我们的AB升级写分区的流程就分析完了。

 类似资料: