From 7cbb92acf13560279501fdf1dffd8070ca0bcd38 Mon Sep 17 00:00:00 2001 From: Jordan Evens Date: Mon, 22 Apr 2024 09:27:09 -0400 Subject: [PATCH] fix issues preventing running for new year --- docker-compose.yml | 3 --- tbd/src/cpp/Environment.cpp | 1 + tbd/src/py/firestarr/common.py | 2 ++ tbd/src/py/firestarr/datasources/cwfis.py | 2 +- tbd/src/py/firestarr/datasources/default.py | 4 +-- tbd/src/py/firestarr/main.py | 12 ++++++++- tbd/src/py/firestarr/run.py | 23 ++++++++++------- tbd/src/py/firestarr/simulation.py | 28 ++++++++++++++++++--- 8 files changed, 56 insertions(+), 19 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 948452b3f..4b140b72a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,3 @@ -# docker-compose build -version: '3.3' - services: gis: image: tbd_gis diff --git a/tbd/src/cpp/Environment.cpp b/tbd/src/cpp/Environment.cpp index 0ae1ecd8e..36baf0aed 100644 --- a/tbd/src/cpp/Environment.cpp +++ b/tbd/src/cpp/Environment.cpp @@ -85,6 +85,7 @@ Environment Environment::loadEnvironment(const string dir_out, for (const auto& raster : rasters) { auto fuel = raster; + printf("Replacing directory separators in path for: %s", fuel.c_str()); // make sure we're using a consistent directory separator std::replace(fuel.begin(), fuel.end(), '\\', '/'); // HACK: assume there's only one instance of 'fuel' in the file name we want to change diff --git a/tbd/src/py/firestarr/common.py b/tbd/src/py/firestarr/common.py index a4d81dfe5..aec957b4a 100644 --- a/tbd/src/py/firestarr/common.py +++ b/tbd/src/py/firestarr/common.py @@ -123,6 +123,8 @@ def ensure_dir(dir): sys.path.append(DIR_SRC_PY_CFFDRSNG) DIR_TBD = "/appl/tbd" +FILE_TBD_BINARY = os.path.join(DIR_TBD, "tbd") +FILE_TBD_SETTINGS = os.path.join(DIR_TBD, "settings.ini") DIR_SCRIPTS = os.path.join(DIR_TBD, "scripts") DIR_DATA = ensure_dir(os.path.abspath("/appl/data")) diff --git a/tbd/src/py/firestarr/datasources/cwfis.py b/tbd/src/py/firestarr/datasources/cwfis.py 
index 803822e20..a5d5cda3f 100644 --- a/tbd/src/py/firestarr/datasources/cwfis.py +++ b/tbd/src/py/firestarr/datasources/cwfis.py @@ -90,7 +90,7 @@ def get_shp(filename): df = get_shp("perimeters") # HACK: if empty then no results returned so fill with today where missing - df["LASTDATE"].loc[df["LASTDATE"].isna()] = datetime.date.today() + df.loc[df["LASTDATE"].isna(), "LASTDATE"] = datetime.date.today() df["datetime"] = to_utc(df["LASTDATE"]) since = pd.to_datetime(self._last_active_since, utc=True) return df.loc[df["datetime"] >= since] diff --git a/tbd/src/py/firestarr/datasources/default.py b/tbd/src/py/firestarr/datasources/default.py index 1dfbffef0..5ecf9ebcf 100644 --- a/tbd/src/py/firestarr/datasources/default.py +++ b/tbd/src/py/firestarr/datasources/default.py @@ -41,7 +41,7 @@ def wx_interpolate(df): date_min = df["datetime"].min() date_max = df["datetime"].max() times = pd.DataFrame( - pd.date_range(date_min, date_max, freq="H").values, columns=["datetime"] + pd.date_range(date_min, date_max, freq="h").values, columns=["datetime"] ) crs = df.crs index_names = df.index.names @@ -54,7 +54,7 @@ def wx_interpolate(df): g_fill = pd.merge(times, g, how="left") # treat rain as if it all happened at start of any gaps g_fill["prec"] = g_fill["prec"].fillna(0) - g_fill = g_fill.fillna(method="ffill") + g_fill = g_fill.ffill() g_fill[index_names] = i groups.append(g_fill) df_filled = to_gdf(pd.merge(pd.concat(groups), gdf_geom), crs) diff --git a/tbd/src/py/firestarr/main.py b/tbd/src/py/firestarr/main.py index 48f5c5be1..e9b18fc2d 100644 --- a/tbd/src/py/firestarr/main.py +++ b/tbd/src/py/firestarr/main.py @@ -7,6 +7,8 @@ DEFAULT_FILE_LOG_LEVEL, DIR_LOG, DIR_OUTPUT, + FILE_TBD_BINARY, + FILE_TBD_SETTINGS, SECONDS_PER_MINUTE, WX_MODEL, logging, @@ -57,6 +59,8 @@ def check_arg(a, args): no_publish, args = check_arg("--no-publish", args) no_merge, args = check_arg("--no-merge", args) no_wait, args = check_arg("--no-wait", args) + no_retry, args = 
check_arg("--no-retry", args) + do_retry = False if no_retry else True do_publish = False if no_publish else None do_merge = False if no_merge else None do_wait = not no_wait @@ -71,6 +75,8 @@ def wait_and_check_resume(): modelrun = os.path.basename(dir_model) # HACK: just trying to check if run used this weather prev = make_resume(do_publish=False, do_merge=False) + if prev is None: + return False wx_updated = prev._modelrun != modelrun if not wx_updated and not prev._published_clean: logging.info("Found previous run and trying to resume") @@ -129,7 +135,7 @@ def wait_and_check_resume(): ) run_attempts += 1 # returns true if just finished current run - is_outdated = not run_current.run_until_successful_or_outdated() + is_outdated = not run_current.run_until_successful_or_outdated(no_retry=do_retry) is_published = run_current._published_clean should_rerun = not (no_resume or is_outdated) and (not is_published) logging.info( @@ -140,6 +146,10 @@ def wait_and_check_resume(): if __name__ == "__main__": + if not os.path.exists(FILE_TBD_BINARY): + raise RuntimeError(f"Unable to locate simulation model binary file {FILE_TBD_BINARY}") + if not os.path.exists(FILE_TBD_SETTINGS): + raise RuntimeError(f"Unable to locate simulation model settings file {FILE_TBD_SETTINGS}") logging.info("Called with args %s", str(sys.argv)) args_orig = sys.argv[1:] while not no_retry: diff --git a/tbd/src/py/firestarr/run.py b/tbd/src/py/firestarr/run.py index 9a179c05e..076aaab92 100644 --- a/tbd/src/py/firestarr/run.py +++ b/tbd/src/py/firestarr/run.py @@ -14,6 +14,8 @@ DIR_OUTPUT, DIR_SIMS, FILE_LOCK_PUBLISH, + FILE_TBD_BINARY, + FILE_TBD_SETTINGS, FLAG_IGNORE_PERIM_OUTPUTS, FLAG_SAVE_PREPARED, MAX_NUM_DAYS, @@ -372,14 +374,14 @@ def process(self): ) return df_final, changed - def run_until_successful_or_outdated(self): + def run_until_successful_or_outdated(self, no_retry=False): def is_current(): dir_model = get_model_dir_uncached(WX_MODEL) modelrun = os.path.basename(dir_model) return 
modelrun == self._modelrun # HACK: thread is throwing errors so just actually wait for now - result = self.run_until_successful() + result = self.run_until_successful(no_retry=no_retry) return is_current() # p = None # try: @@ -394,9 +396,11 @@ def is_current(): # if p and p.is_alive(): # p.terminate() - def run_until_successful(self): + def run_until_successful(self, no_retry=False): + should_try = True is_successful = False - while not is_successful: + while not is_successful and should_try: + should_try = no_retry df_final, changed = self.process() while True: is_successful = self.check_and_publish() @@ -445,11 +449,9 @@ def do_create(_): logging.info("Deleting existing fires") force_remove(_) # keep a copy of the settings for reference - shutil.copy( - "/appl/tbd/settings.ini", os.path.join(self._dir_model, "settings.ini") - ) + shutil.copy(FILE_TBD_SETTINGS, os.path.join(self._dir_model, "settings.ini")) # also keep binary instead of trying to track source - shutil.copy("/appl/tbd/tbd", os.path.join(self._dir_model, "tbd")) + shutil.copy(FILE_TBD_BINARY, os.path.join(self._dir_model, "tbd")) df_fires = self._src_fires.get_fires().to_crs(self._crs) save_shp(df_fires, os.path.join(self._dir_out, "df_fires_groups.shp")) df_fires["area"] = area_ha(df_fires) @@ -794,7 +796,10 @@ def make_resume(dir_resume=None, do_publish=False, do_merge=False, *args, **kwar if os.path.exists(os.path.join(DIR_SIMS, x, "data", "df_fires_groups.shp")) ] if not dirs: - raise RuntimeError("No valid runs to resume") + # raise RuntimeError("No valid runs to resume") + # shouldn't resume if can't + logging.warning("No valid runs to resume") + return None dir_resume = dirs[-1] dir_resume = os.path.join(DIR_SIMS, dir_resume) kwargs["dir"] = dir_resume diff --git a/tbd/src/py/firestarr/simulation.py b/tbd/src/py/firestarr/simulation.py index acdaf7e27..b7e0067e8 100644 --- a/tbd/src/py/firestarr/simulation.py +++ b/tbd/src/py/firestarr/simulation.py @@ -140,7 +140,11 @@ def do_create(_): 
df_wx_forecast = df_wx_forecast.loc[df_wx_forecast[COLUMN_TIME] > cur_time] # splice every other member onto shorter members dates_by_model = df_wx_forecast.groupby("model")[COLUMN_TIME].max().sort_values(ascending=False) - df_wx_forecast.loc[:, "id"] = df_wx_forecast["id"].apply(lambda x: f"{x:02d}") + # deprecated + # df_wx_forecast.loc[:, "id"] = df_wx_forecast["id"].apply(lambda x: f"{x:02d}") + ids = df_wx_forecast["id"] + del df_wx_forecast["id"] + df_wx_forecast.loc[:, "id"] = ids.apply(lambda x: f"{x:02d}") df_spliced = None for ( idx, @@ -195,14 +199,22 @@ def do_create(_): df_wx.loc[:, "lat"] = lat df_wx.loc[:, "lon"] = lon # times need to be in LST for cffdrs - df_wx.loc[:, COLUMN_TIME] = [x.tz_localize("UTC").tz_convert(tz_lst) for x in df_wx[COLUMN_TIME]] + # deprecated + # df_wx.loc[:, COLUMN_TIME] = [x.tz_localize("UTC").tz_convert(tz_lst) for x in df_wx[COLUMN_TIME]] + times = df_wx[COLUMN_TIME] + del df_wx[COLUMN_TIME] + df_wx.loc[:, COLUMN_TIME] = [x.tz_localize("UTC").tz_convert(tz_lst) for x in times] if FLAG_DEBUG: # make it easier to see problems if cffdrs isn't working save_geojson(df_wx, file_wx_streams) df_wx = read_gpd_file_safe(file_wx_streams) df_wx_fire = df_wx.rename(columns={"lon": "long", COLUMN_TIME: "TIMESTAMP"}) # remove timezone so it gets formatted properly - df_wx_fire.loc[:, "TIMESTAMP"] = [x.tz_localize(None) for x in df_wx_fire["TIMESTAMP"]] + # deprecated + # df_wx_fire.loc[:, "TIMESTAMP"] = [x.tz_localize(None) for x in df_wx_fire["TIMESTAMP"]] + timestamps = df_wx_fire["TIMESTAMP"] + del df_wx_fire["TIMESTAMP"] + df_wx_fire.loc[:, "TIMESTAMP"] = [x.tz_localize(None) for x in timestamps] df_wx_fire.columns = [s.upper() for s in df_wx_fire.columns] df_wx_fire[["YR", "MON", "DAY", "HR"]] = list( tqdm_util.apply( @@ -230,6 +242,16 @@ def do_create(_): ].sort_values(["ID", "LAT", "LONG", "TIMESTAMP"]) # NOTE: expects weather in localtime, but uses utcoffset to # figure out local sunrise/sunset + # FIX: if values are not valid 
then station isn't started so use TMP to figure out when it should start + if not (0 <= ffmc_old): + print(f"Invalid FFMC value for startup {ffmc_old}") + ffmc_old = 0 + if not (0 <= dmc_old): + print(f"Invalid DMC value for startup {dmc_old}") + dmc_old = 0 + if not (0 <= dc_old): + print(f"Invalid DC value for startup {dc_old}") + dc_old = 0 df_fwi = cffdrs.hFWI(df_wx_fire, utcoffset_hours, ffmc_old, dmc_old, dc_old, silent=True) # HACK: get rid of missing values at end of period df_fwi = df_fwi[~np.isnan(df_fwi["FWI"])].reset_index(drop=True)