当前位置: 首页 > 工具软件 > Orc > 使用案例 >

ORC 之 C++ 之 Reader

卜瀚漠
2023-12-01

orc/c++/src/Reader.hh 中声明的 RowReaderImpl 和 ReaderImpl，分别是 orc/c++/include/orc/Reader.hh 中抽象接口 RowReader 和 Reader 的实现（impl）子类。

RowReader

orc/c++/include/orc/Reader.hh

/**
   * The interface for reading rows in ORC files.
   * This is an abstract class that will be subclassed as necessary.
   */
  class RowReader {
  public:
    virtual ~RowReader();
    /**
     * Get the selected type of the rows in the file. The file's row type
     * is projected down to just the selected columns. Thus, if the file's
     * type is struct<col0:int,col1:double,col2:string> and the selected
     * columns are "col0,col2" the selected type would be
     * struct<col0:int,col2:string>.
     * @return the root type
     */
    virtual const Type& getSelectedType() const = 0;

    /**
     * Get the selected columns of the file.
     * @return the selection flags, returned by value (presumably one flag
     *   per column id -- confirm against the implementation)
     */
    virtual const std::vector<bool> getSelectedColumns() const = 0;

    /**
     * Create a row batch for reading the selected columns of this file.
     * @param size the number of rows to read
     * @return a new ColumnVectorBatch to read into
     */
    virtual ORC_UNIQUE_PTR<ColumnVectorBatch> createRowBatch(uint64_t size
                                                             ) const = 0;

    /**
     * Read the next row batch from the current position.
     * Caller must look at numElements in the row batch to determine how
     * many rows were read.
     * @param data the row batch to read into.
     * @return true if a non-zero number of rows were read or false if the
     *   end of the file was reached.
     */
    virtual bool next(ColumnVectorBatch& data) = 0;

    /**
     * Get the row number of the first row in the previously read batch.
     * @return the row number of the previous batch.
     */
    virtual uint64_t getRowNumber() const = 0;

    /**
     * Seek to a given row.
     * @param rowNumber the next row the reader should return
     */
    virtual void seekToRow(uint64_t rowNumber) = 0;

  };
}

RowReaderImpl

orc/c++/src/Reader.hh

/**
 * Concrete RowReader. It keeps a shared handle on the file contents,
 * the set of selected columns, and the reading position (current stripe
 * and row within that stripe).
 */
class RowReaderImpl : public RowReader {
  private:
    // initialized from getLocalTimezone() by the constructor
    const Timezone& localTimezone;

    // contents
    // shared handle on the file contents; footer below points into it
    std::shared_ptr<FileContents> contents;
    // both copied from the RowReaderOptions passed to the constructor
    const bool throwOnHive11DecimalOverflow;
    const int32_t forcedScaleOnHive11Decimal;

    // inputs
    // filled in by the constructor from the options via ColumnSelector
    std::vector<bool> selectedColumns;

    // footer
    // non-owning; set to contents->footer.get() by the constructor
    proto::Footer* footer;
    // firstRowOfStripe[i] is the number of rows in all stripes before stripe i
    DataBuffer<uint64_t> firstRowOfStripe;
    // mutable: presumably built lazily by the const getSelectedType() -- confirm
    mutable std::unique_ptr<Type> selectedSchema;

    // reading state
    // presumably the value reported by getRowNumber() -- confirm in Reader.cc
    uint64_t previousRow;
    // first stripe whose offset falls inside the requested byte range
    uint64_t firstStripe;
    uint64_t currentStripe;
    uint64_t lastStripe; // the stripe AFTER the last one
    uint64_t currentRowInStripe;
    uint64_t rowsInCurrentStripe;
    proto::StripeInformation currentStripeInfo;
    proto::StripeFooter currentStripeFooter;
    std::unique_ptr<ColumnReader> reader;

    // set from RowReaderOptions::getEnableLazyDecoding() by the constructor
    bool enableEncodedBlock;
    // internal methods
    void startNextStripe();

    // row index of current stripe with column id as the key
    std::unordered_map<uint64_t, proto::RowIndex> rowIndexes;

    /**
     * Seek to the start of a row group in the current stripe
     * @param rowGroupEntryId the row group id to seek to
     */
    void seekToRowGroup(uint32_t rowGroupEntryId);

  public:
   /**
    * Constructor that lets the user specify additional options.
    * The offset and length in the options determine which stripes are read.
    * @param contents of the file
    * @param options options for reading
    */
    RowReaderImpl(std::shared_ptr<FileContents> contents,
                  const RowReaderOptions& options);

    // Select the columns from the options object
    void updateSelected();
    const std::vector<bool> getSelectedColumns() const override;

    const Type& getSelectedType() const override;

    std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size
                                                      ) const override;

    bool next(ColumnVectorBatch& data) override;

    // Not part of the RowReader interface (no override).
    CompressionKind getCompression() const;

    // Not part of the RowReader interface (no override).
    uint64_t getCompressionSize() const;

    uint64_t getRowNumber() const override;

    void seekToRow(uint64_t rowNumber) override;

    // Accessors that are not part of the RowReader interface.
    const FileContents& getFileContents() const;
    bool getThrowOnHive11DecimalOverflow() const;
    int32_t getForcedScaleOnHive11Decimal() const;
  };

RowReaderImpl and RowReaderOptions

orc/c++/src/Reader.cc
RowReaderOptions 中的 opts.getOffset() 和 opts.getLength() 共同确定了 next() 需要读取的 stripe 范围：只有起始偏移量落在 [offset, offset + length) 区间内的 stripe 才会被读取。

 /**
  * Build a row reader over the stripes selected by the options.
  *
  * A stripe is selected when its starting offset lies in the half-open
  * byte range [opts.getOffset(), opts.getOffset() + opts.getLength()).
  * After construction, currentStripe/firstStripe is the first selected
  * stripe and lastStripe is one past the last selected stripe. When no
  * stripe is selected, currentStripe equals the number of stripes and
  * lastStripe is 0.
  *
  * @param _contents shared contents of the file (footer, memory pool, ...)
  * @param opts options for reading (byte range, column selection, decimal
  *        handling, lazy decoding)
  */
 RowReaderImpl::RowReaderImpl(std::shared_ptr<FileContents> _contents,
                               const RowReaderOptions& opts
                         ): localTimezone(getLocalTimezone()),
                            contents(_contents),
                            throwOnHive11DecimalOverflow(opts.getThrowOnHive11DecimalOverflow()),
                            forcedScaleOnHive11Decimal(opts.getForcedScaleOnHive11Decimal()),
                            footer(contents->footer.get()),
                            firstRowOfStripe(*contents->pool, 0),
                            enableEncodedBlock(opts.getEnableLazyDecoding()) {
    const uint64_t numberOfStripes =
      static_cast<uint64_t>(footer->stripes_size());
    // start with an empty selection: [numberOfStripes, 0)
    currentStripe = numberOfStripes;
    lastStripe = 0;
    currentRowInStripe = 0;
    rowsInCurrentStripe = 0;
    uint64_t rowTotal = 0;

    // the requested range does not change while scanning the stripes
    const uint64_t rangeOffset = opts.getOffset();
    const uint64_t rangeLength = opts.getLength();

    firstRowOfStripe.resize(numberOfStripes);
    for(size_t i=0; i < numberOfStripes; ++i) {
      // running total of the rows in all stripes before stripe i
      firstRowOfStripe[i] = rowTotal;
      // bind by reference; the message is only read here
      const proto::StripeInformation& stripeInfo =
        footer->stripes(static_cast<int>(i));
      rowTotal += stripeInfo.numberofrows();
      // Compare the distance from the range start against the length
      // instead of computing rangeOffset + rangeLength: that sum wraps
      // around for a non-zero offset combined with a very large length,
      // which would leave every stripe out of range.
      const bool isStripeInRange = stripeInfo.offset() >= rangeOffset &&
        stripeInfo.offset() - rangeOffset < rangeLength;
      if (isStripeInRange) {
        if (i < currentStripe) {
          currentStripe = i;
        }
        if (i >= lastStripe) {
          lastStripe = i + 1;
        }
      }
    }
    firstStripe = currentStripe;

    if (currentStripe == 0) {
      // reading starts at row 0, so there is no previous row; use the
      // maximum value as the "one before row 0" marker
      previousRow = (std::numeric_limits<uint64_t>::max)();
    } else if (currentStripe == numberOfStripes) {
      // no stripe is in range
      previousRow = footer->numberofrows();
    } else {
      // the row just before the first selected stripe
      previousRow = firstRowOfStripe[firstStripe]-1;
    }

    ColumnSelector column_selector(contents.get());
    column_selector.updateSelected(selectedColumns, opts);
  }

Reader

orc/c++/include/orc/Reader.hh

/**
   * The interface for reading ORC file meta-data and constructing RowReaders.
   * This is an abstract class that will be subclassed as necessary.
   */
  class Reader {
  public:
    virtual ~Reader();

    /**
     * Get the format version of the file. Currently known values are:
     * 0.11 and 0.12
     * @return the FileVersion object
     */
    virtual FileVersion getFormatVersion() const = 0;

    /**
     * Get the number of rows in the file.
     * @return the number of rows
     */
    virtual uint64_t getNumberOfRows() const = 0;

    /**
     * Get the user metadata keys.
     * @return the set of user metadata keys
     */
    virtual std::list<std::string> getMetadataKeys() const = 0;

    /**
     * Get a user metadata value.
     * @param key a key given by the user
     * @return the bytes associated with the given key
     */
    virtual std::string getMetadataValue(const std::string& key) const = 0;

    /**
     * Did the user set the given metadata value.
     * @param key the key to check
     * @return true if the metadata value was set
     */
    virtual bool hasMetadataValue(const std::string& key) const = 0;

    /**
     * Get the compression kind.
     * @return the kind of compression in the file
     */
    virtual CompressionKind getCompression() const = 0;

    /**
     * Get the buffer size for the compression.
     * @return number of bytes to buffer for the compression codec.
     */
    virtual uint64_t getCompressionSize() const = 0;

    /**
     * Get ID of writer that generated the file.
     * @return UNKNOWN_WRITER if the writer ID is undefined
     */
    virtual WriterId getWriterId() const = 0;

    /**
     * Get the writer id value when getWriterId() returns an unknown writer.
     * @return the integer value of the writer ID.
     */
    virtual uint32_t getWriterIdValue() const = 0;

    /**
     * Get the version of the writer.
     * @return the version of the writer.
     */
    virtual WriterVersion getWriterVersion() const = 0;

    /**
     * Get the number of rows per entry in the row index.
     * @return the number of rows per entry in the row index or 0 if there
     * is no row index.
     */
    virtual uint64_t getRowIndexStride() const = 0;

    /**
     * Get the number of stripes in the file.
     * @return the number of stripes
     */
    virtual uint64_t getNumberOfStripes() const = 0;

    /**
     * Get the information about a stripe.
     * @param stripeIndex the index of the stripe (0 to N-1) to get information about
     * @return the information about that stripe
     */
    virtual ORC_UNIQUE_PTR<StripeInformation>
    getStripe(uint64_t stripeIndex) const = 0;

    /**
     * Get the number of stripe statistics in the file.
     * @return the number of stripe statistics
     */
    virtual uint64_t getNumberOfStripeStatistics() const = 0;

    /**
     * Get the statistics about a stripe.
     * @param stripeIndex the index of the stripe (0 to N-1) to get statistics about
     * @return the statistics about that stripe
     */
    virtual ORC_UNIQUE_PTR<StripeStatistics>
    getStripeStatistics(uint64_t stripeIndex) const = 0;

    /**
     * Get the length of the data stripes in the file.
     * @return the number of bytes in stripes
     */
    virtual uint64_t getContentLength() const = 0;

    /**
     * Get the length of the file stripe statistics.
     * @return the number of compressed bytes in the file stripe statistics
     */
    virtual uint64_t getStripeStatisticsLength() const = 0;

    /**
     * Get the length of the file footer.
     * @return the number of compressed bytes in the file footer
     */
    virtual uint64_t getFileFooterLength() const = 0;

    /**
     * Get the length of the file postscript.
     * @return the number of bytes in the file postscript
     */
    virtual uint64_t getFilePostscriptLength() const = 0;

    /**
     * Get the total length of the file.
     * @return the number of bytes in the file
     */
    virtual uint64_t getFileLength() const = 0;

    /**
     * Get the statistics about the columns in the file.
     * @return the information about the columns
     */
    virtual ORC_UNIQUE_PTR<Statistics> getStatistics() const = 0;

    /**
     * Get the statistics about a single column in the file.
     * @param columnId id of the column
     * @return the information about the column
     */
    virtual ORC_UNIQUE_PTR<ColumnStatistics>
    getColumnStatistics(uint32_t columnId) const = 0;

    /**
     * Check if the file has correct column statistics.
     * @return true if the column statistics are correct
     */
    virtual bool hasCorrectStatistics() const = 0;

    /**
     * Get the serialized file tail.
     * Useful if another reader of the same file wants to avoid re-reading
     * the file tail. See ReaderOptions.setSerializedFileTail().
     * @return a string of bytes with the file tail
     */
    virtual std::string getSerializedFileTail() const = 0;

    /**
     * Get the type of the rows in the file. The top level is typically a
     * struct.
     * @return the root type
     */
    virtual const Type& getType() const = 0;

    /**
     * Create a RowReader based on this reader with the default options.
     * @return a RowReader to read the rows
     */
    virtual ORC_UNIQUE_PTR<RowReader> createRowReader() const = 0;

    /**
     * Create a RowReader based on this reader.
     * @param options RowReader Options
     * @return a RowReader to read the rows
     */
    virtual ORC_UNIQUE_PTR<RowReader> createRowReader(const RowReaderOptions& options) const = 0;

    /**
     * Get the name of the input stream.
     * @return the name of the input stream
     */
    virtual const std::string& getStreamName() const = 0;

    /**
     * Estimate an upper bound on heap memory allocation by the Reader
     * based on the information in the file footer.
     * The bound is less tight if only few columns are read or compression is
     * used.
     * @param stripeIx index of the stripe to be read (if not specified,
     *        all stripes are considered).
     * @return upper bound on memory use by all columns
     */
    virtual uint64_t getMemoryUse(int stripeIx=-1) = 0;

    /**
     * Estimate an upper bound on memory use when only the columns selected
     * by field id are read.
     * @param include Column Field Ids
     * @param stripeIx index of the stripe to be read (if not specified,
     *        all stripes are considered).
     * @return upper bound on memory use by selected columns
     */
    virtual uint64_t getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx=-1) = 0;

    /**
     * Estimate an upper bound on memory use when only the columns selected
     * by name are read.
     * @param names Column Names
     * @param stripeIx index of the stripe to be read (if not specified,
     *        all stripes are considered).
     * @return upper bound on memory use by selected columns
     */
    virtual uint64_t getMemoryUseByName(const std::list<std::string>& names, int stripeIx=-1) = 0;

    /**
     * Estimate an upper bound on memory use when only the columns selected
     * by type id are read.
     * @param include Column Type Ids
     * @param stripeIx index of the stripe to be read (if not specified,
     *        all stripes are considered).
     * @return upper bound on memory use by selected columns
     */
    virtual uint64_t getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx=-1) = 0;

    /**
     * Get BloomFilters of all selected columns in the specified stripe
     * @param stripeIndex index of the stripe to be read for bloom filters.
     * @param included index of selected columns to return (if not specified,
     *        all columns that have bloom filters are considered).
     * @return map of bloom filters with the key standing for the index of column.
     */
    virtual std::map<uint32_t, BloomFilterIndex>
    getBloomFilters(uint32_t stripeIndex, const std::set<uint32_t>& included) const = 0;
  };

ReaderImpl

orc/c++/src/Reader.hh

 /**
  * Concrete Reader. It holds a shared handle on the file contents
  * (postscript, footer, schema, input stream) and answers the Reader
  * queries from them.
  */
 class ReaderImpl : public Reader {
   private:
    // FileContents
    // shared handle; the inline accessors below read from it
    std::shared_ptr<FileContents> contents;

    // inputs
    const ReaderOptions options;
    const uint64_t fileLength;
    const uint64_t postscriptLength;

    // footer
    // non-owning pointer (presumably contents->footer.get() -- confirm in Reader.cc)
    proto::Footer* footer;
    uint64_t numberOfStripes;
    // private overload taking an explicit column selection; presumably the
    // shared implementation behind the public getMemoryUse* methods -- confirm
    uint64_t getMemoryUse(int stripeIx, std::vector<bool>& selectedColumns);

    // internal methods
    void readMetadata() const;
    void checkOrcVersion();
    void getRowIndexStatistics(const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
                               const proto::StripeFooter& currentStripeFooter,
                               std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const;

    // metadata
    // mutable: presumably loaded on demand by the const readMetadata() -- confirm
    mutable std::unique_ptr<proto::Metadata> metadata;
    mutable bool isMetadataLoaded;
   public:
    /**
     * Constructor that lets the user specify additional options.
     * @param contents of the file
     * @param options options for reading
     * @param fileLength the length of the file in bytes
     * @param postscriptLength the length of the postscript in bytes
     */
    ReaderImpl(std::shared_ptr<FileContents> contents,
               const ReaderOptions& options,
               uint64_t fileLength,
               uint64_t postscriptLength);

    // Not part of the Reader interface (no override).
    const ReaderOptions& getReaderOptions() const;

    CompressionKind getCompression() const override;

    FileVersion getFormatVersion() const override;

    WriterId getWriterId() const override;

    uint32_t getWriterIdValue() const override;

    WriterVersion getWriterVersion() const override;

    uint64_t getNumberOfRows() const override;

    uint64_t getRowIndexStride() const override;

    std::list<std::string> getMetadataKeys() const override;

    std::string getMetadataValue(const std::string& key) const override;

    bool hasMetadataValue(const std::string& key) const override;

    uint64_t getCompressionSize() const override;

    uint64_t getNumberOfStripes() const override;

    std::unique_ptr<StripeInformation> getStripe(uint64_t
                                                 ) const override;

    uint64_t getNumberOfStripeStatistics() const override;

    const std::string& getStreamName() const override;

    std::unique_ptr<StripeStatistics>
    getStripeStatistics(uint64_t stripeIndex) const override;

    std::unique_ptr<RowReader> createRowReader() const override;

    std::unique_ptr<RowReader> createRowReader(const RowReaderOptions& options
                                               ) const override;

    uint64_t getContentLength() const override;
    uint64_t getStripeStatisticsLength() const override;
    uint64_t getFileFooterLength() const override;
    uint64_t getFilePostscriptLength() const override;
    uint64_t getFileLength() const override;

    std::unique_ptr<Statistics> getStatistics() const override;

    std::unique_ptr<ColumnStatistics> getColumnStatistics(uint32_t columnId
                                                          ) const override;

    std::string getSerializedFileTail() const override;

    const Type& getType() const override;

    bool hasCorrectStatistics() const override;

    // The inline accessors below return non-owning views into contents.

    // Raw pointer to the postscript held by contents.
    const proto::PostScript* getPostscript() const {return contents->postscript.get();}

    // The blockSize value stored in contents.
    uint64_t getBlockSize() const {return contents->blockSize;}

    // Raw pointer to the footer held by contents.
    const proto::Footer* getFooter() const {return contents->footer.get();}

    // Raw pointer to the schema held by contents.
    const Type* getSchema() const {return contents->schema.get();}

    // Raw pointer to the input stream held by contents.
    InputStream* getStream() const {return contents->stream.get();}

    uint64_t getMemoryUse(int stripeIx = -1) override;

    uint64_t getMemoryUseByFieldId(const std::list<uint64_t>& include, int stripeIx=-1) override;

    uint64_t getMemoryUseByName(const std::list<std::string>& names, int stripeIx=-1) override;

    uint64_t getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx=-1) override;

    std::map<uint32_t, BloomFilterIndex>
    getBloomFilters(uint32_t stripeIndex, const std::set<uint32_t>& included) const override;
  };
 类似资料: