Skip to content

Commit

Permalink
Revert "Read multiple row groups in Parquet files correctly (#3950)" (#3969)
Browse files Browse the repository at this point in the history

This reverts commit 091b8dd.
  • Loading branch information
ajpotts authored Jan 10, 2025
1 parent 091b8dd commit 135c02d
Showing 1 changed file with 16 additions and 25 deletions.
41 changes: 16 additions & 25 deletions src/parquet/ReadParquet.cpp
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
#include "ReadParquet.h"
#include "UtilParquet.h"

// Returns the number of elements read
template <typename ReaderType, typename ChplType>
int64_t readColumn(void* chpl_arr, int64_t *startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
int64_t readColumn(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
bool hasNonFloatNulls, int64_t i, int64_t numElems, int64_t batchSize,
int64_t values_read, bool* where_null_chpl) {
int16_t definition_level; // nullable type and only reading single records in batch
auto chpl_ptr = (ChplType*)chpl_arr;
int64_t num_read = 0;
ReaderType* reader =
static_cast<ReaderType*>(column_reader.get());
*startIdx -= reader->Skip(*startIdx);
startIdx -= reader->Skip(startIdx);

if (not hasNonFloatNulls) {
while (reader->HasNext() && i < numElems) {
if((numElems - i) < batchSize) // adjust batchSize if needed
batchSize = numElems - i;
(void)reader->ReadBatch(batchSize, nullptr, nullptr, &chpl_ptr[i], &values_read);
i+=values_read;
num_read += values_read;
}
}
else {
Expand All @@ -30,23 +27,21 @@ int64_t readColumn(void* chpl_arr, int64_t *startIdx, std::shared_ptr<parquet::C
where_null_chpl[i] = true;
}
i++;
num_read++;
}
}
return num_read;
return i;
}

template <typename ReaderType, typename ChplType, typename PqType>
int64_t readColumnDbFl(void* chpl_arr, int64_t *startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
int64_t readColumnDbFl(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
bool hasNonFloatNulls, int64_t i, int64_t numElems, int64_t batchSize,
int64_t values_read, bool* where_null_chpl) {
int16_t definition_level; // nullable type and only reading single records in batch
auto chpl_ptr = (ChplType*)chpl_arr;
ReaderType* reader =
static_cast<ReaderType*>(column_reader.get());
*startIdx -= reader->Skip(*startIdx);
startIdx -= reader->Skip(startIdx);

int64_t num_read = 0;
while (reader->HasNext() && i < numElems) {
PqType value;
(void)reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
Expand All @@ -58,22 +53,20 @@ int64_t readColumnDbFl(void* chpl_arr, int64_t *startIdx, std::shared_ptr<parque
chpl_ptr[i] = NAN;
}
i++;
num_read++;
}
return num_read;
return i;
}

template <typename ReaderType, typename ChplType, typename PqType>
int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t *startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t startIdx, std::shared_ptr<parquet::ColumnReader> column_reader,
bool hasNonFloatNulls, int64_t i, int64_t numElems, int64_t batchSize,
int64_t values_read, bool* where_null_chpl) {
int16_t definition_level; // nullable type and only reading single records in batch
auto chpl_ptr = (ChplType*)chpl_arr;
ReaderType* reader =
static_cast<ReaderType*>(column_reader.get());
*startIdx -= reader->Skip(*startIdx);
startIdx -= reader->Skip(startIdx);

int64_t num_read = 0;
if (not hasNonFloatNulls) {
PqType* tmpArr = (PqType*)malloc(batchSize * sizeof(int32_t));
while (reader->HasNext() && i < numElems) {
Expand All @@ -85,7 +78,6 @@ int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t *startIdx, std::shar
for (int64_t j = 0; j < values_read; j++)
chpl_ptr[i+j] = (ChplType)tmpArr[j];
i+=values_read;
num_read+=values_read;
}
free(tmpArr);
}
Expand All @@ -101,10 +93,9 @@ int64_t readColumnIrregularBitWidth(void* chpl_arr, int64_t *startIdx, std::shar
chpl_ptr[i] = (int64_t)tmp;
}
i++;
num_read++;
}
}
return num_read;
return i;
}

int cpp_readStrColumnByName(const char* filename, void* chpl_arr, const char* colname, int64_t numElems, int64_t batchSize, char** errMsg) {
Expand Down Expand Up @@ -191,7 +182,7 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
int num_row_groups = file_metadata->num_row_groups();

int64_t i = 0;
for (int r = 0; (r < num_row_groups) && (i < numElems); r++) {
for (int r = 0; r < num_row_groups; r++) {
std::shared_ptr<parquet::RowGroupReader> row_group_reader =
parquet_reader->RowGroup(r);

Expand All @@ -200,6 +191,7 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
std::shared_ptr<parquet::ColumnReader> column_reader;

auto idx = file_metadata -> schema() -> ColumnIndex(colname);
auto max_def = file_metadata -> schema() -> Column(idx) -> max_definition_level(); // needed to determine if nulls are allowed

if(idx < 0) {
std::string dname(colname);
Expand All @@ -208,20 +200,19 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
*errMsg = strdup(msg.c_str());
return ARROWERROR;
}
auto max_def = file_metadata -> schema() -> Column(idx) -> max_definition_level(); // needed to determine if nulls are allowed

column_reader = row_group_reader->Column(idx);

// Since int64 and uint64 Arrow dtypes share a physical type and only differ
// in logical type, they must be read from the file in the same way
if(ty == ARROWINT64 || ty == ARROWUINT64) {
i += readColumn<parquet::Int64Reader, int64_t>(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i,
i += readColumn<parquet::Int64Reader, int64_t>(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i,
numElems, batchSize, values_read, where_null_chpl);
} else if(ty == ARROWINT32 || ty == ARROWUINT32) {
i += readColumnIrregularBitWidth<parquet::Int32Reader, int64_t, int32_t>(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i,
i += readColumnIrregularBitWidth<parquet::Int32Reader, int64_t, int32_t>(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i,
numElems, batchSize, values_read, where_null_chpl);
} else if(ty == ARROWBOOLEAN) {
i += readColumn<parquet::BoolReader, bool>(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i,
i += readColumn<parquet::BoolReader, bool>(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i,
numElems, batchSize, values_read, where_null_chpl);
} else if(ty == ARROWSTRING) {
int16_t definition_level; // nullable type and only reading single records in batch
Expand All @@ -242,10 +233,10 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
i++; // skip one space so the strings are null terminated with a 0
}
} else if(ty == ARROWFLOAT) {
i += readColumnDbFl<parquet::FloatReader, double, float>(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i,
i += readColumnDbFl<parquet::FloatReader, double, float>(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i,
numElems, batchSize, values_read, where_null_chpl);
} else if(ty == ARROWDOUBLE) {
i += readColumnDbFl<parquet::DoubleReader, double, double>(chpl_arr, &startIdx, column_reader, hasNonFloatNulls, i,
i += readColumnDbFl<parquet::DoubleReader, double, double>(chpl_arr, startIdx, column_reader, hasNonFloatNulls, i,
numElems, batchSize, values_read, where_null_chpl);
} else if(ty == ARROWDECIMAL) {
auto chpl_ptr = (double*)chpl_arr;
Expand Down

0 comments on commit 135c02d

Please sign in to comment.