Source code for LOGS.Entities.DatasetMatching

from dataclasses import dataclass, field
from typing import Dict, List, Literal, Optional, Sequence, Union

from LOGS.Auxiliary.Constants import Constants
from LOGS.Auxiliary.Decorators import Endpoint
from LOGS.Auxiliary.Exceptions import LOGSException
from LOGS.Auxiliary.Tools import Tools
from LOGS.Entities.DatasetMatchTypes import MatchRequest, MatchResult
from LOGS.Entities.FileEntry import FileEntry
from LOGS.Entity.EntityConnector import EntityConnector
from LOGS.LOGSConnection import LOGSConnection


@dataclass
class DirectoryTreeNode:
    """A node of an in-memory directory tree.

    A node is either a directory (``type == "dir"``), whose ``content`` maps
    child names to child nodes, or a file (``type == "file"``).
    """

    # Name of this directory or file (a single path segment).
    name: str = ""
    # Path associated with the node; rewritten by `splitTreeByFileCount` for
    # every node it returns unchanged.
    path: str = "/"
    type: Literal["dir", "file"] = "dir"
    # Child nodes keyed by their name.
    content: Dict[str, "DirectoryTreeNode"] = field(default_factory=dict)
    # Number of files counted for this node.
    fileCount: int = 0

    def print(self, countOnly=False, indent=""):
        """Print this node and, recursively, all of its children.

        :param countOnly: If true, file nodes are not printed and every
            directory line is suffixed with the number of its direct children.
        :param indent: Prefix written in front of the node name; each nesting
            level appends ``".."`` to it.
        """
        if self.type == "file":
            if not countOnly:
                print("%s%s (f)" % (indent, self.name))
            return

        childCountSuffix = ": " + str(len(self.content)) if countOnly else ""
        print(
            "%s%s (%s)%s"
            % (
                indent,
                self.name,
                Tools.getHumanReadableSize(self.fileCount),
                childCountSuffix,
            )
        )
        for entry in self.content.values():
            entry.print(indent=indent + "..", countOnly=countOnly)

    def splitTreeByFileCount(self, maxCount: int, parentPath="", level=0):
        """Split the tree into sub-trees holding at most ``maxCount`` files.

        A node whose ``fileCount`` does not exceed ``maxCount`` is returned
        as-is (its ``path`` is updated to the path computed here). Otherwise
        the files located directly in this node are grouped into one new
        directory node and every sub-directory is split recursively.

        Files located directly in one directory are never separated, so a
        returned node can still exceed ``maxCount`` when a single directory
        holds more files than that.

        :param maxCount: Upper bound of ``fileCount`` for a node to be kept whole.
        :param parentPath: Path of the parent node, used to build this node's path.
        :param level: Recursion depth; only passed on to recursive calls.
        :return: List of nodes covering the files of this tree.
        """
        path = parentPath + "/" + self.name
        if path == "/":
            # The unnamed root node is represented by the empty path.
            path = ""

        if self.fileCount <= maxCount:
            self.path = path
            return [self]

        forest: List[DirectoryTreeNode] = []

        directFiles = [item for item in self.content.values() if item.type == "file"]
        if directFiles:
            forest.append(
                DirectoryTreeNode(
                    name=self.name,
                    path=path,
                    content={f.name: f for f in directFiles},
                    fileCount=len(directFiles),
                )
            )

        for item in self.content.values():
            if item.type == "dir":
                forest.extend(
                    item.splitTreeByFileCount(
                        maxCount=maxCount, parentPath=path, level=level + 1
                    )
                )

        return forest
@Endpoint("parsers")
class DatasetMatching(EntityConnector):
    """Match files against dataset formats through the ``match`` sub-endpoint.

    The given files are grouped into batches by directory (see
    `splitFileList`). Iterating over an instance sends one match request per
    batch and yields the datasets contained in the returned matches.
    """

    _request: MatchRequest = MatchRequest()
    _formatIds: Optional[List[str]] = None
    _matchResult: Optional[MatchResult] = None
    _files: List[List[FileEntry]] = []
    # Passed as `maxCount` to `DirectoryTreeNode.splitTreeByFileCount`.
    _maxFileCountInDirectory = 20000

    def __init__(
        self,
        connection: LOGSConnection,
        files: Union[Constants.FILE_TYPE, Sequence[Constants.FILE_TYPE]],
        formatIds: Optional[List[str]] = None,
        ignoreReadErrors=False,
    ):
        """Collect the file entries for ``files`` and split them into batches.

        :param connection: Connection used for the match requests.
        :param files: File(s) handed to ``FileEntry.entriesFromFiles``.
        :param formatIds: Format ids put into every match request.
        :param ignoreReadErrors: Forwarded to ``FileEntry.entriesFromFiles``.
        """
        self._connection = connection
        self._formatIds = formatIds
        self._files = self.splitFileList(
            FileEntry.entriesFromFiles(files, ignoreReadErrors)
        )

    @classmethod
    def fileListToTree(cls, files: List[FileEntry]):
        """Build a directory tree from the ``path`` of every file entry.

        Only paths starting with ``"/"`` are placed in the tree; all other
        paths are skipped. Each directory node's ``fileCount`` is incremented
        once for every file located at or below it.

        :param files: File entries whose ``path`` is split on ``"/"``.
        :return: The unnamed root `DirectoryTreeNode`.
        """
        root = DirectoryTreeNode(name="")
        for file in files:
            segments = file.path.split("/")
            # Validate before taking the path apart: a path without any "/"
            # has a single segment, and popping both the root and the file
            # name from it used to raise an IndexError.
            if len(segments) < 2 or segments[0] != "":
                continue

            fileName = segments[-1]
            directories = segments[1:-1]

            current = root
            for depth, dirName in enumerate(directories):
                current.fileCount += 1
                if dirName not in current.content:
                    current.content[dirName] = DirectoryTreeNode(
                        name=dirName, path="/".join(directories[:depth])
                    )
                current = current.content[dirName]

            # The directory that directly contains the file is counted here,
            # as the loop above only counts the nodes it descends from.
            current.fileCount += 1
            current.content[fileName] = DirectoryTreeNode(
                name=fileName, path=file.path, type="file", fileCount=1
            )
        return root

    @classmethod
    def TreeToFileList(cls, root: DirectoryTreeNode):
        """Flatten a tree into a list of new ``FileEntry`` objects.

        Every file node becomes a fresh ``FileEntry`` whose ``fullPath``,
        ``id`` and ``path`` are all set to the node's ``path``.

        :param root: Node to flatten.
        :return: List with one ``FileEntry`` per file node below ``root``.
        """
        if root.type == "file":
            file = FileEntry()
            file.fullPath = root.path
            file.id = root.path
            file.path = root.path
            return [file]

        files: List[FileEntry] = []
        for item in root.content.values():
            files.extend(cls.TreeToFileList(item))
        return files

    def splitFileList(self, files: List[FileEntry]):
        """Group ``files`` into batches by directory.

        The files are arranged in a directory tree, the tree is split with
        ``_maxFileCountInDirectory`` as limit, and each resulting sub-tree is
        flattened back into a list of file entries.

        :param files: File entries to distribute.
        :return: List of file entry lists, one per sub-tree.
        """
        root = self.fileListToTree(files)
        forest = root.splitTreeByFileCount(maxCount=self._maxFileCountInDirectory)
        return [self.TreeToFileList(tree) for tree in forest]

    def __iter__(self):
        """Send one match request per file batch and yield each matched dataset."""
        for files in self._files:
            self._request = MatchRequest()
            self._request.formatIds = self._formatIds
            self._request.files = files
            self._match()
            if not self._matchResult:
                # Ends the whole iteration, not only the current batch.
                return
            for match in self._matchResult.matches:
                for dataset in match:
                    yield dataset

    def _postMatchRequest(self, connection, endpoint):
        """POST the current request to the ``match`` sub-endpoint.

        :raises LOGSException: If the connection reports an error.
        :return: The response wrapped in a ``MatchResult``.
        """
        data, error = connection.postEndpoint(
            endpoint=endpoint + ["match"], data=self._request.toDict()
        )
        if error:
            raise LOGSException("Could not match dataset files: %a" % error)
        return MatchResult(data)

    def _match(self):
        """Run the match for ``self._request`` and store it in ``self._matchResult``.

        When the first response lists missing fragments, these are added to
        the corresponding request files (looked up by ``id``) and the request
        is sent a second time.

        :raises LOGSException: If either request reports an error.
        """
        connection, endpoint = self._getConnectionData()

        self._matchResult = self._postMatchRequest(connection, endpoint)
        if not self._matchResult.missingFragments:
            return

        lookUp = {e.id: e for e in self._request.files}
        for fileFragment in self._matchResult.missingFragments:
            if fileFragment.id not in lookUp:
                continue
            lookUp[fileFragment.id].addFragment(fileFragment.fragments)

        self._matchResult = self._postMatchRequest(connection, endpoint)