Read_and_agg in detail

Overview

The most common way to run the Aggregator is with read_and_agg(), which automates data read-in, processing, parallelisation, metadata, and saving. This requires that the output of the response models is saved out, which is almost always the case for both recordkeeping and processing purposes. This vastly simplifies automated, consistent running over many scenarios.

To do the same analyses as in the multi aggregate example but using read_and_agg(), we give it the path to the data instead of the data itself. If the directory contains multiple files, read_and_agg() provides capacity to operate over those files in parallel.

Important

Subdirectories should represent scenarios, because scenario outcomes are not interdependent (should be compared, not combined). In contrast, other dimensions (e.g. location) are interdependent. If directories are not scenarios but separate units that should be aggregated, e.g. gauges which should be aggregated to basin, parallelisation will not work and many other steps will be more difficult. See more information.

We can return output to the active session with returnList and use savepath to save a .rds file. In most cases, we would save outputs so developing and adjusting Comparer outputs does not rely on re-running the aggregations.

The read_and_agg() function saves metadata files (yaml and json) that allows replication of this step with run_hydrobot_params(), see here. These files build on metadata from earlier steps if possible, including any available metadata from the Controller about module parameters and the scenarios.

Demonstration

Here, we perform the same set of aggregations as the primary example for multi_aggregate, but do so from the paths to the EWR outputs and note differences. See that notebook for much more detail about how aggregation itself works.

Directories

First, we need to provide a set of paths to point to the input data, in this case the outputs from the EWR tool for the small demonstration, created by a controller notebook.

Note that we specify a path here for the aggregator results.

project_dir <- "hydrobot_scenarios"
hydro_dir <- file.path(project_dir, "hydrographs")
ewr_results <- file.path(project_dir, "module_output", "EWR")
agg_results <- file.path(project_dir, "aggregator_output", "demo")

We can see that those outputs (the csvs) are in scenario-based subdirectories, with the yaml and json metadata for the Controller in the outer directory.

list.files(ewr_results, recursive = TRUE)
[1] "base/yearly_base.csv"   "down4/yearly_down4.csv" "ewr_metadata.json"     
[4] "ewr_metadata.yml"       "up4/yearly_up4.csv"    

Scenario information

This will be attached to metadata, typically. For this demonstration, we just use it for plot clarity and the data is simple.

multipliers <- c(4)

scenemults <- c(1 / rev(multipliers), 1, multipliers)

scenenames <- c(
  paste0("down", as.character(rev(multipliers))),
  "base",
  paste0("up", as.character(multipliers))
) |>
  stringr::str_replace("\\.", "_")


scenarios <- tibble::tibble(scenario = scenenames, delta = scenemults)

scene_pal <- make_pal(unique(scenarios$scenario), palette = "ggsci::nrc_npg", refvals = "base", refcols = "black")

Aggregation sequences

We use the same examples as in multi_aggregate. These cover all three dimensions. It begins temporal (all_time), then has two theme aggregations (ewr_code and env_obj), then spatial to sdl_units, two more theme-dimension (Specific_goal, Objective), a spatially-weighted aggregation to the basin, and finally to the theme level of 5-year management targets.

aggseq <- list(
  all_time = "all_time",
  ewr_code = c("ewr_code_timing", "ewr_code"),
  env_obj = c("ewr_code", "env_obj"),
  sdl_units = sdl_units,
  Specific_goal = c("env_obj", "Specific_goal"),
  Objective = c("Specific_goal", "Objective"),
  basin = basin,
  target_5_year_2024 = c("Objective", "target_5_year_2024")
)

funseq <- list(
  all_time = "ArithmeticMean",
  ewr_code = "CompensatingFactor",
  env_obj = "ArithmeticMean",
  sdl_units = "ArithmeticMean",
  Specific_goal = "ArithmeticMean",
  Objective = "ArithmeticMean",
  basin = "SpatialWeightedMean",
  target_5_year_2024 = "ArithmeticMean"
)

Do the aggregation

Now, we use read_and_agg() to read the data in, aggregate it, and save it out. We also return it here for making some simple example plots. Rather than using auto_ewr_PU = TRUE, we use group_until and pseudo_spatial as defined in more detail elsewhere.

Tip

HydroBOT can aggregate more than one column at a time if they use the same aggregation sequence. We demonstrate that here, with aggCols = c("frequency_achieved", "ewr_achieved") . We could also include any other outcome column if desired.

agged_dat <- read_and_agg(
  datpath = ewr_results,
  type = "achievement",
  geopath = bom_basin_gauges,
  causalpath = causal_ewr,
  groupers = "scenario",
  aggCols = c("ewr_achieved", "frequency_achieved"),
  group_until = list(
    SWSDLName = is_notpoint,
    planning_unit_name = is_notpoint,
    gauge = is_notpoint
  ),
  pseudo_spatial = "sdl_units",
  aggsequence = aggseq,
  funsequence = funseq,
  saveintermediate = TRUE,
  namehistory = FALSE,
  keepAllPolys = FALSE,
  returnList = TRUE,
  savepath = agg_results,
  add_max = FALSE
)
! Unmatched links in causal network
• 11 from env_obj to Specific_goal
! Unmatched links in causal network
• 7 from Objective to target_5_year_2024

That has the same information as the example in multi_aggregate, with 9 levels of aggregation:

names(agged_dat)
[1] "agg_input"          "all_time"           "ewr_code"          
[4] "env_obj"            "sdl_units"          "Specific_goal"     
[7] "Objective"          "basin"              "target_5_year_2024"

We will only show one example sheet here.

agged_dat$sdl_units |>
  dplyr::filter(env_obj %in% c("EF1", "WB1", "NF1")) |>
  dplyr::left_join(scenarios) |>
  plot_outcomes(
    outcome_col = "ewr_achieved",
    plot_type = "map",
    colorgroups = NULL,
    colorset = "ewr_achieved",
    pal_list = list("scico::lapaz"),
    pal_direction = -1,
    facet_col = "scenario",
    facet_row = "env_obj",
    sceneorder = c("down4", "base", "up4"),
    underlay_list = list(
      underlay = sdl_units,
      underlay_pal = "grey90"
    )
  )

Metadata

A key advantage of read_and_agg() is that it records metadata for data provenance (which is also runnable).

For the run here, that yaml is at "hydrobot_scenarios/aggregator_output/demo/agg_metadata.yml", and contains the following information:


scenarios:
  scenario_name:
  - down4
  - base
  - up4
  flow_multiplier:
  - 0.25
  - 1.0
  - 4.0
ewr:
  output_parent_dir: hydrobot_scenarios
  hydro_dir: hydrobot_scenarios/hydrographs
  output_path: hydrobot_scenarios/module_output/EWR
  model_format: Standard time-series
  outputType: yearly
  returnType: none
  finish_time: 2025-04-04 15:44:05 AEDT
  status: yes
  ewr_version: py-ewr 2.3.7
  HydroBOT_version: 0.2.2.9020
aggregation:
  datpath: hydrobot_scenarios/module_output/EWR
  type: achievement
  groupers: scenario
  group_until:
    SWSDLName: 4
    planning_unit_name: 4
    gauge: 4
  pseudo_spatial: sdl_units
  aggCols: ewr_achieved
  aggsequence:
    all_time: all_time
    ewr_code:
    - ewr_code_timing
    - ewr_code
    env_obj:
    - ewr_code
    - env_obj
    sdl_units: sdl_units
    Specific_goal:
    - env_obj
    - Specific_goal
    Objective:
    - Specific_goal
    - Objective
    basin: basin
    target_5_year_2024:
    - Objective
    - target_5_year_2024
  funsequence:
    all_time: ArithmeticMean
    ewr_code: CompensatingFactor
    env_obj: ArithmeticMean
    sdl_units: ArithmeticMean
    Specific_goal: ArithmeticMean
    Objective: ArithmeticMean
    basin: SpatialWeightedMean
    target_5_year_2024: ArithmeticMean
  namehistory: no
  keepAllPolys: no
  auto_ewr_PU: no
  returnList: yes
  finish_time: 2025-04-04 15:44:11 AEDT
  status: yes
  HydroBOT_version: 0.2.2.9020

Parallelization

Since aggregation should happen along the theme, space, and time dimensions, but not scenarios, we can process in parallel over scenarios. The read_and_agg() function provides this parallelisation internally and seamlessly, provided the user has the suggested package {furrr} (and its dependency, {future}). In that case, parallelising is as easy as setting a future::plan and the argument rparallel = TRUE. The exact same run as above can be done in parallel as follows:

future::plan(future::multisession)

agged_dat_p <- read_and_agg(
  datpath = ewr_results,
  type = "achievement",
  geopath = bom_basin_gauges,
  causalpath = causal_ewr,
  groupers = "scenario",
  aggCols = "ewr_achieved",
  group_until = list(
    SWSDLName = is_notpoint,
    planning_unit_name = is_notpoint,
    gauge = is_notpoint
  ),
  pseudo_spatial = "sdl_units",
  aggsequence = aggseq,
  funsequence = funseq,
  saveintermediate = TRUE,
  namehistory = FALSE,
  keepAllPolys = FALSE,
  returnList = TRUE,
  savepath = agg_results,
  add_max = FALSE,
  rparallel = TRUE
)

That output is the same as earlier, but now it’s been read-in and processed in parallel over scenarios. This toy example is only marginally faster, but parallelisation yields large speedups for larger jobs. Because scenarios run independently, massive parallelisation is possible, up to one scenario per core. Speedups can be very large, even on local machines, but are particularly useful on HPCs.

User-provided components

Users can provide their own causal networks, spatial information, and aggregation functions. Moreover, aggregation can happen on arbitrary input data, possibly from unincorporated modules. See those pages for more detail using read_and_agg(), along with doing the same in multi_aggregate().

Sub-directories and multiple aggsequences

Sometimes we might want multiple aggregation sequences to address different parts of a question, or to compare the impact of aggregation itself. Doing that is as simple as creating the multiple aggregation sequences, and typically saving them to a different directory, from which they can be read for the comparer.

For example, if we wanted to aggregate to Targets (Native fish, Waterbirds, etc) instead of the Objectives and long-run targets for the same EWR output as above, and also use different aggregation functions, we could set up new sequences. Here, we also call the spatial data as characters and define our own Median function, as explained in more detail here and here.

aggseq_new <- list(
  all_time = "all_time",
  ewr_code = c("ewr_code_timing", "ewr_code"),
  env_obj = c("ewr_code", "env_obj"),
  sdl_units = "sdl_units",
  Target = c("env_obj", "Target"),
  basin = "basin"
)

Median <- function(x) {
  median(x, na.rm = TRUE)
}

funseq_new <- list(
  all_time = "ArithmeticMean",
  ewr_code = "LimitingFactor",
  env_obj = "Median",
  sdl_units = "Max",
  Target = "Min",
  basin = "SpatialWeightedMean"
)

Then, to avoid overwriting the earlier version, since in this example we would want to look at both sets of aggregations, we change the savepath argument (along with aggsequence and funsequence:

agged_dat_new <- read_and_agg(
  datpath = ewr_results,
  type = "achievement",
  geopath = bom_basin_gauges,
  causalpath = causal_ewr,
  groupers = "scenario",
  aggCols = "ewr_achieved",
  group_until = list(
    SWSDLName = is_notpoint,
    planning_unit_name = is_notpoint,
    gauge = is_notpoint
  ),
  pseudo_spatial = "sdl_units",
  aggsequence = aggseq_new,
  funsequence = funseq_new,
  saveintermediate = TRUE,
  namehistory = FALSE,
  keepAllPolys = FALSE,
  returnList = TRUE,
  savepath = file.path(project_dir, "aggregator_output", "second"),
  add_max = FALSE
)

Quantity to aggregate

We have demonstrated everything here by aggregating EWR data with the ewr_achieved column. However, any numeric column can be aggregated, as we show for made up module outputs. Here, we also show aggregation on a different EWR metric, the achievement of interevent requirements:

agged_interevent <- read_and_agg(
  datpath = ewr_results,
  type = "achievement",
  geopath = bom_basin_gauges,
  causalpath = causal_ewr,
  groupers = "scenario",
  aggCols = "interevent_achieved",
  group_until = list(
    SWSDLName = is_notpoint,
    planning_unit_name = is_notpoint,
    gauge = is_notpoint
  ),
  pseudo_spatial = "sdl_units",
  aggsequence = aggseq,
  funsequence = funseq,
  saveintermediate = TRUE,
  namehistory = FALSE,
  keepAllPolys = FALSE,
  returnList = TRUE,
  savepath = agg_results,
  add_max = FALSE
)
! Unmatched links in causal network
• 11 from env_obj to Specific_goal
! Unmatched links in causal network
• 7 from Objective to target_5_year_2024

That has the same sheets as above

names(agged_interevent)
[1] "agg_input"          "all_time"           "ewr_code"          
[4] "env_obj"            "sdl_units"          "Specific_goal"     
[7] "Objective"          "basin"              "target_5_year_2024"

And the map is similar but not identical (interevents are closely related to the frequency, so this is not surprising).

agged_interevent$sdl_units |>
  dplyr::filter(env_obj %in% c("EF1", "WB1", "NF1")) |>
  dplyr::left_join(scenarios) |>
  plot_outcomes(
    outcome_col = "interevent_achieved",
    plot_type = "map",
    colorgroups = NULL,
    colorset = "interevent_achieved",
    pal_list = list("scico::lapaz"),
    pal_direction = -1,
    facet_col = "scenario",
    facet_row = "env_obj",
    sceneorder = c("down4", "base", "up4"),
    underlay_list = list(
      underlay = sdl_units,
      underlay_pal = "grey90"
    )
  )

Gauge and scenario filtering

We usually want to apply analyses to the full set of scenarios or gauges. However, sometimes we might only want a subset. Using the gaugefilter and scenariofilter arguments can provide this functionality. Howevern this is a bit dangerous because they are simple regex on the filenames, and so are not actually specific to gauges or scenarios and are only named that way to make the regex simpler if multiple matches are desired. Moreover, if the files do not have a pattern, e.g. there are not filenames with gauge numbers in them and you try to send those numbers in gaugefilter, it will error. Since these examples do not have unique gauge files, we only use scenariofilter.

smallreadagg <- read_and_agg(
  datpath = ewr_results,
  type = "achievement",
  geopath = bom_basin_gauges,
  causalpath = causal_ewr,
  groupers = c("scenario", "gauge"),
  aggCols = "ewr_achieved",
  aggsequence = aggseq,
  funsequence = funseq,
  auto_ewr_PU = TRUE,
  add_max = FALSE,
  namehistory = FALSE,
  gaugefilter = NULL,
  scenariofilter = "base"
)
Warning: Causal network does not have all groupers.
• Joining env_obj to Specific_goal
• Groupers are scenario, gauge, polyID.
• expect causal network to have gauge; it has planning_unit_name, SWSDLName, env_obj, Specific_goal, fromtype, totype, edgeorder.
• Do you need to use `group_until`? Or is your network missing columns?
Warning: Causal network does not have all groupers.
• Joining Specific_goal to Objective
• Groupers are scenario, gauge, polyID.
• expect causal network to have gauge; it has planning_unit_name, SWSDLName, Specific_goal, Objective, fromtype, totype, edgeorder.
• Do you need to use `group_until`? Or is your network missing columns?
Warning: Causal network does not have all groupers.
• Joining Objective to target_5_year_2024
• Groupers are scenario, gauge, polyID.
• expect causal network to have gauge; it has Objective, target_5_year_2024, fromtype, totype, edgeorder.
• Do you need to use `group_until`? Or is your network missing columns?
table(smallreadagg$gauge, smallreadagg$scenario)
        
         base MAX
  412002   51  51
  412005   51  51
  412038   51  51
  421001   49  49
  421004   72  72
  421011   70  70