diff --git a/src/parquet/ReadParquet.cpp b/src/parquet/ReadParquet.cpp index 15dfbbe77b..244276d1b1 100644 --- a/src/parquet/ReadParquet.cpp +++ b/src/parquet/ReadParquet.cpp @@ -1,17 +1,15 @@ #include "ReadParquet.h" #include "UtilParquet.h" -// Returns the number of elements read template -int64_t readColumn(void* chpl_arr, int64_t *startIdx, std::shared_ptr column_reader, +int64_t readColumn(void* chpl_arr, int64_t startIdx, std::shared_ptr column_reader, bool hasNonFloatNulls, int64_t i, int64_t numElems, int64_t batchSize, int64_t values_read, bool* where_null_chpl) { int16_t definition_level; // nullable type and only reading single records in batch auto chpl_ptr = (ChplType*)chpl_arr; - int64_t num_read = 0; ReaderType* reader = static_cast(column_reader.get()); - *startIdx -= reader->Skip(*startIdx); + startIdx -= reader->Skip(startIdx); if (not hasNonFloatNulls) { while (reader->HasNext() && i < numElems) { @@ -19,7 +17,6 @@ int64_t readColumn(void* chpl_arr, int64_t *startIdx, std::shared_ptrReadBatch(batchSize, nullptr, nullptr, &chpl_ptr[i], &values_read); i+=values_read; - num_read += values_read; } } else { @@ -30,23 +27,21 @@ int64_t readColumn(void* chpl_arr, int64_t *startIdx, std::shared_ptr -int64_t readColumnDbFl(void* chpl_arr, int64_t *startIdx, std::shared_ptr column_reader, +int64_t readColumnDbFl(void* chpl_arr, int64_t startIdx, std::shared_ptr column_reader, bool hasNonFloatNulls, int64_t i, int64_t numElems, int64_t batchSize, int64_t values_read, bool* where_null_chpl) { int16_t definition_level; // nullable type and only reading single records in batch auto chpl_ptr = (ChplType*)chpl_arr; ReaderType* reader = static_cast(column_reader.get()); - *startIdx -= reader->Skip(*startIdx); + startIdx -= reader->Skip(startIdx); - int64_t num_read = 0; while (reader->HasNext() && i < numElems) { PqType value; (void)reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read); @@ -58,22 +53,20 @@ int64_t readColumnDbFl(void* chpl_arr, int64_t *startIdx, std::shared_ptr -int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t *startIdx, std::shared_ptr column_reader, +int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t startIdx, std::shared_ptr column_reader, bool hasNonFloatNulls, int64_t i, int64_t numElems, int64_t batchSize, int64_t values_read, bool* where_null_chpl) { int16_t definition_level; // nullable type and only reading single records in batch auto chpl_ptr = (ChplType*)chpl_arr; ReaderType* reader = static_cast(column_reader.get()); - *startIdx -= reader->Skip(*startIdx); + startIdx -= reader->Skip(startIdx); - int64_t num_read = 0; if (not hasNonFloatNulls) { PqType* tmpArr = (PqType*)malloc(batchSize * sizeof(int32_t)); while (reader->HasNext() && i < numElems) { @@ -85,7 +78,6 @@ int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t *startIdx, std::shar for (int64_t j = 0; j < values_read; j++) chpl_ptr[i+j] = (ChplType)tmpArr[j]; i+=values_read; - num_read+=values_read; } free(tmpArr); } @@ -101,10 +93,9 @@ int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t *startIdx, std::shar chpl_ptr[i] = (int64_t)tmp; } i++; - num_read++; } } - return num_read; + return i; } int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t numElems, int64_t batchSize, char** errMsg) { @@ -191,7 +182,7 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_ int num_row_groups = file_metadata->num_row_groups(); int64_t i = 0; - for (int r = 0; (r < num_row_groups) && (i < numElems); r++) { + for (int r = 0; r < num_row_groups; r++) { std::shared_ptr row_group_reader = parquet_reader->RowGroup(r); @@ -200,6 +191,7 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_ std::shared_ptr column_reader; auto idx = file_metadata -> schema() -> ColumnIndex(colname); + auto max_def = file_metadata -> schema() -> Column(idx) -> max_definition_level(); // needed to determine if nulls are allowed if(idx < 0) { std::string dname(colname); @@ -208,20 +200,19 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_ *errMsg = strdup(msg.c_str()); return ARROWERROR; } - auto max_def = file_metadata -> schema() -> Column(idx) -> max_definition_level(); // needed to determine if nulls are allowed column_reader = row_group_reader->Column(idx); // Since int64 and uint64 Arrow dtypes share a physical type and only differ // in logical type, they must be read from the file in the same way if(ty == ARROWINT64 || ty == ARROWUINT64) { - i += readColumn(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i, + i += readColumn(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i, numElems, batchSize, values_read, where_null_chpl); } else if(ty == ARROWINT32 || ty == ARROWUINT32) { - i += readColumnIrregularBitWidth(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i, + i += readColumnIrregularBitWidth(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i, numElems, batchSize, values_read, where_null_chpl); } else if(ty == ARROWBOOLEAN) { - i += readColumn(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i, + i += readColumn(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i, numElems, batchSize, values_read, where_null_chpl); } else if(ty == ARROWSTRING) { int16_t definition_level; // nullable type and only reading single records in batch @@ -242,10 +233,10 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_ i++; // skip one space so the strings are null terminated with a 0 } } else if(ty == ARROWFLOAT) { - i += readColumnDbFl(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i, + i += readColumnDbFl(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i, numElems, batchSize, values_read, where_null_chpl); } else if(ty == ARROWDOUBLE) { - i += readColumnDbFl(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i, + i += readColumnDbFl(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i, numElems, batchSize, values_read, where_null_chpl); } else if(ty == ARROWDECIMAL) { auto chpl_ptr = (double*)chpl_arr;