@@ -349,7 +349,7 @@ func (d *DictionaryPage) Release() {
349349func (d * DictionaryPage ) IsSorted () bool { return d .sorted }
350350
351351type serializedPageReader struct {
352- r parquet.BufferedReader
352+ r parquet.BufferedReaderV2
353353 chunk * metadata.ColumnChunkMetaData
354354 colIdx int
355355 pgIndexReader * metadata.RowGroupPageIndexReader
@@ -421,6 +421,25 @@ func (p *serializedPageReader) init(compressType compress.Compression, ctx *Cryp
421421 return nil
422422}
423423
424+ type bufferedReaderV2Adapter struct {
425+ parquet.BufferedReader
426+ }
427+
428+ func (b * bufferedReaderV2Adapter ) Buffered () int {
429+ return 0
430+ }
431+
432+ func (b * bufferedReaderV2Adapter ) Free () {
433+ // no-op
434+ }
435+
436+ func getBufferedReaderV2 (r parquet.BufferedReader ) parquet.BufferedReaderV2 {
437+ if brV2 , ok := r .(parquet.BufferedReaderV2 ); ok {
438+ return brV2
439+ }
440+ return & bufferedReaderV2Adapter {BufferedReader : r }
441+ }
442+
424443// NewPageReader returns a page reader for the data which can be read from the provided reader and compression.
425444//
426445// Deprecated: This function isn't properly safe for public API use and should not be utilized
@@ -436,7 +455,7 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.
436455 }
437456
438457 rdr := & serializedPageReader {
439- r : r ,
458+ r : getBufferedReaderV2 ( r ) ,
440459 maxPageHeaderSize : defaultMaxPageHeaderSize ,
441460 nrows : nrows ,
442461 mem : mem ,
@@ -458,7 +477,10 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.
458477func (p * serializedPageReader ) Reset (r parquet.BufferedReader , nrows int64 , compressType compress.Compression , ctx * CryptoContext ) {
459478 p .rowsSeen , p .pageOrd , p .nrows = 0 , 0 , nrows
460479 p .curPageHdr , p .curPage , p .err = nil , nil , nil
461- p .r = r
480+ if p .r != nil && p .r != r {
481+ p .r .Free ()
482+ }
483+ p .r = getBufferedReaderV2 (r )
462484
463485 p .codec , p .err = compress .GetCodec (compressType )
464486 if p .err != nil {
@@ -593,7 +615,7 @@ func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) {
593615 return nil , nil
594616}
595617
596- func (p * serializedPageReader ) readPageHeader (rd parquet.BufferedReader , hdr * format.PageHeader ) error {
618+ func (p * serializedPageReader ) readPageHeader (rd parquet.BufferedReaderV2 , hdr * format.PageHeader ) error {
597619 allowedPgSz := defaultPageHeaderSize
598620 for {
599621 view , err := rd .Peek (allowedPgSz )
@@ -679,7 +701,7 @@ func (p *serializedPageReader) SeekToPageWithRow(rowIdx int64) error {
679701
680702// readOrStealData attempts to steal data from the buffered reader if enough is buffered,
681703// otherwise reads from the underlying reader into the provided buffer.
682- func (p * serializedPageReader ) readOrStealData (r parquet.BufferedReader , lenCompressed int , buffer * memory.Buffer ) ([]byte , error ) {
704+ func (p * serializedPageReader ) readOrStealData (r parquet.BufferedReaderV2 , lenCompressed int , buffer * memory.Buffer ) ([]byte , error ) {
683705 if r .Buffered () >= lenCompressed {
684706 data , err := r .Peek (lenCompressed )
685707 if err != nil {
@@ -702,7 +724,7 @@ func (p *serializedPageReader) readOrStealData(r parquet.BufferedReader, lenComp
702724}
703725
704726func (p * serializedPageReader ) getPageBytesV1 (
705- r parquet.BufferedReader , isCompressed bool , lenCompressed , lenUncompressed int , buffer * memory.Buffer ,
727+ r parquet.BufferedReaderV2 , isCompressed bool , lenCompressed , lenUncompressed int , buffer * memory.Buffer ,
706728) ([]byte , error ) {
707729 // 8 possible cases:
708730 // 1. enough data buffered (r.Buffered() >= lenCompressed)
@@ -748,7 +770,7 @@ func (p *serializedPageReader) getPageBytesV1(
748770 return data , nil
749771}
750772
751- func (p * serializedPageReader ) readV2UnencryptedCompressedWithLevels (r parquet.BufferedReader , lenCompressed , lenUncompressed , levelsBytelen int , buffer * memory.Buffer ) ([]byte , error ) {
773+ func (p * serializedPageReader ) readV2UnencryptedCompressedWithLevels (r parquet.BufferedReaderV2 , lenCompressed , lenUncompressed , levelsBytelen int , buffer * memory.Buffer ) ([]byte , error ) {
752774 // Special case: unencrypted + compressed + has levels
753775 // Read levels directly into output buffer, compressed data into decompress buffer
754776 buffer .ResizeNoShrink (lenUncompressed )
@@ -766,7 +788,7 @@ func (p *serializedPageReader) readV2UnencryptedCompressedWithLevels(r parquet.B
766788}
767789
768790func (p * serializedPageReader ) getPageBytesV2 (
769- r parquet.BufferedReader , isCompressed bool , lenCompressed , lenUncompressed , levelsBytelen int , buffer * memory.Buffer ,
791+ r parquet.BufferedReaderV2 , isCompressed bool , lenCompressed , lenUncompressed , levelsBytelen int , buffer * memory.Buffer ,
770792) ([]byte , error ) {
771793 // Special case: unencrypted + compressed + has levels - read levels and compressed data separately
772794 if r .Buffered () < lenCompressed && p .cryptoCtx .DataDecryptor == nil && isCompressed && levelsBytelen > 0 {
0 commit comments