# Source code for LOGS.Entities.DatasetBase

import os
from typing import Any, Dict, List, Optional, cast

from LOGS.Auxiliary.Exceptions import (
    EntityFetchingException,
    EntityIncompleteException,
    LOGSException,
)
from LOGS.Auxiliary.Tools import Tools
from LOGS.Entities.DataFormatMinimal import DataFormatMinimal
from LOGS.Entities.DatasetInfo import DatasetInfo
from LOGS.Entities.DatasetRequestParameter import ParsingStates
from LOGS.Entities.Datatrack import Datatrack
from LOGS.Entities.FileEntry import FileEntry
from LOGS.Entities.HierarchyNode import HierarchyNode
from LOGS.Entities.ParserLog import ParserLog
from LOGS.Entities.Track import Track
from LOGS.Entity.EntityWithIntId import IEntityWithIntId
from LOGS.Interfaces.INamedEntity import INamedEntity
from LOGS.LOGSConnection import ResponseTypes


class DatasetBase(
    IEntityWithIntId,
    INamedEntity,
):
    """Base class for dataset entities.

    Holds the dataset fields managed at this level and the helpers that fetch
    additional data (info, zip size) or download the dataset as a zip file.
    """

    # private attributes
    _files: Optional[List[FileEntry]] = None

    # state of additionally fetched data
    # (set to False once ``fetchInfo()`` has completed)
    _noInfo = True

    # fields
    _path: Optional[str] = None

    # special fetched fields
    ## Getter and setter implemented in inherited classes
    _formatVersion: Optional[int] = None
    _tracks: Optional[List[Track]] = None
    _datatracks: Optional[List[Datatrack]] = None
    _tracksHierarchy: Optional[HierarchyNode] = None

    ## Getter and setter implemented in this class
    _parsingState: Optional[ParsingStates] = None
    _parserLogs: Optional[List[ParserLog]] = None
    _zipSize: Optional[int] = None
    _format: Optional["DataFormatMinimal"] = None

    def fetchZipSize(self):
        """Fetch the zip size of this dataset and cache it in ``_zipSize``.

        Queries the ``zip_size`` endpoint with this entity's id. The cached
        value is only updated when the response is a dict containing a
        ``"size"`` key; otherwise it is left untouched.

        :raises EntityFetchingException: If the request reports an error.
        """
        # Only connection and endpoint are needed; the id is sent via self.id.
        connection, endpoint, _ = self._getConnectionData()

        response, responseError = connection.getEndpoint(
            endpoint + ["zip_size"], parameters={"ids": [self.id]}
        )
        if responseError:
            raise EntityFetchingException(entity=self, responseError=responseError)

        if isinstance(response, dict) and "size" in response:
            self._zipSize = response["size"]

    def _getDataDir(self):
        """Return the cache directory used for fetched data.

        :return: ``self.cacheDir`` if it is set and is an existing directory,
            ``None`` if no cache directory is set.
        :raises LOGSException: If ``self.cacheDir`` is set but is not a
            directory.
        """
        if self.cacheDir:
            if not os.path.isdir(self.cacheDir):
                raise LOGSException(
                    f"Specified cache directory '{self.cacheDir}' cannot be opened or is not a directory."
                )
            return self.cacheDir
        return None

    def fetchInfo(self):
        """Fetch the ``info`` data of this dataset and apply it to the entity.

        After a successful fetch the info-dependent fields are populated via
        ``_setInfo`` and every datatrack gets its ``_endpoint`` assigned.

        :raises EntityFetchingException: If the request reports an error.
        :raises LOGSException: If the configured cache directory is invalid
            (raised by ``_getDataDir``).
        """
        connection, endpoint, entityId = self._getConnectionData()

        data, responseError = connection.getEndpoint(endpoint + [entityId, "info"])
        if responseError:
            raise EntityFetchingException(entity=self, responseError=responseError)

        dataDir = self._getDataDir()
        # NOTE(review): ``_getDataDir`` as defined in this class only returns
        # an existing directory, so this branch appears to matter only when a
        # subclass overrides it -- confirm before removing.
        if dataDir and not os.path.exists(dataDir):
            os.mkdir(dataDir)

        self._setInfo(cast(dict, data))
        self._noInfo = False

        if self._datatracks:
            for datatrack in self._datatracks:
                # A fresh list is built per datatrack so they never share one.
                datatrack._endpoint = (
                    endpoint + [str(entityId), "datatrack"] if endpoint else None
                )

    def _setInfo(self, data: dict):
        """Populate the info-dependent fields from a raw info response.

        Wraps ``data`` in a ``DatasetInfo``, copies its fields onto this
        entity, hands the connection and cache directory to every datatrack
        and track, and resolves each track's ``_dataIds`` into datatrack
        objects (``None`` where no datatrack with that id exists).

        :param data: Raw info payload passed to ``DatasetInfo``.
        :raises LOGSException: If the configured cache directory is invalid
            (raised by ``_getDataDir``).
        """
        info = DatasetInfo(data)

        self._formatVersion = info.formatVersion
        self._parserLogs = info.parserLogs
        self._tracks = info.tracks
        self._datatracks = info.datatracks
        self._tracksHierarchy = info.tracksHierarchy
        self._parsingState = info.parsingState

        dataDir = self._getDataDir()

        # Datatracks indexed by id so tracks can resolve their references.
        trackLookup: Dict[str, Datatrack] = {}
        if self._datatracks:
            for datatrack in self._datatracks:
                datatrack._setConnection(self._getConnection())
                datatrack.cacheDir = dataDir
                if datatrack.id:
                    trackLookup[datatrack.id] = datatrack

        if self._tracks:
            for track in self._tracks:
                track._setConnection(self._getConnection())
                track.cacheDir = dataDir
                if track._dataIds:
                    track.datatracks = cast(
                        Any,
                        {
                            key: trackLookup.get(dataId)
                            for key, dataId in track._dataIds.items()
                        },
                    )

    def download(
        self,
        directory: Optional[str] = None,
        fileName: Optional[str] = None,
        overwrite=False,
    ):
        """Download the files of this dataset as a zip file.

        :param directory: Target directory; defaults to the current directory.
        :param fileName: Target file name; defaults to the dataset name (or
            ``"Dataset"`` if the name is empty) with ``.zip`` appended. The
            name is passed through ``Tools.sanitizeFileName``.
        :param overwrite: If true, an existing regular file at the target path
            is replaced; otherwise an existing path is an error.
        :return: The path of the written zip file.
        :raises LOGSException: If the target path exists and ``overwrite`` is
            false, or if it exists but is not a regular file.
        :raises EntityFetchingException: If the request reports an error.
        """
        connection, endpoint, entityId = self._getConnectionData()

        if not directory:
            directory = os.curdir

        if not fileName:
            fileName = (self.name or "Dataset") + ".zip"

        path = os.path.join(directory, Tools.sanitizeFileName(fileName=fileName))

        # Validate the target before requesting any data from the server.
        if overwrite:
            if os.path.exists(path) and not os.path.isfile(path):
                raise LOGSException("Path %a is not a file" % path)
        elif os.path.exists(path):
            raise LOGSException("File %a already exists" % path)

        data, responseError = connection.getEndpoint(
            endpoint + [entityId, "files", "zip"], responseType=ResponseTypes.RAW
        )
        if responseError:
            raise EntityFetchingException(entity=self, responseError=responseError)

        with open(path, mode="wb") as localFile:
            localFile.write(cast(bytes, data))

        return path

    @property
    def format(self) -> Optional["DataFormatMinimal"]:
        """The data format stored in ``_format`` (read-only at this level)."""
        return self._format

    @property
    def path(self) -> Optional[str]:
        """The value stored in ``_path``."""
        return self._path

    @path.setter
    def path(self, value):
        self._path = self.checkAndConvertNullable(value, str, "path")

    @property
    def parserLogs(self) -> Optional[List[ParserLog]]:
        """The parser logs; available only after ``fetchInfo()`` was called.

        :raises EntityIncompleteException: If the info was not fetched yet.
        """
        if self._noInfo:
            raise EntityIncompleteException(
                self,
                parameterName="parserLogs",
                functionName=f"{self.fetchInfo.__name__}()",
            )
        return self._parserLogs

    @property
    def parsingState(self) -> Optional[ParsingStates]:
        """The value stored in ``_parsingState``."""
        return self._parsingState

    @parsingState.setter
    def parsingState(self, value):
        self._parsingState = cast(
            ParsingStates, self.checkAndConvertNullable(value, str, "parsingState")
        )

    @property
    def zipSize(self) -> Optional[int]:
        """The cached zip size; available only after ``fetchZipSize()``.

        :raises EntityIncompleteException: If no zip size is cached.
        """
        if self._zipSize is None:
            raise EntityIncompleteException(
                self,
                parameterName="zipSize",
                functionName=f"{self.fetchZipSize.__name__}()",
            )
        return self._zipSize