"""Serialization and deserialization of :class:`.Tesliper` objects."""
import json
import logging
import zipfile
from json.decoder import JSONArray
from json.scanner import py_make_scanner
from pathlib import Path
from typing import Any, Dict, Iterable, List, Union
import tesliper # absolute import to solve problem of circular imports
from tesliper import datawork as dw
from tesliper.glassware import Conformers, SingleSpectrum, Spectra
# Module-level logger; it is named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class ArchiveWriter:
    """Class for serialization of Tesliper objects.

    Data is stored as JSON documents inside a zip archive; each leaf of the
    tree below is written as a ``<name>.json`` member of that archive.

    Structure of the produced archive::

        .
        ├───arguments: {input_dir=str, output_dir=str, wanted_files=[str],
        │               quantum_software=str}
        ├───parameters: {"ir": {params}, ..., "roa": {params}}
        ├───conformers
        │   ├───arguments: {"allow_data_inconsistency": bool,
        │   │               "temperature_of_the_system": float}
        │   ├───filenames: [str]
        │   ├───kept: [bool]
        │   └───data
        │       ├───filename_1: {genre=str: data}
        │       │   ...
        │       └───filename_N: {genre=str: data}
        └───spectra
            ├───experimental
            │   ├───spectra_genre_1: {attr_name: SingleSpectrum.attr}
            │   │   ...
            │   └───spectra_genre_N: {attr_name: SingleSpectrum.attr}
            ├───calculated
            │   ├───spectra_genre_1: {attr_name: Spectra.attr}
            │   │   ...
            │   └───spectra_genre_N: {attr_name: Spectra.attr}
            └───averaged
                ├───spectra_genre_1-energies-genre-1: {attr_name: SingleSpectrum.attr}
                │   ...
                └───spectra_genre_N-energies-genre-N: {attr_name: SingleSpectrum.attr}
    """

    def __init__(
        self, destination: Union[str, Path], mode: str = "x", encoding: str = "utf-8"
    ):
        """
        Parameters
        ----------
        destination : Union[str, Path]
            Path to target file.
        mode : str, optional
            Specifies how writing to file should be handled. Should be one of
            characters: 'a' (append to existing file), 'x' (only write if file doesn't
            exist yet), or 'w' (overwrite file if it already exists). Defaults to "x".
        encoding : str, optional
            Encoding of the output, by default "utf-8"
        """
        # `mode` has to be assigned first: the `destination` setter reads it
        # to decide whether the given path is acceptable.
        self.mode = mode
        self.destination = destination
        self.encoding = encoding
        self.root = None  # zipfile.ZipFile handle, set by `open()`

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @property
    def mode(self):
        """Specifies how writing to file should be handled. Should be one of characters:
        "a", "x", or "w".

        "a" - append to existing file;
        "x" - only write if file doesn't exist yet;
        "w" - overwrite file if it already exists.

        Raises
        ------
        ValueError
            If given anything other than "a", "x", or "w".
        """
        return self._mode

    @mode.setter
    def mode(self, mode):
        if mode not in ("a", "x", "w"):
            raise ValueError("Mode should be 'a', 'x', or 'w'.")
        self._mode = mode

    @property
    def destination(self) -> Path:
        """pathlib.Path: File, to which the archive should be written.

        Notes
        -----
        If str given, it will be converted to pathlib.Path.

        Raises
        ------
        FileNotFoundError
            If mode is "a" and given file doesn't exist, or if the parent
            directory of given file doesn't exist.
        FileExistsError
            If mode is "x" and given file already exists.
        """
        # The value lives in the instance ``__dict__`` under the property's own
        # name; the property is a data descriptor, so it still takes precedence
        # on attribute access.
        return vars(self)["destination"]

    @destination.setter
    def destination(self, destination: Union[str, Path]) -> None:
        destination = Path(destination)
        if not destination.exists() and self.mode == "a":
            raise FileNotFoundError(
                "Mode 'a' was specified, but given file doesn't exist."
            )
        elif destination.exists() and self.mode == "x":
            raise FileExistsError(
                "Mode 'x' was specified, but given file already exists."
            )
        elif not destination.parent.exists():
            raise FileNotFoundError("Parent directory of specified file doesn't exist.")
        else:
            logger.debug("File %s ok for writing.", destination)
        vars(self)["destination"] = destination

    def open(self):
        """Open the zip archive at `destination` using `mode`; return this writer."""
        self.root = zipfile.ZipFile(self.destination, mode=self.mode)
        return self

    def close(self):
        """Close the underlying zip archive, if one was opened."""
        # Guard against closing a writer that was never opened.
        if self.root is not None:
            self.root.close()

    def write(self, obj: "tesliper.Tesliper"):
        """Serialize given Tesliper object into the archive.

        The archive is opened before writing and closed afterwards.

        Parameters
        ----------
        obj : tesliper.Tesliper
            Object providing `input_dir`, `output_dir`, `wanted_files`,
            `quantum_software`, `parameters`, `conformers`, `averaged`, `spectra`,
            and `experimental` attributes to be stored.
        """
        with self:
            self._write_arguments(
                obj.input_dir, obj.output_dir, obj.wanted_files, obj.quantum_software
            )
            self._write_parameters(obj.parameters)
            self._write_conformers(obj.conformers)
            for spc in obj.averaged.values():
                self._write_averaged(spc)
            for spc in obj.spectra.values():
                self._write_calculated(spc)
            for spc in obj.experimental.values():
                self._write_experimental(spc)

    def _write_json(self, path: str, data: Any) -> None:
        """Encode `data` as JSON and store it as archive member `path`."""
        with self.root.open(path, mode="w") as handle:
            handle.write(self.jsonencode(data))

    @staticmethod
    def _spectrum_payload(spectrum) -> dict:
        """Collect attributes stored for every kind of spectrum object."""
        return {
            "genre": spectrum.genre,
            "filenames": spectrum.filenames.tolist(),
            "values": spectrum.values.tolist(),
            "abscissa": spectrum.abscissa.tolist(),
            "width": spectrum.width,
            "fitting": spectrum.fitting,
            "scaling": spectrum.scaling,
            "offset": spectrum.offset,
        }

    def _write_arguments(
        self,
        input_dir: Union[Path, str] = None,
        output_dir: Union[Path, str] = None,
        wanted_files: Iterable[str] = None,
        quantum_software: str = None,
    ):
        """Store given arguments as "arguments.json"; falsy values become null."""
        self._write_json(
            "arguments.json",
            {
                "input_dir": str(input_dir) if input_dir else None,
                "output_dir": str(output_dir) if output_dir else None,
                "wanted_files": list(wanted_files) if wanted_files else None,
                "quantum_software": quantum_software or None,
            },
        )

    def _write_parameters(self, parameters: dict):
        """Store parameters as "parameters.json", with fitting given by name."""
        # TODO: Implement more universal way of serializing fitting
        #       this won't deserialize custom fitting functions
        # Copy each inner dict, so the caller's parameters are left untouched.
        to_write = {key: params.copy() for key, params in parameters.items()}
        for params in to_write.values():
            params["fitting"] = params["fitting"].__name__
        self._write_json("parameters.json", to_write)

    def _write_conformers(self, conformers: "Conformers"):
        """Store conformers' arguments, filenames, kept flags, and per-file data."""
        self._write_conformers_arguments(
            allow_data_inconsistency=conformers.allow_data_inconsistency,
            temperature_of_the_system=conformers.temperature,
        )
        self._write_filenames(conformers.filenames)
        self._write_kept(conformers.kept)
        for filename in conformers.filenames:
            self._write_mol(filename=filename, mol=conformers[filename])

    def _write_conformers_arguments(self, **kwargs):
        """Store given keyword arguments as "conformers/arguments.json"."""
        self._write_json("conformers/arguments.json", kwargs)

    def _write_filenames(self, filenames: List[str]):
        """Store list of filenames as "conformers/filenames.json"."""
        self._write_json("conformers/filenames.json", filenames)

    def _write_mol(self, filename: str, mol: dict):
        """Store data of one conformer as "conformers/data/<filename>.json"."""
        self._write_json(f"conformers/data/{filename}.json", mol)

    def _write_kept(self, kept: List[bool]):
        """Store list of kept flags as "conformers/kept.json"."""
        self._write_json("conformers/kept.json", kept)

    def _write_experimental(self, spectrum: "SingleSpectrum"):
        """Store spectrum as "spectra/experimental/<genre>.json"."""
        data = self._spectrum_payload(spectrum)
        data["averaged_by"] = spectrum.averaged_by
        self._write_json(f"spectra/experimental/{spectrum.genre}.json", data)

    def _write_calculated(self, spectra: "Spectra"):
        """Store spectra as "spectra/calculated/<genre>.json"."""
        data = self._spectrum_payload(spectra)
        data["allow_data_inconsistency"] = spectra.allow_data_inconsistency
        self._write_json(f"spectra/calculated/{spectra.genre}.json", data)

    def _write_averaged(self, spectrum: "SingleSpectrum"):
        """Store spectrum as "spectra/averaged/<genre>-<averaged_by>.json"."""
        data = self._spectrum_payload(spectrum)
        data["averaged_by"] = spectrum.averaged_by
        self._write_json(
            f"spectra/averaged/{spectrum.genre}-{spectrum.averaged_by}.json", data
        )

    def jsonencode(
        self,
        obj: Any,
        *,
        skipkeys=False,
        ensure_ascii=True,
        check_circular=True,
        allow_nan=True,
        cls=None,
        indent=None,
        separators=None,
        default=None,
        sort_keys=False,
        **kw,
    ) -> bytes:
        """json.dumps wrapper, that encodes JSON produced.

        All keyword arguments are passed to `json.dumps` unchanged; the resulting
        string is encoded to bytes using this writer's `encoding`.
        """
        return json.dumps(
            obj,
            skipkeys=skipkeys,
            ensure_ascii=ensure_ascii,
            check_circular=check_circular,
            allow_nan=allow_nan,
            cls=cls,
            indent=indent,
            separators=separators,
            default=default,
            sort_keys=sort_keys,
            **kw,
        ).encode(self.encoding)
class ArchiveLoader:
    """Class for deserialization of Tesliper objects.

    Reads JSON documents stored as members of a zip archive and uses them to
    rebuild a Tesliper object with its conformers and spectra.
    """

    def __init__(self, source: Union[str, Path], encoding: str = "utf-8"):
        """
        Parameters
        ----------
        source : Union[str, Path]
            Path to the source file.
        encoding : str, optional
            Source file encoding, by default "utf-8".
        """
        self.source = source
        self.encoding = encoding
        self.root = None  # zipfile.ZipFile handle, set by `open()`

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def open(self):
        """Open the zip archive at `source` for reading; return this loader."""
        self.root = zipfile.ZipFile(self.source, mode="r")
        return self

    def close(self):
        """Close the underlying zip archive, if one was opened."""
        # Guard against closing a loader that was never opened.
        if self.root is not None:
            self.root.close()

    @property
    def source(self) -> Path:
        """pathlib.Path: File, from which data should be read.

        Notes
        -----
        If str given, it will be converted to pathlib.Path.

        Raises
        ------
        FileNotFoundError
            If given source file doesn't exist.
        """
        return self._destination

    @source.setter
    def source(self, destination: Union[str, Path]) -> None:
        destination = Path(destination)
        if not destination.exists():
            raise FileNotFoundError("Given destination doesn't exist.")
        self._destination = destination

    @staticmethod
    def _spectra_kind(member: str) -> Union[str, None]:
        """Tell which kind of spectra an archive member holds, if any.

        Returns "experimental", "calculated", or "averaged" for a ``.json``
        member placed directly under the corresponding ``spectra/<kind>/``
        prefix, and None for any other member name.
        """
        # Match on the path prefix rather than on a bare substring: conformers'
        # data is stored under user-derived file names, which may well contain
        # words like "calculated" or "averaged".
        for kind in ("experimental", "calculated", "averaged"):
            if member.startswith(f"spectra/{kind}/") and member.endswith(".json"):
                return kind
        return None

    def load(self) -> "tesliper.Tesliper":
        """Rebuild a Tesliper object from the archive.

        The archive is opened before reading and closed afterwards.

        Returns
        -------
        tesliper.Tesliper
            Object created with the stored arguments, with parameters, conformers,
            and experimental, calculated, and averaged spectra filled in.
        """
        with self:
            tslr = tesliper.Tesliper(**self._load("arguments.json"))
            tslr.parameters = self._load_parameters()
            filenames = self._load("conformers/filenames.json")
            # iterator producing key-value pairs; it reads from the archive
            # lazily, so it must be consumed while the archive is still open
            mols = (
                (name, self._load(f"conformers/data/{name}.json"))
                for name in filenames
            )
            tslr.conformers = Conformers(
                mols, **self._load("conformers/arguments.json")
            )
            tslr.conformers.kept = self._load("conformers/kept.json")
            for member in self.root.namelist():
                kind = self._spectra_kind(member)
                if kind is None:
                    continue  # not a spectra document
                params = self._load(member)
                if kind == "experimental":
                    tslr.experimental[params["genre"]] = SingleSpectrum(**params)
                elif kind == "calculated":
                    tslr.spectra[params["genre"]] = Spectra(**params)
                else:  # averaged
                    key = (params["genre"], params["averaged_by"])
                    tslr.averaged[key] = SingleSpectrum(**params)
        return tslr

    def _load(self, dest):
        """Read archive member `dest` and parse it as JSON."""
        return self.jsondecode(self.root.read(dest))

    def _load_parameters(self):
        """Read "parameters.json" and resolve fitting names to functions."""
        parameters = self._load("parameters.json")
        for params in parameters.values():
            # NOTE(review): the name comes from the archive and is looked up
            # directly on the `dw` module - confirm archives are trusted input.
            params["fitting"] = getattr(dw, params["fitting"])
        return parameters

    def jsondecode(
        self,
        string: bytes,
        *,
        cls=None,
        object_hook=None,
        parse_float=None,
        parse_int=None,
        parse_constant=None,
        object_pairs_hook=None,
        **kw,
    ) -> Any:
        """json.loads wrapper, that decodes bytes before parsing as JSON.

        Given bytes are decoded using this loader's `encoding`; all keyword
        arguments are passed to `json.loads` unchanged.
        """
        return json.loads(
            string.decode(self.encoding),
            cls=cls,
            object_hook=object_hook,
            parse_float=parse_float,
            parse_int=parse_int,
            parse_constant=parse_constant,
            object_pairs_hook=object_pairs_hook,
            **kw,
        )