import json
import logging
import os
from importlib.resources import files
from typing import Any, Dict, List, Union
import pandas as pd
from pydantic import ValidationError
from yaml import SafeLoader, load # type: ignore
from otoole.exceptions import OtooleConfigFileError, OtooleDeprecationError
from otoole.preprocess.validate_config import (
UserDefinedParameter,
UserDefinedResult,
UserDefinedSet,
UserDefinedValue,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
def _read_file(open_file, ending):
if ending == ".yaml" or ending == ".yml":
contents = load(open_file, Loader=UniqueKeyLoader) # typing: Dict[str, Any]
elif ending == ".json":
contents = json.load(open_file) # typing: Dict
else:
contents = open_file.readlines()
return contents
def read_packaged_file(filename: str, module_name: Union[str, None] = None):
    """Read a file from the filesystem or from inside a package.

    Arguments
    ---------
    filename : str
        Path to the file, or the resource name inside ``module_name``
    module_name : str, default=None
        Package to load ``filename`` from. If None, ``filename`` is opened
        directly from the filesystem.

    Returns
    -------
    contents
        Whatever ``_read_file`` produces for the file's extension
    """
    _, ending = os.path.splitext(filename)
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default locale encoding.
    if module_name is None:
        source = open(filename, "r", encoding="utf-8")
    else:
        source = files(module_name).joinpath(filename).open("r", encoding="utf-8")
    with source as open_file:
        return _read_file(open_file, ending)
def create_name_mappings(
    config: Dict[str, Dict[str, Union[str, List]]], map_full_to_short: bool = True
) -> Dict:
    """Build the lookup between full names and their short names.

    Entries without a ``short_name`` are left out of the mapping; a message is
    logged for those whose full name is longer than 31 characters.

    Arguments
    ---------
    config : Dict[str, Dict[str, Union[str, List]]]
        Parsed user configuration file
    map_full_to_short: bool
        If true, keys are full names and values are short names; otherwise
        the mapping is inverted

    Returns
    -------
    Dict[str, str]
        Mapping in the requested direction
    """
    full_to_short = {}
    for full_name, settings in config.items():
        try:
            short = settings["short_name"]
        except KeyError:
            if len(full_name) > 31:
                logger.info(f"{full_name} does not have a 'short_name'")
        else:
            full_to_short[full_name] = short

    if not map_full_to_short:
        return {short: full for full, short in full_to_short.items()}
    return full_to_short
def validate_config(config: Dict) -> None:
    """Validate the user configuration, reporting all problems together.

    Each flattened entry is passed to the pydantic model selected by its
    ``type``; validation messages are collected across all entries.

    Arguments
    ---------
    config: Dict
        Read in user config yaml file

    Raises
    ------
    OtooleConfigFileError
        If any entry fails validation; the message lists every collected
        validation message, one per line
    """
    flattened = format_config_for_validation(config)
    defined_sets = get_all_sets(config)

    messages = []
    for entry in flattened:
        try:
            if "type" not in entry:
                UserDefinedValue(**entry)
                continue

            entry_type = entry["type"]
            if entry_type == "param" or entry_type == "result":
                # Parameters and results are checked against the declared sets
                entry["defined_sets"] = defined_sets
                model = (
                    UserDefinedParameter
                    if entry_type == "param"
                    else UserDefinedResult
                )
                model(**entry)
            elif entry_type == "set":
                UserDefinedSet(**entry)
            else:
                # Unknown type: let pydantic produce the error
                UserDefinedValue(
                    name=entry["name"],
                    type=entry["type"],
                    dtype=entry["dtype"],
                )
        except ValidationError as ex:
            messages.extend(err["msg"] for err in ex.errors())

    if messages:
        joined = "\n".join(messages)
        raise OtooleConfigFileError(message=f"\n{joined}")
def read_deprecated_datapackage(datapackage: str) -> str:
    """Locate the CSV folder that sits next to a deprecated datapackage.

    Arguments
    ---------
    datapackage: str
        Location of input datapackage

    Returns
    -------
    input_csvs: str
        Path of the ``data`` entry beside the datapackage

    Raises
    ------
    OtooleDeprecationError
        If no ``data`` path exists beside the datapackage
    """
    parent = os.path.dirname(datapackage)
    input_csvs = os.path.join(parent, "data")
    if not os.path.exists(input_csvs):
        raise OtooleDeprecationError(
            resource="datapackage.json",
            message="datapackage format no longer supported and no csv data found",
        )
    return input_csvs
def get_packaged_resource(
    input_data: Dict[str, pd.DataFrame], param: str
) -> List[Dict[str, Any]]:
    """Return one parameter's data as a list of row dictionaries.

    Arguments
    ---------
    input_data : Dict[str, pd.DataFrame]
        Internal datastore for otoole input data
    param : str
        Name of OSeMOSYS parameter

    Returns
    -------
    List[Dict[str, Any]]
        One dictionary per row of the dataframe, keyed by column name; the
        index levels are included as ordinary columns

    Example
    -------
    >>> get_packaged_resource(input_data, "InputActivityRatio")
    [{'REGION': 'SIMPLICITY',
      'TECHNOLOGY': 'RIVWATAGR',
      'FUEL': 'WATIN',
      'MODE_OF_OPERATION': 1,
      'YEAR': 2020,
      'VALUE': 1.0}]
    """
    frame = input_data[param]
    # Move the index levels into regular columns so they appear in each record
    flat = frame.reset_index()
    records = flat.to_dict(orient="records")
    return records
class UniqueKeyLoader(SafeLoader):
    """YAML Loader to find duplicate keys

    This loader will treat lowercase and uppercase keys as the same. Meaning,
    the keys "SampleKey" and "SAMPLEKEY" are considered the same.

    Raises
    ------
    ValueError
        If a key is defined more than once.

    Adapted from:
        https://stackoverflow.com/a/63215043/14961492
    """

    def construct_mapping(self, node, deep=False):
        """Check a mapping node for duplicate keys, then build it as usual.

        Arguments
        ---------
        node
            Mapping node whose ``value`` is a sequence of (key, value) node pairs
        deep : bool
            Passed through to ``construct_object`` and the parent implementation

        Raises
        ------
        ValueError
            If two keys in the mapping are equal after upper-casing strings
        """
        seen = set()
        for key_node, _ in node.value:
            key = self.construct_object(key_node, deep=deep)
            # Only string keys can be case-normalised; other keys (ints,
            # bools, None, ...) are compared as they are instead of crashing
            # on a missing ``upper`` method.
            if isinstance(key, str):
                key = key.upper()
            try:
                is_duplicate = key in seen
            except TypeError:
                # Unhashable key: leave the error reporting to the parent
                # construct_mapping call below.
                continue
            if is_duplicate:
                raise ValueError(f"{key} -> defined more than once")
            seen.add(key)
        return super().construct_mapping(node, deep)
def get_all_sets(config: Dict) -> List:
    """Extracts user defined sets

    Arguments
    ---------
    config: Dict
        Parsed user configuration, keyed by name

    Returns
    -------
    List
        Names of all entries whose ``type`` is ``"set"``, in config order
    """
    # Use .get so an entry that is missing "type" is simply not a set, rather
    # than raising KeyError before the config can be validated.
    return [name for name, settings in config.items() if settings.get("type") == "set"]