Skip to content

Commit

Permalink
rows ignored
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks committed Jan 9, 2025
1 parent e30f2d5 commit 2144292
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ public class OnlineIngestion extends RestDto<OnlineIngestion> {

@Getter
@Setter
private Integer processedEntries;
private Integer rowsUpserted;

@Getter
@Setter
private Integer insertedEntries;
private Integer rowsFailed;

@Getter
@Setter
private Integer abortedEntries;
private Integer rowsIgnored;

@Getter
@Setter
Expand All @@ -81,9 +81,9 @@ public void refresh() throws FeatureStoreException, IOException {
this.id = onlineIngestion.id;
this.numEntries = onlineIngestion.numEntries;
this.currentOffsets = onlineIngestion.currentOffsets;
this.processedEntries = onlineIngestion.processedEntries;
this.insertedEntries = onlineIngestion.insertedEntries;
this.abortedEntries = onlineIngestion.abortedEntries;
this.rowsUpserted = onlineIngestion.rowsUpserted;
this.rowsFailed = onlineIngestion.rowsFailed;
this.rowsIgnored = onlineIngestion.rowsIgnored;
this.batchResults = onlineIngestion.batchResults;
this.featureGroup = onlineIngestion.featureGroup;
}
Expand All @@ -97,10 +97,11 @@ public void waitForCompletion(int timeout, int period)
period = period * 1000;

while (true) {
refresh();
// Get total number of rows processed
long rowsProcessed = rowsUpserted + rowsFailed + rowsIgnored;

// Check if the online ingestion is complete
if (numEntries != null && processedEntries >= numEntries) {
if (numEntries != null && rowsProcessed >= numEntries) {
break;
}

Expand All @@ -113,6 +114,8 @@ public void waitForCompletion(int timeout, int period)

// Sleep for the specified period in seconds
Thread.sleep(period);

refresh();
}
}

Expand Down
35 changes: 19 additions & 16 deletions python/hsfs/core/online_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ def __init__(
id: Optional[int] = None,
num_entries: Optional[int] = None,
current_offsets: Optional[str] = None,
processed_entries: Optional[int] = None,
inserted_entries: Optional[int] = None,
aborted_entries: Optional[int] = None,
rows_upserted: Optional[int] = None,
rows_failed: Optional[int] = None,
rows_ignored: Optional[int] = None,
batch_results: Union[
List[online_ingestion_batch_result.OnlineIngestionBatchResult],
List[Dict[str, Any]],
Expand All @@ -58,9 +58,9 @@ def __init__(
self._id = id
self._num_entries = num_entries # specified when inserting (optional since might not be specified when using streaming)
self._current_offsets = current_offsets
self._processed_entries = processed_entries
self._inserted_entries = inserted_entries
self._aborted_entries = aborted_entries
self._rows_upserted = rows_upserted
self._rows_failed = rows_failed
self._rows_ignored = rows_ignored
self._batch_results = (
[
(
Expand Down Expand Up @@ -129,16 +129,16 @@ def current_offsets(self) -> Optional[str]:
return self._current_offsets

@property
def processed_entries(self) -> int:
return 0 if self._processed_entries is None else self._processed_entries
def rows_upserted(self) -> int:
return 0 if self._rows_upserted is None else self._rows_upserted

@property
def inserted_entries(self) -> int:
return 0 if self._inserted_entries is None else self._inserted_entries
def rows_failed(self) -> int:
return 0 if self._rows_failed is None else self._rows_failed

@property
def aborted_entries(self) -> int:
return 0 if self._aborted_entries is None else self._aborted_entries
def rows_ignored(self) -> int:
return 0 if self._rows_ignored is None else self._rows_ignored

@property
def batch_results(
Expand All @@ -165,14 +165,17 @@ def wait_for_completion(self, options: Dict[str, Any] = None):
mininterval=1,
) as progress_bar:
while True:
if self.aborted_entries:
progress_bar.colour = "RED"
# Get total number of rows processed
rows_processed = self.rows_upserted + self.rows_failed + self.rows_ignored

progress_bar.n = self.processed_entries
# Update progress bar
if self.rows_failed or self.rows_ignored:
progress_bar.colour = "RED"
progress_bar.n = rows_processed
progress_bar.refresh()

# Check if the online ingestion is complete
if self.num_entries and self.processed_entries >= self.num_entries:
if self.num_entries and rows_processed >= self.num_entries:
break

# Check if the timeout has been reached (if timeout is 0 we will wait indefinitely)
Expand Down
6 changes: 3 additions & 3 deletions python/tests/fixtures/online_ingestion_fixtures.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
{
"id": 1,
"num_entries": 10,
"processed_entries": 8,
"inserted_entries": 6,
"aborted_entries": 2
"rows_upserted": 8,
"rows_failed": 6,
"rows_ignored": 2
}
]
}
Expand Down

0 comments on commit 2144292

Please sign in to comment.