Source code for otoole.results.results

import logging
from abc import abstractmethod
from io import StringIO
from typing import Any, Dict, List, Set, TextIO, Tuple, Union

import pandas as pd

from otoole.input import ReadStrategy
from otoole.results.result_package import ResultsPackage

LOGGER = logging.getLogger(__name__)


class ReadResults(ReadStrategy):
    """Base strategy for reading a solver solution and deriving otoole results."""

    def read(
        self, filepath: Union[str, TextIO], **kwargs
    ) -> Tuple[Dict[str, pd.DataFrame], Dict[str, Any]]:
        """Read a solution file from ``filepath`` and process using ``input_data``

        Arguments
        ---------
        filepath : str, TextIO
            A path name or file buffer pointing to the solution file
        input_data : dict, default=None
            dict of dataframes

        Returns
        -------
        tuple
            A tuple containing dict of pandas.DataFrames and a dict of
            default_values
        """
        input_data = kwargs.get("input_data", None)

        available_results: Dict[str, pd.DataFrame] = self.get_results_from_file(
            filepath, input_data
        )
        default_values: Dict = self._read_default_values(self.results_config)
        results: Dict[str, pd.DataFrame] = self.calculate_results(
            available_results, input_data
        )

        return results, default_values

    @abstractmethod
    def get_results_from_file(self, filepath, input_data):
        raise NotImplementedError()

    def calculate_results(
        self,
        available_results: Dict[str, pd.DataFrame],
        input_data: Dict[str, pd.DataFrame],
    ) -> Dict[str, pd.DataFrame]:
        """Populates the results with calculated values using input data"""
        calculated: Dict[str, pd.DataFrame] = {}
        package = ResultsPackage(available_results, input_data)

        # Attempt every configured result; a KeyError from the package means
        # no calculation method exists for that name, which is not fatal.
        for result_name in sorted(self.results_config):
            LOGGER.info("Looking for %s", result_name)
            try:
                calculated[result_name] = package[result_name]
            except KeyError as err:
                LOGGER.info("No calculation method available for %s", result_name)
                LOGGER.debug("Error calculating %s: %s", result_name, str(err))

        return calculated
class ReadWideResults(ReadResults):
    """Read results from a wide-format solution file and reshape to long format."""

    def get_results_from_file(self, filepath, input_data):
        """Parse the solution file and return one long-format frame per variable."""
        wide_data = self._convert_to_dataframe(filepath)
        return self._convert_wide_to_long(wide_data)

    @abstractmethod
    def _convert_to_dataframe(self, file_path: Union[str, TextIO]) -> pd.DataFrame:
        raise NotImplementedError()

    def _convert_wide_to_long(self, data: pd.DataFrame) -> Dict[str, pd.DataFrame]:
        """Convert from wide to long format

        Converts a pandas DataFrame containing all wide format results to
        reformatted dictionary of pandas DataFrames in long format ready to
        write out

        Arguments
        ---------
        data : pandas.DataFrame
            results stored in a dataframe

        Example
        -------
        >>> df = pd.DataFrame(data=[
                ['TotalDiscountedCost', "SIMPLICITY,2015", 187.01576],
                ['TotalDiscountedCost', "SIMPLICITY,2016", 183.30788]],
                columns=['Variable', 'Index', 'Value'])
        >>> self._convert_wide_to_long(df)
        {'TotalDiscountedCost':        VALUE
        REGION     YEAR
        SIMPLICITY 2015  187.01576
        SIMPLICITY 2016  183.30788}
        """
        # Set definitions supply the dtype for each index column
        sets = {
            name: config
            for name, config in self.user_config.items()
            if config["type"] == "set"
        }

        results: Dict[str, pd.DataFrame] = {}
        missing = []

        for name, details in sorted(self.results_config.items()):
            subset = data[data["Variable"] == name]
            if subset.empty:
                missing.append(name)
                continue

            LOGGER.debug("Extracting results for %s", name)
            frame = subset.copy()  # copy to avoid SettingWithCopyWarning
            indices = details["indices"]  # typing: List

            # Explode the comma-separated "Index" string into one column per set
            frame[indices] = frame["Index"].str.split(",", expand=True)
            dtypes = {idx: sets[idx]["dtype"] for idx in indices}
            frame = (
                frame.astype(dtypes)
                .drop(columns=["Variable", "Index"])
                .rename(columns={"Value": "VALUE"})
            )
            ordered_columns = indices + ["VALUE"]
            frame = frame[ordered_columns]

            # Guard against duplicated index names (pandas cannot set_index
            # on duplicate column labels)
            index = details["indices"].copy()
            frame, index = check_duplicate_index(frame, ordered_columns, index)
            results[name] = frame.set_index(index)

        LOGGER.debug("Unable to find result variables for: %s", ", ".join(missing))
        return results
[docs]def check_duplicate_index(df: pd.DataFrame, columns: List, index: List) -> pd.DataFrame: """Catches pandas error when there are duplicate column indices""" if check_for_duplicates(index): index = rename_duplicate_column(index) LOGGER.debug("Original column names: %s", columns) renamed_columns = rename_duplicate_column(columns) LOGGER.debug("New column names: %s", renamed_columns) df.columns = renamed_columns return df, index
[docs]def check_for_duplicates(index: List) -> bool: return len(set(index)) != len(index)
[docs]def identify_duplicate(index: List) -> Union[int, bool]: elements = set() # type: Set for counter, elem in enumerate(index): if elem in elements: return counter else: elements.add(elem) return False
[docs]def rename_duplicate_column(index: List) -> List: column = index.copy() location = identify_duplicate(column) if location: column[location] = "_" + column[location] return column
[docs]class ReadCplex(ReadWideResults): """Read a CPLEX solution file into memeory""" def _convert_to_dataframe(self, file_path: Union[str, TextIO]) -> pd.DataFrame: """Reads a Cplex solution file into a pandas DataFrame Arguments --------- user_config : Dict[str, Dict] file_path : Union[str, TextIO] """ df = pd.read_xml(file_path, xpath=".//variable", parser="etree") df[["Variable", "Index"]] = df["name"].str.split("(", expand=True) df["Index"] = df["Index"].str.replace(")", "", regex=False) LOGGER.debug(df) df = df[(df["value"] != 0)].reset_index().rename(columns={"value": "Value"}) return df[["Variable", "Index", "Value"]].astype({"Value": float})
[docs]class ReadGurobi(ReadWideResults): """Read a Gurobi solution file into memory""" def _convert_to_dataframe(self, file_path: Union[str, TextIO]) -> pd.DataFrame: """Reads a Gurobi solution file into a pandas DataFrame Arguments --------- user_config : Dict[str, Dict] file_path : Union[str, TextIO] """ df = pd.read_csv( file_path, header=None, sep=" ", names=["Variable", "Value"], skiprows=2, ) # type: pd.DataFrame df[["Variable", "Index"]] = df["Variable"].str.split("(", expand=True) df["Index"] = df["Index"].str.replace(")", "", regex=False) LOGGER.debug(df) df = df[(df["Value"] != 0)].reset_index() return df[["Variable", "Index", "Value"]].astype({"Value": float})
[docs]class ReadCbc(ReadWideResults): """Read a CBC solution file into memory Arguments --------- user_config : Dict[str, Dict] results_config : Dict[str, Dict] """ def _convert_to_dataframe(self, file_path: Union[str, TextIO]) -> pd.DataFrame: """Reads a CBC solution file into a pandas DataFrame Arguments --------- file_path : str """ df = pd.read_csv( file_path, header=None, sep="(", names=["Variable", "indexvalue"], skiprows=1, ) # type: pd.DataFrame if df["Variable"].astype(str).str.contains(r"^\*\*").any(): LOGGER.warning( "CBC Solution File contains decision variables out of bounds. " + "You have an infeasible solution" ) df["Variable"] = ( df["Variable"] .astype(str) .str.replace(r"^\*\*", "", regex=True) .str.split(expand=True)[1] ) df[["Index", "Value"]] = df["indexvalue"].str.split(expand=True).loc[:, 0:1] df["Index"] = df["Index"].str.replace(")", "", regex=False) df = df.drop(columns=["indexvalue"]) return df[["Variable", "Index", "Value"]].astype({"Value": float})
class ReadGlpk(ReadWideResults):
    """Reads a GLPK Solution file into memory

    Arguments
    ---------
    user_config : Dict[str, Dict]
    glpk_model: Union[str, TextIO]
        Path to GLPK model file. Can be created using the `--wglp` flag.
    """

    def __init__(self, user_config: Dict[str, Dict], glpk_model: Union[str, TextIO]):
        super().__init__(user_config)

        if isinstance(glpk_model, str):
            with open(glpk_model, "r") as model_file:
                self.model = self.read_model(model_file)
        elif isinstance(glpk_model, StringIO):
            self.model = self.read_model(glpk_model)
        else:
            raise TypeError("Argument filepath type must be a string or an open file")

    def _convert_to_dataframe(self, glpk_sol: Union[str, TextIO]) -> pd.DataFrame:
        """Creates a wide formatted dataframe from GLPK solution

        Arguments
        ---------
        glpk_sol: Union[str, TextIO]
            Path to GLPK solution file. Can be created using the `--write` flag

        Returns
        -------
        pd.DataFrame
        """
        if isinstance(glpk_sol, str):
            # Pass the opened buffer to the reader (mirrors __init__).
            # Previously the file was opened with an unused handle and then
            # re-read by path, opening the file twice.
            with open(glpk_sol, "r") as sol_file:
                _, sol = self.read_solution(sol_file)
        elif isinstance(glpk_sol, StringIO):
            _, sol = self.read_solution(glpk_sol)
        else:
            raise TypeError("Argument filepath type must be a string or an open file")

        return self._merge_model_sol(sol)

    def read_model(self, file_path: Union[str, TextIO]) -> pd.DataFrame:
        """Reads in a GLPK Model File

        Arguments
        ---------
        file_path: Union[str, TextIO]
            Path to GLPK model file. Can be created using the `--wglp` flag.

        Returns
        -------
        pd.DataFrame
              ID   NUM   NAME                      INDEX
            0 i    1     CAa4_Constraint_Capacity  "SIMPLICITY,ID,BACKSTOP1,2015"
            1 j    2     NewCapacity               "SIMPLICITY,WINDPOWER,2039"

        Notes
        -----
        -> GENERAL LAYOUT OF MODEL FILE
        n p NAME     # p = problem instance
        n z NAME     # z = objective function
        n i ROW NAME # i = constraint name, ROW is the row ordinal number
        n j COL NAME # j = variable name, COL is the column ordinal number
        """
        df = pd.read_csv(
            file_path,
            header=None,
            sep=r"\s+",
            index_col=0,
            names=["ID", "NUM", "value", 4, 5],
        ).drop(columns=[4, 5])
        # keep only constraint ("i") and variable ("j") rows; drop the
        # objective function entry (named "cost")
        df = df[(df["ID"].isin(["i", "j"])) & (df["value"] != "cost")]
        # names look like "Name[IDX1,IDX2,...]"
        df[["NAME", "INDEX"]] = df["value"].str.split("[", expand=True)
        df["INDEX"] = df["INDEX"].map(lambda x: x.split("]")[0])
        df = (
            df[["ID", "NUM", "NAME", "INDEX"]]
            .astype({"ID": str, "NUM": "int64", "NAME": str, "INDEX": str})
            .reset_index(drop=True)
        )
        return df

    def read_solution(
        self, file_path: Union[str, TextIO]
    ) -> Tuple[Dict[str, Union[str, float]], pd.DataFrame]:
        """Reads a GLPK solution file

        Arguments
        ---------
        file_path: Union[str, TextIO]
            Path to GLPK solution file. Can be created using the `--write` flag

        Returns
        -------
        Tuple[Dict[str,Union[str, float]], pd.DataFrame]
            Dict[str,Union[str, float]] -> Problem name, status, and objective value
            pd.DataFrame -> Variables and constraints

        {"name":"osemosys", "status":"OPTIMAL", "objective":4497.31976}

              ID   NUM   STATUS   PRIM   DUAL
            0 i    1     b        5      0
            1 j    2     l        0      2

        Notes
        -----
        -> ROWS IN SOLUTION FILE
        i ROW ST PRIM DUAL
        ROW is the ordinal number of the row
        ST is one of:
        - b = inactive constraint;
        - l = inequality constraint active on its lower bound;
        - u = inequality constraint active on its upper bound;
        - f = active free (unbounded) row;
        - s = active equality constraint.
        PRIM specifies the row primal value (float)
        DUAL specifies the row dual value (float)

        -> COLUMNS IN SOLUTION FILE
        j COL ST PRIM DUAL
        COL specifies the column ordinal number
        ST contains one of the following lower-case letters that specifies
        the column status in the basic solution:
        - b = basic variable
        - l = non-basic variable having its lower bound active
        - u = non-basic variable having its upper bound active
        - f = non-basic free (unbounded) variable
        - s = non-basic fixed variable.
        PRIM field contains column primal value (float)
        DUAL field contains the column dual value (float)
        """
        df = pd.read_csv(file_path, header=None, sep=":")

        # get status information from the header comment rows
        status: Dict[str, Union[str, float]] = {}
        df_status = df.loc[:8].set_index(0)
        status["name"] = df_status.loc["c Problem", 1].strip()
        status["status"] = df_status.loc["c Status", 1].strip()
        status["objective"] = float(df_status.loc["c Objective", 1].split()[2])

        # get solution information (skip header rows and trailing "e o f" row)
        data = df.iloc[8:-1].copy()
        data[["ID", "NUM", "STATUS", "PRIM", "DUAL"]] = data[0].str.split(
            " ", expand=True
        )
        data = (
            data[["ID", "NUM", "STATUS", "PRIM", "DUAL"]]
            .astype(
                {"ID": str, "NUM": "int64", "STATUS": str, "PRIM": float, "DUAL": float}
            )
            .reset_index(drop=True)
        )
        return status, data

    def _merge_model_sol(self, sol: pd.DataFrame) -> pd.DataFrame:
        """Merges GLPK model and solution file into one dataframe

        Arguments
        ---------
        sol: pd.DataFrame
            see output from ReadGlpk.read_solution(...)

        Returns
        -------
        pd.DataFrame

        >>> pd.DataFrame(data=[
                ['TotalDiscountedCost', "SIMPLICITY,2015", 187.01576],
                ['TotalDiscountedCost', "SIMPLICITY,2016", 183.30788]],
                columns=['Variable', 'Index', 'Value'])
        """
        # Align model and solution on a synthetic "ID+NUM" key (e.g. "j42")
        model = self.model.copy()
        model.index = model["ID"].str.cat(model["NUM"].astype(str))
        model = model.drop(columns=["ID", "NUM"])

        sol.index = sol["ID"].str.cat(sol["NUM"].astype(str))
        sol = sol.drop(columns=["ID", "NUM", "STATUS", "DUAL"])

        df = model.join(sol)
        # keep variables only ("j" rows); constraints ("i") are discarded
        df = (
            df[df.index.str.startswith("j")]
            .reset_index(drop=True)
            .rename(columns={"NAME": "Variable", "INDEX": "Index", "PRIM": "Value"})
        )
        return df