Source code for LOGS.Converter.Conversion

import os
from enum import Enum
from time import sleep, time
from typing import Any, Callable, Dict, List, Optional, cast

from LOGS.Auxiliary.Exceptions import (
    EntityFetchingException,
    LOGSException,
    UnfinishedConversionException,
)
from LOGS.Auxiliary.Tools import Tools
from LOGS.Converter.ConverterParameter import ParameterType
from LOGS.Entities.ParserLog import ParserLog
from LOGS.Entity import Entity
from LOGS.Entity.ConnectedEntity import ConnectedEntity
from LOGS.Entity.SerializeableContent import SerializeableClass
from LOGS.LOGSConnection import ResponseTypes


class ConversionState(Enum):
    """Processing state reported for a dataset conversion."""

    # NOTE(review): the spelling "Successfull" is kept exactly as-is; the value
    # is presumably what the LOGS API sends, so renaming it would break parsing.
    Successfull = "Successfull"
    Failed = "Failed"
    # The conversion has not finished yet; its result must be polled again.
    Waiting = "Waiting"
class ConversionFile(SerializeableClass):
    """Description of a single file listed in a conversion result."""

    # Path of the file as reported by the conversion.
    path: Optional[str] = None
    # Size of the file (presumably in bytes -- confirm against the API).
    size: Optional[int] = None
class DatasetConversionZipReadModel(SerializeableClass):
    """Reference to the zip archive produced by a dataset conversion."""

    # Size of the archive (presumably in bytes -- confirm against the API).
    size: Optional[int] = None
    # Identifier of the archive; used to build the default download file name.
    id: Optional[str] = None
    # URL the archive content is fetched from when downloading.
    url: Optional[str] = None
class ConverterParameterEntry(SerializeableClass):
    """One input parameter (id, value and type) passed to a converter."""

    # Identifier of the parameter.
    id: Optional[str] = None
    # Value supplied for the parameter; its Python type is not restricted here.
    value: Optional[Any] = None
    # Declared type of the parameter.
    type: Optional[ParameterType] = None
class ConversionLogModel(ParserLog):
    """Log entry of a conversion; adds nothing on top of ``ParserLog``."""
class Conversion(ConnectedEntity):
    """Conversion of a dataset into an export format.

    The object carries the state of the conversion, its logs and, once the
    conversion has finished, a reference to the resulting zip archive. While
    the state is ``ConversionState.Waiting`` the object can be refreshed by
    re-posting the stored payload to its endpoint (see :meth:`awaitResult`
    and :meth:`awaitAllResults`).
    """

    _datasetId: Optional[int] = None
    _datasetFormat: Optional[str] = None
    _exportFormat: Optional[str] = None
    _files: Optional[List[ConversionFile]] = None
    _logs: Optional[List[ConversionLogModel]] = None
    _output: Optional[str] = None
    _state: Optional[ConversionState] = None
    _zip: Optional[DatasetConversionZipReadModel] = None
    _inputParameters: Optional[List[ConverterParameterEntry]] = None
    # Request body that is re-posted when polling a waiting conversion.
    # NOTE(review): this is a class-level mutable default, shared by all
    # instances until it is reassigned -- never mutate it in place.
    _payload: Dict[str, Any] = {}
    # Entity reported in the exception when re-fetching the conversion fails.
    _parentEntity: Optional[Entity] = None

    def download(
        self,
        directory: Optional[str] = None,
        fileName: Optional[str] = None,
        overwrite: bool = False,
    ) -> str:
        """Download the conversion result (a zip archive) to a local file.

        :param directory: Target directory; defaults to ``os.curdir``.
        :param fileName: Target file name; defaults to
            ``dataset_<datasetId>_<zip id>.zip``. The name is passed through
            ``Tools.sanitizeFileName`` before use.
        :param overwrite: If false, an already existing target path is an
            error. If true, an existing regular file is replaced.
        :return: Path of the written file.
        :raises UnfinishedConversionException: If the conversion is still
            waiting.
        :raises LOGSException: If the conversion failed, produced no data,
            the target path is not usable, or fetching the archive failed.
        """
        connection, _ = self._getConnectionData()

        if self.state == ConversionState.Waiting:
            raise UnfinishedConversionException(self)

        if self.state == ConversionState.Failed:
            raise LOGSException(
                f"Conversion for dataset {self.datasetId} from format '{self.datasetFormat}' to format '{self.exportFormat}' failed. Check logs for more information."
            )

        # Without complete archive information there is nothing to download.
        if (
            self.zip is None
            or self.zip.size is None
            or self.zip.id is None
            or self.zip.url is None
        ):
            raise LOGSException(
                f"Conversion for dataset {self.datasetId} from format '{self.datasetFormat}' to format '{self.exportFormat}' did not result in any data."
            )

        if not directory:
            directory = os.curdir

        if not fileName:
            fileName = f"dataset_{self.datasetId}_{self.zip.id}.zip"

        path = os.path.join(directory, Tools.sanitizeFileName(fileName=fileName))

        if overwrite:
            # Only regular files may be replaced (e.g. never a directory).
            if os.path.exists(path) and not os.path.isfile(path):
                raise LOGSException("Path %a is not a file" % path)
        else:
            if os.path.exists(path):
                raise LOGSException("File %a already exists" % path)

        data, responseError = connection.getUrl(
            self.zip.url, responseType=ResponseTypes.RAW
        )
        if responseError:
            raise LOGSException(
                f"Could not fetch conversion result for dataset {self.datasetId} from format '{self.datasetFormat}' to format '{self.exportFormat}'.",
                responseError=responseError,
            )

        with open(path, mode="wb") as localfile:
            localfile.write(cast(bytes, data))

        return path

    def _reloadOnWaiting(self, connection, endpoint):
        """Refresh this object from the endpoint if it is still waiting.

        Does nothing unless the state is ``ConversionState.Waiting``.
        Otherwise the stored payload is posted to ``endpoint`` and the
        response is loaded into this object via ``fromDict``.

        :raises EntityFetchingException: If the request reports an error.
        """
        if self.state != ConversionState.Waiting:
            return

        data, responseError = connection.postEndpoint(endpoint, data=self._payload)
        if responseError:
            raise EntityFetchingException(
                entity=self._parentEntity, responseError=responseError
            )
        self.fromDict(data)

    def awaitResult(self) -> None:
        """Block until this conversion is no longer waiting.

        Polls every 0.5 seconds; there is no timeout. Returns immediately
        (after obtaining the connection data) if the conversion is not
        waiting.

        :raises EntityFetchingException: If a poll request reports an error.
        """
        connection, endpoint = self._getConnectionData()
        while self.state == ConversionState.Waiting:
            sleep(0.5)
            self._reloadOnWaiting(connection, endpoint)

    @classmethod
    def awaitAllResults(
        cls,
        conversions: List["Conversion"],
        timeout: float = 600,
        stateChangeHook: Optional[Callable[[int], None]] = None,
    ) -> None:
        """Poll several conversions until none is waiting or time runs out.

        :param conversions: Conversions to wait for; ``None`` entries are
            skipped.
        :param timeout: Maximum time to keep polling, in seconds. When it is
            exceeded the method returns silently; conversions may then still
            be in the waiting state.
        :param stateChangeHook: Optional callback receiving the number of
            conversions that are still waiting. It is called once up front
            and again whenever that number changes.
        :raises EntityFetchingException: If a poll request reports an error.
        """
        entries = []
        for conversion in conversions:
            if conversion is None:
                continue
            connection, endpoint = conversion._getConnectionData()
            entries.append((conversion, connection, endpoint))

        total = len(entries)
        finished = sum(
            1 for entry in entries if entry[0].state != ConversionState.Waiting
        )
        if stateChangeHook:
            stateChangeHook(total - finished)

        start = time()
        while finished < total and time() - start < timeout:
            previouslyFinished = finished
            finished = 0
            for conversion, connection, endpoint in entries:
                if conversion.state == ConversionState.Waiting:
                    conversion._reloadOnWaiting(connection, endpoint)
                # Count after the reload so a conversion that just finished
                # is noticed in this pass instead of one poll interval later.
                if conversion.state != ConversionState.Waiting:
                    finished += 1

            if stateChangeHook and previouslyFinished != finished:
                stateChangeHook(total - finished)

            # Only wait when there is still something left to poll.
            if finished < total:
                sleep(0.5)

    @property
    def datasetId(self) -> Optional[int]:
        return self._datasetId

    @datasetId.setter
    def datasetId(self, value):
        self._datasetId = self.checkAndConvertNullable(value, int, "datasetId")

    @property
    def datasetFormat(self) -> Optional[str]:
        return self._datasetFormat

    @datasetFormat.setter
    def datasetFormat(self, value):
        self._datasetFormat = self.checkAndConvertNullable(value, str, "datasetFormat")

    @property
    def exportFormat(self) -> Optional[str]:
        return self._exportFormat

    @exportFormat.setter
    def exportFormat(self, value):
        self._exportFormat = self.checkAndConvertNullable(value, str, "exportFormat")

    @property
    def files(self) -> Optional[List[ConversionFile]]:
        return self._files

    @files.setter
    def files(self, value):
        self._files = self.checkListAndConvertNullable(value, ConversionFile, "files")

    @property
    def logs(self) -> Optional[List[ConversionLogModel]]:
        return self._logs

    @logs.setter
    def logs(self, value):
        self._logs = self.checkListAndConvertNullable(value, ConversionLogModel, "logs")

    @property
    def output(self) -> Optional[str]:
        return self._output

    @output.setter
    def output(self, value):
        self._output = self.checkAndConvertNullable(value, str, "output")

    @property
    def state(self) -> Optional[ConversionState]:
        return self._state

    @state.setter
    def state(self, value):
        self._state = self.checkAndConvertNullable(value, ConversionState, "state")

    @property
    def zip(self) -> Optional[DatasetConversionZipReadModel]:
        return self._zip

    @zip.setter
    def zip(self, value):
        self._zip = self.checkAndConvertNullable(
            value, DatasetConversionZipReadModel, "zip"
        )

    @property
    def inputParameters(self) -> Optional[List[ConverterParameterEntry]]:
        return self._inputParameters

    @inputParameters.setter
    def inputParameters(self, value):
        self._inputParameters = self.checkListAndConvertNullable(
            value, ConverterParameterEntry, "inputParameters"
        )