#!/usr/bin/python3
"""File format classes for the Commander Keen: Invasion of the Vorticons engine (DOS, 1991)
Sources:
RLE compressor
http://www.shikadi.net/moddingwiki/Keen_1-3_RLE_compression
RLEW compressor
http://www.shikadi.net/moddingwiki/RLEW_compression
LZW compressor
http://www.shikadi.net/moddingwiki/LZW_Compression
(Special thanks to Fleexy)
EGA header
http://www.shikadi.net/moddingwiki/Commander_Keen_EGA_Header
Level format
http://www.shikadi.net/moddingwiki/Commander_Keen_1-3_Level_format
"""
from __future__ import annotations
import logging
logger = logging.getLogger( __name__ )
from mrcrowbar import bits
from mrcrowbar import models as mrc
from mrcrowbar import utils
from mrcrowbar.lib.hardware import ibm_pc
from mrcrowbar.lib.images import base as img
class RLECompressor( mrc.Transform ):
    """Import-side transform for the byte-oriented RLE scheme used by Keen 1-3."""

    def import_data( self, buffer, parent=None ):
        """Expand an RLE-compressed buffer.

        The first four bytes are read as a little-endian uint32 giving the
        expected output size. The stream that follows is a series of control
        bytes:

        * control < 128: the next byte is repeated ``control + 3`` times;
        * control >= 128: the next ``control - 127`` bytes are copied verbatim.

        Decoding stops once at least the expected number of bytes has been
        produced; the output is not trimmed to that size.

        buffer
            Bytes-like object holding the compressed data.

        parent
            Unused by this transform.

        Returns a TransformResult whose payload is the decoded bytes and whose
        end_offset is the position just past the last packet consumed.
        Indexing past the end of a truncated buffer raises IndexError.
        """
        expected_size = utils.from_uint32_le( buffer[0:4] )
        pos = 4
        decoded = bytearray()
        while len( decoded ) < expected_size:
            control = buffer[pos]
            if control < 128:
                # run packet: one data byte, repeated
                decoded.extend( buffer[pos + 1 : pos + 2] * (control + 3) )
                pos += 2
                continue
            # literal packet: control byte plus (control - 127) raw bytes
            packet_size = control - 126
            decoded.extend( buffer[pos + 1 : pos + packet_size] )
            pos += packet_size
        return mrc.TransformResult( payload=bytes( decoded ), end_offset=pos )
class RLEWCompressor( mrc.Transform ):
    """Import-side transform for word-oriented RLE (RLEW) data."""

    def import_data( self, buffer, parent=None ):
        """Expand an RLEW-compressed buffer.

        The first four bytes are read as a little-endian uint32 giving the
        expected output size in bytes. The stream is then consumed two bytes
        at a time: the marker word FE FE is followed by a little-endian uint16
        repeat count and the two-byte word to repeat; any other word is copied
        through unchanged.

        buffer
            Bytes-like object holding the compressed data.

        parent
            Unused by this transform.

        Returns a TransformResult whose payload is the decoded bytes and whose
        end_offset is the position just past the last word consumed. If the
        input runs out before the expected size is reached, a warning is
        logged and the data decoded so far is returned.
        """
        final_length = utils.from_uint32_le( buffer[0:4] )
        i = 4
        out = bytearray()
        while len( out ) < final_length:
            word = buffer[i : i + 2]
            if not word:
                # Input exhausted: an empty slice would add nothing to the
                # output, so without this check the loop would never end.
                logger.warning(
                    "{}: input ended after {} of {} expected bytes".format(
                        self, len( out ), final_length
                    )
                )
                break
            if word == b"\xfe\xfe":
                count = utils.from_uint16_le( buffer[i + 2 : i + 4] )
                data = buffer[i + 4 : i + 6]
                out.extend( data * count )
                i += 6
            else:
                out.extend( word )
                i += 2
        return mrc.TransformResult( payload=bytes( out ), end_offset=i )
class LZWCompressor( mrc.Transform ):
    """Import-side transform for the LZW variant used by the compressed EGALATCH file."""

    def import_data( self, buffer, parent=None ):
        """Expand an LZW-compressed buffer.

        Header layout as read here: bytes 0-3 are a little-endian uint32 with
        the expected decompressed size, bytes 4-5 a little-endian uint16 with
        the maximum code width in bits. The code stream starts at byte 6 and
        is read big-endian. Codes start out 9 bits wide; the width grows by
        one (capped at the maximum) whenever the table length reaches
        ``(1 << width) - 1``. Codes 0-255 are literal bytes, 256 flags an
        error and 257 ends the stream.

        buffer
            Bytes-like object holding the compressed data.

        parent
            Unused by this transform.

        Returns a TransformResult with the decoded bytes; end_offset is always
        the full length of the input. Raises Exception if the error code (256)
        is encountered. A size mismatch against the header is only logged.
        """
        decomp_size = utils.from_uint32_le( buffer[:4] )
        max_bits = utils.from_uint16_le( buffer[4:6] )  # should be 12

        # literal byte codes, then two reserved slots: 256 (error), 257 (end of data)
        table = [bytes( (value,) ) for value in range( 256 )]
        table.extend( (None, None) )

        decoded = bytearray()
        stream = bits.BitStream( buffer, 6, bit_endian="big", io_endian="big" )
        code_width = 9

        def remember( entry ):
            # Append a new phrase unless the table is full, widening the code
            # size when the table reaches the threshold for the current width.
            nonlocal code_width
            if len( table ) >= (1 << max_bits):
                return
            logger.debug( f"lookup[{len( table )}] = {entry}" )
            table.append( entry )
            if len( table ) == (1 << code_width) - 1:
                code_width = min( code_width + 1, max_bits )
                logger.debug( "usebits = {}".format( code_width ) )

        # the first code is emitted as-is and seeds the "previous phrase"
        first_code = stream.read( code_width )
        previous = table[first_code]
        logger.debug( f"fcode={first_code},match={previous}" )
        decoded.extend( previous )

        while True:
            code = stream.read( code_width )
            logger.debug( f"ncode={code}" )
            if code == 257:
                # end of data
                break
            if code == 256:
                # error
                raise Exception( "Found error code, data is not valid" )
            if code < len( table ):
                current = table[code]
            else:
                # code not in the table yet: previous phrase plus its own first byte
                current = previous + previous[0:1]
            logger.debug( f"match={previous}" )
            logger.debug( f"nmatch={current}" )
            decoded.extend( current )
            # add code to lookup
            remember( previous + current[0:1] )
            previous = current

        if len( decoded ) != decomp_size:
            logger.warning(
                "{}: was expecting data of size {}, got data of size {} instead".format(
                    self, decomp_size, len( decoded )
                )
            )
        return mrc.TransformResult( payload=bytes( decoded ), end_offset=len( buffer ) )
class EGATile8( mrc.Block ):
    """Block wrapping the run of 8x8 tiles inside a tile store."""

    # raw tile pixels; the byte length comes from the parent's tile8_size
    image_data = mrc.Bytes( 0x00, length=mrc.Ref( "_parent.tile8_size" ) )

    def __init__( self, *args, **kwargs ):
        super().__init__( *args, **kwargs )
        # number of frames is taken from the EGA header two levels up
        frames = mrc.Ref( "_parent._parent._egahead.tile8_count" )
        self.tiles = img.IndexedImage(
            self,
            source=mrc.Ref( "image_data" ),
            width=8,
            height=8,
            frame_count=frames,
            palette=ibm_pc.EGA_DEFAULT_PALETTE,
        )
class EGATile16( mrc.Block ):
    """Block wrapping the run of 16x16 tiles inside a tile store."""

    # raw tile pixels; the byte length comes from the parent's tile16_size
    image_data = mrc.Bytes( 0x00, length=mrc.Ref( "_parent.tile16_size" ) )

    def __init__( self, *args, **kwargs ):
        super().__init__( *args, **kwargs )
        # number of frames is taken from the EGA header two levels up
        frames = mrc.Ref( "_parent._parent._egahead.tile16_count" )
        self.tiles = img.IndexedImage(
            self,
            source=mrc.Ref( "image_data" ),
            width=16,
            height=16,
            frame_count=frames,
            palette=ibm_pc.EGA_DEFAULT_PALETTE,
        )
class EGATile32( mrc.Block ):
    """Block wrapping the run of 32x32 tiles inside a tile store."""

    # raw tile pixels; the byte length comes from the parent's tile32_size
    image_data = mrc.Bytes( 0x00, length=mrc.Ref( "_parent.tile32_size" ) )

    def __init__( self, *args, **kwargs ):
        super().__init__( *args, **kwargs )
        # number of frames is taken from the EGA header two levels up
        frames = mrc.Ref( "_parent._parent._egahead.tile32_count" )
        self.tiles = img.IndexedImage(
            self,
            source=mrc.Ref( "image_data" ),
            width=32,
            height=32,
            frame_count=frames,
            palette=ibm_pc.EGA_DEFAULT_PALETTE,
        )
class EGATileStore( mrc.Block ):
    """Container for the 8x8, 16x16 and 32x32 tile sets of an EGA latch file.

    The whole block is passed through a 4bpp Planarizer on import, and the
    three tile sets are then carved out of the result through a Store. All
    offsets and sizes are derived from the EGA header reachable as
    ``_parent._egahead``.
    """

    # entire tile area; plane size is taken from the EGA header
    data = mrc.Bytes(
        0x00,
        transform=img.Planarizer(
            bpp=4, plane_size=mrc.Ref( "_parent._egahead.latch_plane_size" )
        ),
    )

    tile8 = mrc.StoreRef(
        EGATile8, mrc.Ref( "store" ), mrc.Ref( "tile8_offset" ), mrc.Ref( "tile8_size" )
    )
    tile16 = mrc.StoreRef(
        EGATile16,
        mrc.Ref( "store" ),
        mrc.Ref( "tile16_offset" ),
        mrc.Ref( "tile16_size" ),
    )
    tile32 = mrc.StoreRef(
        EGATile32,
        mrc.Ref( "store" ),
        mrc.Ref( "tile32_offset" ),
        mrc.Ref( "tile32_size" ),
    )

    def __init__( self, *args, **kwargs ):
        # the store is created before the base initialiser runs, as the
        # StoreRef fields above refer to it by name
        self.store = mrc.Store( self, mrc.Ref( "data" ) )
        super().__init__( *args, **kwargs )

    # Header offsets are scaled by 8 to index into the deplanarized data.
    # NOTE(review): presumably the header counts bytes within a 1bpp plane
    # (8 pixels per byte) while the store holds one byte per pixel - confirm.
    @property
    def tile8_offset( self ):
        """Offset of the 8x8 tiles within the store."""
        return self._parent._egahead.tile8_offset * 8

    @property
    def tile16_offset( self ):
        """Offset of the 16x16 tiles within the store."""
        return self._parent._egahead.tile16_offset * 8

    @property
    def tile32_offset( self ):
        """Offset of the 32x32 tiles within the store."""
        return self._parent._egahead.tile32_offset * 8

    @property
    def tile8_size( self ):
        """Size of the 8x8 tile data: tile count times 8 * 8."""
        return self._parent._egahead.tile8_count * 8 * 8

    @property
    def tile16_size( self ):
        """Size of the 16x16 tile data: tile count times 16 * 16."""
        return self._parent._egahead.tile16_count * 16 * 16

    @property
    def tile32_size( self ):
        """Size of the 32x32 tile data: tile count times 32 * 32."""
        # Uses tile32_count, matching the frame count EGATile32 reads; this
        # previously multiplied tile16_count, giving a size that did not
        # correspond to the number of 32x32 tiles.
        return self._parent._egahead.tile32_count * 32 * 32
class EGALatch( mrc.Block ):
    """Uncompressed EGALATCH file: a tile store read directly from offset 0."""

    # EGA header this file depends on; None until something assigns it
    # (presumably the Loader's dependency handling - confirm)
    _egahead = None

    tilestore = mrc.BlockField( EGATileStore, 0x00 )
class EGALatchComp( mrc.Block ):
    """Compressed EGALATCH file: a tile store wrapped in LZW compression."""

    # EGA header this file depends on; None until something assigns it
    # (presumably the Loader's dependency handling - confirm)
    _egahead = None

    tilestore = mrc.BlockField( EGATileStore, 0x00, transform=LZWCompressor() )
class SoundRef( mrc.Block ):
    """16-byte record describing one sound: offset, priority, rate and name."""

    offset = mrc.UInt16_LE( 0x00 )
    priority = mrc.UInt8( 0x02 )
    rate = mrc.UInt8( 0x03 )
    # 12-byte name field
    name = mrc.Bytes( 0x04, length=12 )
class PreviewCompressor( mrc.Transform ):
    """Two-stage import transform: RLE decompression, then planar unpacking."""

    rle = RLECompressor()
    # each plane is stored with 192 bytes padding at the end
    plan = img.Planarizer( bpp=4, width=320, height=200, plane_padding=192 )

    def import_data( self, buffer, parent=None ):
        """Decompress and deplanarize a full-screen image.

        buffer
            Bytes-like object holding the RLE-compressed planar image.

        parent
            Unused by this transform.

        Returns a TransformResult carrying the payload produced by the
        Planarizer; end_offset is the one reported by the RLE stage, i.e. it
        refers to the compressed input.
        """
        assert utils.is_bytes( buffer )
        unpacked = self.rle.import_data( buffer )
        deplaned = self.plan.import_data( unpacked.payload )
        return mrc.TransformResult(
            payload=deplaned.payload,
            end_offset=unpacked.end_offset,
        )
class Preview( mrc.Block ):
    """Full-screen 320x200 image stored RLE-compressed in planar form."""

    # decoded by PreviewCompressor on import
    image_data = mrc.Bytes( 0x0000, transform=PreviewCompressor() )

    def __init__( self, *args, **kwargs ):
        super().__init__( *args, **kwargs )
        # image view over the decoded pixels, using the default EGA palette
        self.image = img.IndexedImage(
            self,
            source=mrc.Ref( "image_data" ),
            width=320,
            height=200,
            palette=ibm_pc.EGA_DEFAULT_PALETTE,
        )
class Level( mrc.Unknown ):
    """Level file; no fields are defined here beyond what mrc.Unknown provides."""
class LevelTile( mrc.Block ):
    """Single level tile entry: one little-endian 16-bit tile ID."""

    tile_id = mrc.UInt16_LE( 0x00 )
class ScoresItems( mrc.Block ):
    """Per-entry item flags in the high score table: four 16-bit values."""

    # Each field is treated as a 0/1 flag. range( 0, 2 ) admits both values;
    # the previous range( 0, 1 ) contained only 0, so a set flag fell outside
    # the declared range.
    # NOTE(review): assumes the flags are stored as 0 or 1 - confirm against
    # real SCORES files.
    joystick = mrc.UInt16_LE( 0x00, range=range( 0, 2 ) )
    battery = mrc.UInt16_LE( 0x02, range=range( 0, 2 ) )
    vacuum = mrc.UInt16_LE( 0x04, range=range( 0, 2 ) )
    liquor = mrc.UInt16_LE( 0x06, range=range( 0, 2 ) )
class ScoresName( mrc.Block ):
    """Name slot in the high score table: a fixed 13-byte field."""

    name = mrc.Bytes( 0x00, length=13 )
class Scores( mrc.Block ):
    """High score file with seven entries, stored as parallel arrays."""

    # 0x00: seven 32-bit values
    values = mrc.UInt32_LE( 0x00, length=7 )
    # 0x1c: seven ScoresItems records
    items = mrc.BlockField( ScoresItems, 0x1c, count=7 )
    # 0x54: seven 16-bit city counts, each declared as 0-8
    num_cities = mrc.UInt16_LE( 0x54, length=7, range=range( 0, 9 ) )
    # 0x62: 14 bytes of unidentified data
    unknown_1 = mrc.Bytes( 0x62, length=14 )
    # 0x70: seven 13-byte names
    names = mrc.BlockField( ScoresName, 0x70, count=7 )
    # 0xcb: single zero byte ending the file
    term = mrc.Const( mrc.Bytes( 0xcb, length=1 ), b"\x00" )
class Loader( mrc.Loader ):
    """Loader that pairs Keen 1-3 file names with the Block classes in this module."""

    _SEP = mrc.Loader._SEP

    # File name pattern -> class used to parse matching files.
    # NOTE(review): the "." before CK is not escaped, so as a regular
    # expression it would match any character - confirm whether intended.
    _KEEN_FILE_CLASS_MAP = {
        _SEP + "(EGAHEAD).CK([1-3])$": EGAHeader,
        _SEP + "(EGALATCH).CK(1)$": EGALatchComp,
        _SEP + "(EGALATCH).CK([2-3])$": EGALatch,
        _SEP + "(FINALE).CK([1-3])$": Preview,
        _SEP + "(LEVEL)([0-9]{2}).CK([1-3])$": Level,
        _SEP + "(PREVIEW)([2-3]).CK([1-3])$": Preview,
        _SEP + "(SCORES).CK([1-3])$": Scores,
    }

    # One dependency: EGALATCH files are tied to an EGAHEAD file through the
    # "_egahead" attribute. Presumably the third element selects the EGAHEAD
    # file sharing the same episode number - confirm against mrc.Loader.
    _KEEN_DEPS = [
        (
            _SEP + "(EGALATCH).CK([1-3])$",
            _SEP + "(EGAHEAD).CK([1-3])$",
            ("EGAHEAD", "{1}"),
            "_egahead",
        )
    ]

    def __init__( self ):
        super().__init__( self._KEEN_FILE_CLASS_MAP, self._KEEN_DEPS )

    def post_load( self ):
        """Do nothing; no extra processing is defined after loading."""