import base64
import os
import uuid
from datetime import datetime
from hashlib import sha256
from typing import Any, List, Literal, Optional, Sequence, Union, cast
from LOGS.Auxiliary.Constants import Constants
from LOGS.Entity.SerializeableContent import SerializeableClass
[docs]
class FingerprintFragment(SerializeableClass):
offset: int = 0
length: int = 0
bytes: str = ""
[docs]
class FileFragment(SerializeableClass):
_typeMapper = {"fragments": FingerprintFragment}
id: str = ""
fragments: List[FingerprintFragment] = []
FormatFileState = Literal["NEW", "UNCHANGED", "NEEDSUPDATE", "DELETE"]
[docs]
class FileEntry(SerializeableClass):
_typeMapper = {"fragments": FingerprintFragment}
_noSerialize = ["isDir", "name"]
id: Optional[str] = None
fullPath: str = ""
path: str = ""
isDir: bool = False
name: str = ""
fragments: Optional[List[FingerprintFragment]] = None
hash: Optional[str] = None
state: Optional[FormatFileState] = None
mtime: Optional[datetime] = None
def __init__(
self,
ref: Any = None,
fullPath: Optional[str] = None,
state: Optional[FormatFileState] = None,
):
_path: str = ""
if isinstance(ref, (str, os.DirEntry, FileEntry)):
if isinstance(ref, FileEntry):
_path = ref.path
ref = ref.fullPath
elif isinstance(ref, os.DirEntry) and ref.path:
_path = ref.path
_fullPath = os.path.realpath(ref)
ref = {
"id": uuid.uuid4().hex,
"fullPath": _fullPath,
"isDir": os.path.isdir(_fullPath),
"name": os.path.basename(_fullPath),
}
super().__init__(ref)
if fullPath is not None:
self.fullPath = fullPath
self.path = _path
if state is not None:
self.state = state
def __str__(self):
return "<%s %s%a>" % (
type(self).__name__,
("<dir> " if self.isDir else ""),
self.fullPath,
)
[docs]
def addFragment(self, fragments: List[FingerprintFragment]):
with open(self.fullPath, "rb") as read:
if self.fragments is None:
self.fragments = []
for fragment in fragments:
read.seek(fragment.offset)
fragment.bytes = base64.b64encode(read.read(fragment.length)).decode(
"utf-8"
)
self.fragments.append(fragment)
[docs]
def addHash(self):
with open(self.fullPath, "rb") as read:
self.hash = sha256(read.read()).hexdigest()
[docs]
def addMtime(self):
self.mtime = datetime.fromtimestamp(os.path.getmtime(self.fullPath))
[docs]
@classmethod
def entriesFromFiles(
cls,
files: Union[Constants.FILE_TYPE, Sequence[Constants.FILE_TYPE]],
ignoreReadErrors=False,
):
if files == None:
raise FileNotFoundError("Could not read file or directory from 'None' path")
if not isinstance(files, list):
files = [cast(Constants.FILE_TYPE, files)]
result: List[FileEntry] = []
while len(files) > 0:
file = files.pop(0)
if isinstance(file, (str, os.DirEntry, FileEntry)):
f = FileEntry(file)
if f.isDir:
with os.scandir(f.fullPath) as entries:
files.extend(entries)
else:
if not os.path.isfile(f.fullPath) or not os.access(
f.fullPath, os.R_OK
):
if not ignoreReadErrors:
raise PermissionError("Could not read file %a" % f.fullPath)
else:
f.id = f.fullPath
result.append(f)
return result