import logging
import os
from typing import Any, TextIO
import pandas as pd
from otoole.exceptions import OtooleExcelNameLengthError
from otoole.input import WriteStrategy
logger = logging.getLogger(__name__)
[docs]class WriteExcel(WriteStrategy):
def _header(self):
return pd.ExcelWriter(self.filepath, mode="w")
def _form_parameter(
self, df: pd.DataFrame, parameter_name: str, default: float
) -> pd.DataFrame:
"""Converts data into wide format
Arguments
---------
df: pd.DataFrame
parameter_name: str
default: float
Returns
-------
pandas.DataFrame
"""
index_names = df.index.names
column_names = df.columns.to_list()
if index_names[0]:
names = index_names + column_names
else:
names = column_names
logger.debug(f"Identified {len(names)} names: {names}")
total_columns = len(names)
if "YEAR" not in names:
pivot = df.copy()
elif total_columns > 3:
logger.debug("More than 3 columns for {}: {}".format(parameter_name, names))
rows = names[0:-2]
columns = names[-2]
values = names[-1]
logger.debug(f"Rows: {rows}; columns: {columns}; values: {values}")
logger.debug("dtypes: {}".format(df.dtypes))
pivot = df.reset_index().pivot(index=rows, columns=columns, values=values)
else:
logger.debug(f"One column for {parameter_name}: {names}")
pivot = df.copy()
return pivot
def _form_parameter_template(self, parameter_name: str, **kwargs) -> pd.DataFrame:
"""Creates wide format excel template
Pivots the data to wide format using the data from the YEAR set as the columns.
This requires input data to be passed into this function.
Arguments
---------
parameter_name: str
input_data: dict[str, pd.DataFrame])
Returns
-------
pd.DataFrame
"""
indices = self.user_config[parameter_name]["indices"]
if "input_data" not in kwargs:
logger.debug(f"Can not pivot excel template for {parameter_name}")
return pd.DataFrame(columns=indices)
else:
input_data = kwargs["input_data"]
if "YEAR" in indices:
years = input_data["YEAR"]["VALUE"].to_list()
indices.remove("YEAR")
indices.extend(years)
else:
indices.extend(["VALUE"])
return pd.DataFrame(columns=indices)
def _write_parameter(
self,
df: pd.DataFrame,
parameter_name: str,
handle: pd.ExcelWriter,
default: float,
**kwargs,
):
try:
name = self.user_config[parameter_name]["short_name"]
except KeyError:
name = parameter_name
if len(name) > 31:
raise OtooleExcelNameLengthError(name=name)
if not df.empty:
df = self._form_parameter(df, parameter_name, default)
df.to_excel(handle, sheet_name=name, merge_cells=False, index=True)
else:
logger.debug(f"Dataframe {parameter_name} is empty")
df = self._form_parameter_template(parameter_name, **kwargs)
df.to_excel(handle, sheet_name=name, merge_cells=False, index=False)
def _write_set(self, df: pd.DataFrame, set_name, handle: pd.ExcelWriter):
df.to_excel(handle, sheet_name=set_name, merge_cells=False, index=False)
def _footer(self, handle=pd.ExcelWriter):
handle.close()
[docs]class WriteDatafile(WriteStrategy):
def _header(self):
filepath = open(self.filepath, "w", newline="")
msg = "# Model file written by *otoole*\n"
filepath.write(msg)
return filepath
def _form_parameter(self, df: pd.DataFrame, default: float):
# Don't write out values equal to the default value
df = df[df.VALUE != default]
return df
def _write_parameter(
self,
df: pd.DataFrame,
parameter_name: str,
handle: TextIO,
default: float,
**kwargs,
):
"""Write parameter data to a GMPL datafile, omitting data with default value
Arguments
---------
filepath : StreamIO
df : pandas.DataFrame
parameter_name : str
handle: TextIO
default : int
"""
if not self.write_defaults:
df = self._form_parameter(df, default)
handle.write("param default {} : {} :=\n".format(default, parameter_name))
df.to_csv(
path_or_buf=handle,
sep=" ",
header=False,
index=True,
float_format="%g",
lineterminator="\n",
)
handle.write(";\n")
def _write_set(self, df: pd.DataFrame, set_name, handle: TextIO):
"""Write set data to a GMPL datafile
Arguments
---------
df : pandas.DataFrame
set_name : str
handle: TextIO
"""
handle.write("set {} :=\n".format(set_name))
df.to_csv(
path_or_buf=handle,
sep=" ",
header=False,
index=False,
float_format="%g",
lineterminator="\n",
)
handle.write(";\n")
def _footer(self, handle: TextIO):
handle.write("end;\n")
handle.close()
[docs]class WriteCsv(WriteStrategy):
"""Write parameters to comma-separated value files
Arguments
---------
filepath: str, default=None
The path to write a folder of csv files
default_values: dict, default=None
user_config: dict, default=None
"""
@staticmethod
def _write_out_dataframe(folder, parameter, df, index=False):
"""Writes out a dataframe as a csv into a data subfolder
Arguments
---------
folder : str
parameter : str
df : pandas.DataFrame
index : bool, default=False
Write the index to CSV
"""
filepath = os.path.join(folder, parameter + ".csv")
with open(filepath, "w", newline="") as csvfile:
logger.info(
"Writing %s rows into narrow file for %s", df.shape[0], parameter
)
df.to_csv(csvfile, index=index)
def _header(self) -> Any:
os.makedirs(os.path.join(self.filepath), exist_ok=True)
return None
def _write_parameter(
self,
df: pd.DataFrame,
parameter_name: str,
handle: TextIO,
default: float,
**kwargs,
) -> pd.DataFrame:
"""Write parameter data"""
self._write_out_dataframe(self.filepath, parameter_name, df, index=True)
def _write_set(self, df: pd.DataFrame, set_name, handle: TextIO) -> pd.DataFrame:
"""Write set data"""
self._write_out_dataframe(self.filepath, set_name, df, index=False)
def _footer(self, handle: TextIO):
pass