from __future__ import annotations
import logging
import os
import re
from collections import Counter, OrderedDict, defaultdict
from mmap import mmap
# Module-level logger, named after this module's import path.
logger = logging.getLogger( __name__ )
class Archive:
    """Base interface for a container of files addressed by path.

    Every method here is a no-op that returns ``None``; subclasses override the
    ones they support.
    """

    def __init__( self ):
        """Create the archive. The base implementation stores nothing."""

    def close( self ):
        """Release any resources held by the archive. No-op in the base class."""

    def list_files( self, path=None, recurse=True ):
        """List the files in the archive.

        path
            Location to list from; ``None`` means the archive root.

        recurse
            Whether to descend into sub-paths.

        The base implementation returns ``None``.
        """

    def list_paths( self, path=None, recurse=True ):
        """List the directory-like paths in the archive.

        path
            Location to list from; ``None`` means the archive root.

        recurse
            Whether to descend into sub-paths.

        The base implementation returns ``None``.
        """

    def get_file( self, path ):
        """Open the file stored at ``path``.

        The base implementation returns ``None``.
        """
class FileSystem( Archive ):
    """Archive backed by a directory on the local file system.

    Paths handed out and accepted by this class are "internal" paths: they start
    with ``.`` followed by the OS path separator and are relative to
    ``base_path``. Internal directory paths end with a separator, so a file name
    can be appended to them directly.
    """

    def __init__( self, base_path ):
        """Use the directory ``base_path`` (made absolute) as the archive root."""
        super().__init__()
        self.base_path = os.path.abspath( base_path )

    def _to_internal( self, path ):
        """Convert an absolute directory path under ``base_path`` to internal form.

        The result always ends with a separator, e.g. ``./`` for the root and
        ``./sub/`` for a sub-directory.
        """
        assert path.startswith( self.base_path )
        # relpath normalises the input, so a trailing separator on `path` (as
        # produced by _from_internal for directories) or a root base_path
        # cannot yield doubled or missing separators in the result.
        relative = os.path.relpath( path, self.base_path )
        if relative == os.curdir:
            return "." + os.path.sep
        return "." + os.path.sep + relative + os.path.sep

    def _from_internal( self, path ):
        """Convert an internal path back to an absolute file system path."""
        assert path.startswith( "." + os.path.sep )
        # abspath only leaves a trailing separator on a root directory; strip it
        # so that joining with the separator-led remainder does not double it.
        return self.base_path.rstrip( os.path.sep ) + path[1:]

    def list_paths( self, path=None, recurse=True ):
        """Return the internal paths of directories below ``path``.

        path
            Internal path to start from; ``None`` means the archive root.

        recurse
            If true, return the start directory and every directory beneath it.
            If false, return only the immediate sub-directories.

        A start directory that cannot be walked yields an empty list.
        """
        base = self.base_path if path is None else self._from_internal( path )
        if not recurse:
            # os.walk yields nothing for an unreadable/missing directory, so
            # give next() a default instead of leaking StopIteration.
            _, sub_folders, _ = next( os.walk( base ), (base, [], []) )
            # os.walk reports bare names here; anchor them to `base` first.
            return [
                self._to_internal( os.path.join( base, name ) )
                for name in sub_folders
            ]
        return [self._to_internal( root ) for root, _, _ in os.walk( base )]

    def list_files( self, path=None, recurse=True ):
        """Return the internal paths of files below ``path``.

        path
            Internal path to start from; ``None`` means the archive root.

        recurse
            If true, include files in every sub-directory. If false, return only
            the files directly inside the start directory.

        A start directory that cannot be walked yields an empty list.
        """
        base = self.base_path if path is None else self._from_internal( path )
        if not recurse:
            _, _, files = next( os.walk( base ), (base, [], []) )
            # File names are bare; prefix them with the directory's internal path.
            prefix = self._to_internal( base )
            return [prefix + name for name in files]
        results = []
        for root, _, files in os.walk( base ):
            prefix = self._to_internal( root )
            results.extend( prefix + name for name in files )
        return results

    def get_file( self, path ):
        """Open the file at internal ``path`` for binary reading and writing.

        Returns the open file object; the caller is responsible for closing it.
        """
        # TODO: something nicer involving mmap?
        return open( self._from_internal( path ), "r+b" )
class Loader:
    """Locate files under a directory by regular expression and load them.

    Each file whose internal path matches a regex in ``file_class_map`` is
    memory-mapped and passed to the mapped class. Dependencies between files
    are resolved first, so that a file is constructed after the files it
    depends on and receives them through ``preload_attrs``.

    Loaded objects are available through the mapping-style interface
    (``loader[path]``, ``path in loader``, ``len( loader )``, ``loader.keys()``).
    """

    # OS path separator, escaped so it can be embedded in filename regexes.
    _SEP = re.escape( os.path.sep )

    def __init__(
        self,
        file_class_map,
        dependency_list=None,
        case_sensitive=False,
        unique_matches=True,
    ):
        """Configure the loader.

        file_class_map
            Mapping of filename regex to the class used to load matching files.
            Entries whose class is falsy are ignored. The class is called as
            ``klass( data, preload_attrs=deps )``.

        dependency_list
            Optional sequence of ``(consumer, dependency, format, attr)``
            tuples. ``consumer`` and ``dependency`` are filename regexes;
            ``format`` is a sequence of ``str.format`` templates which, filled
            with the consumer's regex groups, must equal the dependency's regex
            groups; ``attr`` is the ``preload_attrs`` key under which the
            dependency object is handed to the consumer.

        case_sensitive
            If false (default), regexes ignore case and captured groups are
            compared upper-cased.

        unique_matches
            If true (default), ``load`` raises when two files produce the same
            tuple of captured groups.
        """
        self.file_class_map = file_class_map
        self.dependency_list = dependency_list
        self.case_sensitive = case_sensitive
        self.unique_matches = unique_matches
        self.re_flags = re.IGNORECASE if not case_sensitive else 0
        self.file_re_map = {
            key: re.compile( key, flags=self.re_flags )
            for key, klass in file_class_map.items()
            if klass
        }
        self._files = OrderedDict()

    def _fold_case( self, groups ):
        """Return ``groups`` as a tuple, upper-cased unless case sensitive."""
        if self.case_sensitive:
            return tuple( groups )
        # Optional groups that did not participate in a match are None.
        return tuple( x.upper() if x is not None else None for x in groups )

    def _scan_files( self, paths ):
        """Record every path in ``paths`` that matches a filename regex.

        If several regexes match one path, the last one in ``file_re_map`` wins.
        """
        for path in paths:
            for key, regex in self.file_re_map.items():
                match = regex.search( path )
                if match:
                    self._files[path] = {
                        "klass": self.file_class_map[key],
                        "re": key,
                        "match": self._fold_case( match.groups() ),
                    }

    def _check_unique_matches( self ):
        """Raise if two recorded files share the same tuple of regex groups."""
        counts = Counter( info["match"] for info in self._files.values() )
        extras = [
            name for name, info in self._files.items() if counts[info["match"]] > 1
        ]
        if extras:
            self._files = OrderedDict()
            raise Exception(
                f"Multiple filename matches found for the same source: {', '.join( extras )}"
            )

    def _find_dependencies( self ):
        """Return ``(index, consumer_path, dependency_path)`` for each dependency.

        ``index`` is the position of the originating entry in
        ``dependency_list``. Raises if a path matches both regexes of an entry,
        or if a consumer resolves to more than one dependency file.
        """
        found = []
        if not self.dependency_list:
            return found
        for index, (consumer, dependency, templates, attr) in enumerate(
            self.dependency_list
        ):
            consumer_re = re.compile( consumer, flags=self.re_flags )
            dependency_re = re.compile( dependency, flags=self.re_flags )
            consumers = []
            # Dependency paths indexed by their (case-folded) regex groups.
            providers = defaultdict( list )
            for path in self._files:
                consumer_match = consumer_re.search( path )
                dependency_match = dependency_re.search( path )
                if consumer_match and dependency_match:
                    self._files = OrderedDict()
                    raise Exception(
                        f"Problem parsing dependencies: path {path} matches for both consumer ({consumer}) and dependency ({dependency})"
                    )
                if consumer_match:
                    consumers.append(
                        (path, self._fold_case( consumer_match.groups() ))
                    )
                elif dependency_match:
                    providers[self._fold_case( dependency_match.groups() )].append(
                        path
                    )
            for path, groups in consumers:
                # Case-fold only the formatted result: upper-casing the
                # template itself would corrupt conversions such as "!s".
                wanted = self._fold_case(
                    template.format( *groups ) for template in templates
                )
                targets = providers.get( wanted, [] )
                if len( targets ) > 1:
                    self._files = OrderedDict()
                    raise Exception(
                        f"Problem parsing dependencies: path {path} has multiple matches for dependency {attr} ({', '.join( targets )})"
                    )
                if targets:
                    found.append( (index, path, targets[0]) )
        return found

    def _sort_load_order( self, dependencies ):
        """Return all recorded paths ordered so dependencies come first.

        ``dependencies`` is the list produced by ``_find_dependencies``. Files
        not involved in any dependency are appended last, in scan order.
        Raises if the dependencies form a cycle.
        """
        pending = defaultdict( int )  # consumer -> dependencies not yet placed
        consumers_of = {}  # dependency -> consumers, in first-seen order
        for _, consumer, dependency in dependencies:
            pending[consumer] += 1
            consumers_of.setdefault( dependency, [] ).append( consumer )

        # Start from files that depend on nothing, then release each consumer
        # once all of its dependencies have been placed.
        load_order = [path for path in consumers_of if path not in pending]
        cursor = 0
        while cursor < len( load_order ):
            for consumer in consumers_of.get( load_order[cursor], () ):
                pending[consumer] -= 1
                if not pending[consumer]:
                    load_order.append( consumer )
            cursor += 1

        if any( pending.values() ):
            self._files = OrderedDict()
            raise Exception( "Problem parsing dependencies: loop detected" )

        placed = set( load_order )
        load_order.extend( path for path in self._files if path not in placed )
        return load_order

    def load( self, target_path ):
        """Scan ``target_path`` and load every matching file.

        Any results from a previous ``load`` call are discarded first. After
        all files are loaded, ``post_load`` is called.

        Raises ``Exception`` when the uniqueness check or dependency resolution
        fails; the set of recorded files is emptied in that case.
        """
        self.fs = FileSystem( target_path )
        # Internal paths are relative to self.fs; entries left over from an
        # earlier load would be re-read from the wrong directory.
        self._files = OrderedDict()

        self._scan_files( self.fs.list_files() )
        if self.unique_matches:
            self._check_unique_matches()

        dependencies = self._find_dependencies()
        load_order = self._sort_load_order( dependencies )

        # consumer path -> [(dependency path, preload attribute name), ...]
        dependency_map = defaultdict( list )
        for index, source, dest in dependencies:
            dependency_map[source].append( (dest, self.dependency_list[index][3]) )

        logger.info( "%s: loading files", self )
        for path in load_order:
            info = self._files[path]
            with self.fs.get_file( path ) as f:
                data = mmap( f.fileno(), 0 )
                try:
                    logger.info( "%s => %s", path, info["klass"] )
                    deps = {
                        attr: self._files[dest]["obj"]
                        for dest, attr in dependency_map[path]
                    }
                    info["obj"] = info["klass"]( data, preload_attrs=deps )
                except BaseException:
                    # Release the mapping on failure as well. A half-built
                    # object may still export buffers over it, which makes
                    # close() raise BufferError; swallow that so the original
                    # error is the one that propagates.
                    try:
                        data.close()
                    except BufferError:
                        pass
                    raise
                data.close()
        self.post_load()

    def post_load( self ):
        """Hook called at the end of ``load``; does nothing by default."""

    def save_file( self, target ):
        """Write the exported data of the loaded object ``target`` to disk.

        ``target`` must be one of the paths in ``keys()``; the bytes written
        come from the object's ``export_data()`` method.
        """
        assert target in self._files
        export = self._files[target]["obj"].export_data()
        # NOTE(review): `target` is an internal "./..." path, so this writes
        # relative to the current working directory rather than the directory
        # given to load() - confirm whether that is intended.
        with open( target, "wb" ) as out:
            out.write( export )

    def keys( self ):
        """Return a view of the internal paths of all recorded files."""
        return self._files.keys()

    def __len__( self ):
        """Return the number of recorded files."""
        return len( self._files )

    def __getitem__( self, key ):
        """Return the loaded object for the internal path ``key``."""
        return self._files[key]["obj"]

    def __contains__( self, key ):
        """Return whether the internal path ``key`` was recorded."""
        return key in self._files