diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java index 5845ba13b..6bdb4ee0c 100644 --- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java +++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java @@ -51,15 +51,15 @@ public class OnlineIngestion extends RestDto { @Getter @Setter - private Integer processedEntries; + private Integer rowsUpserted; @Getter @Setter - private Integer insertedEntries; + private Integer rowsFailed; @Getter @Setter - private Integer abortedEntries; + private Integer rowsIgnored; @Getter @Setter @@ -81,9 +81,9 @@ public void refresh() throws FeatureStoreException, IOException { this.id = onlineIngestion.id; this.numEntries = onlineIngestion.numEntries; this.currentOffsets = onlineIngestion.currentOffsets; - this.processedEntries = onlineIngestion.processedEntries; - this.insertedEntries = onlineIngestion.insertedEntries; - this.abortedEntries = onlineIngestion.abortedEntries; + this.rowsUpserted = onlineIngestion.rowsUpserted; + this.rowsFailed = onlineIngestion.rowsFailed; + this.rowsIgnored = onlineIngestion.rowsIgnored; this.batchResults = onlineIngestion.batchResults; this.featureGroup = onlineIngestion.featureGroup; } @@ -97,10 +97,11 @@ public void waitForCompletion(int timeout, int period) period = period * 1000; while (true) { - refresh(); + // Get total number of rows processed + long rowsProcessed = rowsUpserted + rowsFailed + rowsIgnored; // Check if the online ingestion is complete - if (numEntries != null && processedEntries >= numEntries) { + if (numEntries != null && rowsProcessed >= numEntries) { break; } @@ -113,6 +114,8 @@ public void waitForCompletion(int timeout, int period) // Sleep for the specified period in seconds Thread.sleep(period); + + refresh(); } } diff --git a/python/hsfs/core/online_ingestion.py b/python/hsfs/core/online_ingestion.py index df45c0001..fb4694a25 100644 --- a/python/hsfs/core/online_ingestion.py +++ b/python/hsfs/core/online_ingestion.py @@ -45,9 +45,9 @@ def __init__( id: Optional[int] = None, num_entries: Optional[int] = None, current_offsets: Optional[str] = None, - processed_entries: Optional[int] = None, - inserted_entries: Optional[int] = None, - aborted_entries: Optional[int] = None, + rows_upserted: Optional[int] = None, + rows_failed: Optional[int] = None, + rows_ignored: Optional[int] = None, batch_results: Union[ List[online_ingestion_batch_result.OnlineIngestionBatchResult], List[Dict[str, Any]], @@ -58,9 +58,9 @@ def __init__( self._id = id self._num_entries = num_entries # specified when inserting (optional since might not be specified when using streaming) self._current_offsets = current_offsets - self._processed_entries = processed_entries - self._inserted_entries = inserted_entries - self._aborted_entries = aborted_entries + self._rows_upserted = rows_upserted + self._rows_failed = rows_failed + self._rows_ignored = rows_ignored self._batch_results = ( [ ( @@ -129,16 +129,16 @@ def current_offsets(self) -> Optional[str]: return self._current_offsets @property - def processed_entries(self) -> int: - return 0 if self._processed_entries is None else self._processed_entries + def rows_upserted(self) -> int: + return 0 if self._rows_upserted is None else self._rows_upserted @property - def inserted_entries(self) -> int: - return 0 if self._inserted_entries is None else self._inserted_entries + def rows_failed(self) -> int: + return 0 if self._rows_failed is None else self._rows_failed @property - def aborted_entries(self) -> int: - return 0 if self._aborted_entries is None else self._aborted_entries + def rows_ignored(self) -> int: + return 0 if self._rows_ignored is None else self._rows_ignored @property def batch_results( @@ -165,14 +165,17 @@ def wait_for_completion(self, options: Dict[str, Any] = None): mininterval=1, ) as progress_bar: while True: - if self.aborted_entries: - progress_bar.colour = "RED" + # Get total number of rows processed + rows_processed = self.rows_upserted + self.rows_failed + self.rows_ignored - progress_bar.n = self.processed_entries + # Update progress bar + if self.rows_failed or self.rows_ignored: + progress_bar.colour = "RED" + progress_bar.n = rows_processed progress_bar.refresh() # Check if the online ingestion is complete - if self.num_entries and self.processed_entries >= self.num_entries: + if self.num_entries and rows_processed >= self.num_entries: break # Check if the timeout has been reached (if timeout is 0 we will wait indefinitely) diff --git a/python/tests/fixtures/online_ingestion_fixtures.json b/python/tests/fixtures/online_ingestion_fixtures.json index 23d804e60..4adf11e2e 100644 --- a/python/tests/fixtures/online_ingestion_fixtures.json +++ b/python/tests/fixtures/online_ingestion_fixtures.json @@ -6,9 +6,9 @@ { "id": 1, "num_entries": 10, - "processed_entries": 8, - "inserted_entries": 6, - "aborted_entries": 2 + "rows_upserted": 8, + "rows_failed": 6, + "rows_ignored": 2 } ] }