import dataclasses
from typing import Any, Generic, List, Optional, Type, TypeVar, cast
from LOGS.Auxiliary.Exceptions import LOGSException
from LOGS.Auxiliary.Tools import Tools
from LOGS.Entity.Entity import Entity
from LOGS.Entity.EntityConnector import EntityConnector
from LOGS.Entity.EntityRequestParameter import EntityRequestParameter
from LOGS.LOGSConnection import RESPONSE_TYPES, LOGSConnection, ResponseTypes
# SELF = TypeVar("SELF", bound="EntityConnector")
ENTITY = TypeVar("ENTITY", bound=Entity)
REQUEST = TypeVar("REQUEST", bound=EntityRequestParameter)
[docs]
class EntityIterator(Generic[ENTITY, REQUEST], EntityConnector[ENTITY]):
"""Represents a connected LOGS entity iterator"""
_firstUrl: Optional[str] = None
_entityIterator: int
_currentResults: Optional[RESPONSE_TYPES]
_generatorType: Optional[Type[ENTITY]] = None
_parameterType: Optional[Type[REQUEST]] = None
_parameters: REQUEST
_responseType: ResponseTypes = ResponseTypes.JSON
_includeUrl: bool = True
_connection: Optional[LOGSConnection]
_count: Optional[int]
def __init__(
self, connection: Optional[LOGSConnection], parameters: Optional[REQUEST] = None
):
super().__init__(connection=connection)
self._connection = connection
if not self._parameterType:
raise NotImplementedError(
"Entity connection cannot be initialized without the request 'parameterType' field in class %a"
% type(self).__name__
)
if not isinstance(self._parameterType, type):
raise NotImplementedError(
"The field 'parameterType' must be a 'type' got %a in class %a"
% (type(self._parameterType), type(self).__name__)
)
if parameters and not isinstance(parameters, self._parameterType):
raise LOGSException(
"Parameter for iterator %a must be of type %a. (Got %a)"
% (
type(self).__name__,
self._parameterType.__name__,
type(parameters).__name__,
)
)
self._parameters = Tools.checkAndConvert(
parameters, self._parameterType, "parameters", initOnNone=True
)
self._entityIterator = 0
self._currentResults = None
self._count = None
def __iter__(self):
self._initEntityIterator()
return self
def __next__(self) -> ENTITY:
if not self._generatorType:
raise NotImplementedError(
"Iterator cannot generate items without a specified 'generatorType' field in class %a"
% type(self).__name__
)
return cast(Any, self._generatorType)(
self._getNextEntity(), connection=self._connection
)
def _getNextPage(self, result):
if not self._connection:
raise LOGSException("Connection of %a is not defined" % type(self).__name__)
if "hasNext" not in result or not result["hasNext"]:
return None, None
url = result["url"]
page = 1
if "page" in result:
page = int(result["page"])
elif self._parameters.page:
page = self._parameters.page
self._parameters.page = page + 1
return self._connection.postUrl(
url=url,
data=self._parameters.toDict(),
responseType=self._responseType,
includeUrl=self._includeUrl,
)
# ### code for using get ###
# o = urlparse(result["url"])
# params = parse_qs(o.query)
# params = {k: v[0] if isinstance(v, list) else v for k, v in params.items()}
# page = 1
# if "page" in params:
# page = int(params["page"])
# params["page"] = page + 1
# urlList = list(o)
# urlList[4] = None
# url = cast(str, urlunparse(urlList))
# return self._connection.getUrl(
# url,
# parameters=params,
# responseType=self._responseType,
# includeUrl=self._includeUrl,
# )
def _initEntityIterator(self):
url = self.getBaseUrl()
self._entityIterator = 0
if not self._connection:
raise LOGSException(
"Entity connector %a is not connected" % type(self).__name__
)
# ### code for using get ###
# self._currentResults, error = self._connection.getUrl(
# url=url,
# parameters=self._parameters.toDict(),
# responseType=self._responseType,
# includeUrl=self._includeUrl,
# )
tmp = False
if hasattr(self._parameters, "includeCount"):
tmp = self._parameters.includeCount
self._parameters.includeCount = True
self._currentResults, responseError = self._connection.postUrl(
url=url + "/list",
data=self._parameters.toDict(),
responseType=self._responseType,
includeUrl=self._includeUrl,
)
if hasattr(self._parameters, "includeCount"):
self._parameters.includeCount = tmp
if isinstance(self._currentResults, dict) and "count" in self._currentResults:
self._count = int(self._currentResults["count"])
if responseError:
raise LOGSException(responseError=responseError)
def _getNextEntity(self):
if not isinstance(self._currentResults, dict):
raise StopIteration
results = self._currentResults["results"]
if self._entityIterator < len(results):
result = results[self._entityIterator]
self._entityIterator += 1
return result
self._currentResults, error = self._getNextPage(self._currentResults)
if error:
raise Exception("Connection error: %a", error)
self._entityIterator = 0
if (
not self._currentResults
or not isinstance(self._currentResults, dict)
or len(self._currentResults["results"]) < 1
):
raise StopIteration
return self._getNextEntity()
[docs]
def split(self, size=20):
connection, _ = self._getConnectionData()
items = iter(self)
ids = []
for dataset in items:
ids.append(dataset.id)
if len(ids) >= size:
param = dataclasses.replace(self._parameters)
param.ids = ids
iterator = type(self)(connection=connection)
iterator._parameters = param
yield iterator
ids = []
if ids:
param = dataclasses.replace(self._parameters)
param.ids = ids
iterator = type(self)(connection=connection)
iterator._parameters = param
yield iterator
[docs]
def first(self):
i = iter(self)
try:
return cast(ENTITY, next(i))
except StopIteration:
return None
[docs]
def toList(self, count: Optional[int] = None):
if count:
count = int(count)
if count < 0:
raise Exception("Invalid negative count %d" % count)
result = cast(List[ENTITY], [])
num = 0
for entity in self:
result.append(entity)
num += 1
if num >= count:
break
return result
return list(self)
@property
def count(self) -> int:
if self._count is None:
self._initEntityIterator()
return self._count if self._count else 0