class NetworkMessageClassDefinition:
    """Build a message instance by reading its fields from a byte buffer.

    The layout (parent class name, regular fields, boolean fields and the
    Python class to instantiate) is taken from the spec returned by
    ``ProtocolSpec.getClassSpecByName`` for the requested class name.
    """

    # When True, every deserialization step is reported through TraceLogger.
    TRACE = False

    def __init__(self, className: str, raw: "ByteArray") -> None:
        """Look up the spec of ``className`` and bind it to the ``raw`` buffer."""
        classSpec = ProtocolSpec.getClassSpecByName(className)
        self.parent = classSpec.parent
        self.fields = classSpec.fields
        self.boolfields = classSpec.boolfields
        self.cls = classSpec.cls
        self.raw = raw

    def deserialize(self, childInstance: object = None) -> object:
        """Populate and return an instance from the bound buffer.

        Reading order: the parent class definition (if any) first, then the
        boolean fields, then the regular fields. An optional field is preceded
        by one byte; when that byte is falsy the field is skipped and no
        attribute is set for it.

        Args:
            childInstance: instance to fill in. When None, a new ``self.cls()``
                is created. The parent definition is handed the instance being
                built, so all levels write into the same object.

        Returns:
            The populated instance.

        Raises:
            Exception: whatever the boolean-field reading raised, after logging.
            KeyboardInterrupt: when reading a regular field fails; the original
                error is logged and attached as the cause.
        """
        inst = self.cls() if childInstance is None else childInstance
        if self.TRACE:
            TraceLogger().debug("------------------ Deserializing {} STARTED-----------------".format(self.cls.__name__))

        if self.parent is not None:
            if self.TRACE:
                TraceLogger().debug(f"Class has parent {self.parent}")
            inst = NetworkMessageClassDefinition(self.parent, self.raw).deserialize(inst)
            if self.TRACE:
                TraceLogger().debug("End of parent deserialization")
                TraceLogger().debug(f"BytesArray positon: {self.raw.position}")

        try:
            for field, value in self.readBooleans(self.boolfields, self.raw).items():
                if self.TRACE:
                    TraceLogger().debug(f"{field} = {value}")
                setattr(inst, field, value)
        except Exception:
            TraceLogger().debug(f"Remaining bytes in raw: {self.raw.remaining()}")
            TraceLogger().error("Error while reading boolean fields!")
            # Bare raise keeps the original traceback intact.
            raise

        for field in self.fields:
            attrib = field.name
            if self.TRACE:
                TraceLogger().debug(f"Deserializing field '{attrib}', remaining bytes '{self.raw.remaining()}'.")
            if field.optional:
                isProvided = self.raw.readByte()
                if not isProvided:
                    if self.TRACE:
                        TraceLogger().debug(f"Field '{attrib}' is optional and was not provided.")
                    continue
            try:
                value = nmdf.NetMsgDataField(field, self.raw).deserialize()
            except Exception as e:
                TraceLogger().debug(inst.__class__.__name__)
                TraceLogger().debug(self.fields)
                TraceLogger().error(str(e), exc_info=True)
                # NOTE(review): KeyboardInterrupt is kept because callers may
                # rely on it to abort; the real error is chained as the cause.
                raise KeyboardInterrupt from e
            setattr(inst, attrib, value)

        if self.TRACE:
            TraceLogger().debug("------------------ Deserializing {} ENDED---------------------".format(self.cls.__name__))
        # Only direct subclasses of NetworkMessage get the base initializer run
        # on the populated instance.
        if inst.__class__.__base__ == bnm.NetworkMessage:
            bnm.NetworkMessage.__init__(inst)
        return inst

    @classmethod
    def readBooleans(cls, boolfields: "list[FieldSpec]", raw: "ByteArray"):
        """Read the packed boolean fields from ``raw``.

        A new byte is read for every group of eight fields, and the flag for
        field ``i`` is extracted at index ``i % 8`` of the current byte.

        Args:
            boolfields: specs of the boolean fields, in wire order.
            raw: buffer to read from.

        Returns:
            Dict mapping each field name to its flag value; empty when there
            are no boolean fields (nothing is read in that case).

        Raises:
            Exception: when fewer bytes remain than the fields require.
        """
        ans = {}
        n = len(boolfields)
        if n == 0:
            return ans

        # Eight flags per byte, rounded up: 1..8 fields need one byte, 9 need two.
        needed = (n + 7) // 8
        if cls.TRACE:
            TraceLogger().debug("Reading {} booleans".format(n))
            TraceLogger().debug(f"I need {needed} bytes")
            TraceLogger().debug(f"Remaining bytes in raw: {raw.remaining()}")
        if raw.remaining() < needed:
            raise Exception("Not enough bytes to read booleans")

        for i, var in enumerate(boolfields):
            bit = i % 8
            if bit == 0:
                # Start of a new group of eight flags.
                _box: int = raw.readByte()
            value = boolByteWrapper.getFlag(_box, bit)
            if cls.TRACE:
                TraceLogger().debug(f"{var.name} = {value}")
            ans[var.name] = value
        return ans
class NetMsgDataField:
    """Read the value of one message field from a byte buffer, driven by its spec."""

    # When True, decoded values are reported through TraceLogger.
    TRACE = False

    # Name of the reader method to call on the raw buffer for each primitive type.
    dataReader = {
        TypeEnum.INT: "readInt",
        TypeEnum.UNSIGNEDINT: "readUnsignedInt",
        TypeEnum.SHORT: "readShort",
        TypeEnum.UNSIGNEDSHORT: "readUnsignedShort",
        TypeEnum.BYTE: "readByte",
        TypeEnum.UNSIGNEDBYTE: "readUnsignedByte",
        TypeEnum.FLOAT: "readFloat",
        TypeEnum.DOUBLE: "readDouble",
        TypeEnum.BOOLEAN: "readBoolean",
        TypeEnum.VARINT: "readVarInt",
        TypeEnum.VARLONG: "readVarLong",
        TypeEnum.UTF: "readUTF",
        TypeEnum.VARUHSHORT: "readVarUhShort",
        TypeEnum.VARUHINT: "readVarUhInt",
        TypeEnum.VARSHORT: "readVarShort",
        TypeEnum.VARUHLONG: "readVarUhLong",
    }

    def __init__(self, spec: FieldSpec, raw: ByteArray):
        """Bind the field ``spec`` to the ``raw`` buffer it will be read from."""
        self._spec = spec
        self._raw = raw
        # Overrides for the spec's type and length; filled lazily or by setters.
        self._type = None
        self._length = None

    @property
    def name(self) -> str:
        """Field name, taken from the spec."""
        return self._spec.name

    @property
    def type(self) -> str:
        """Type name used to deserialize objects.

        Falls back to the spec's type whenever no truthy override is stored.
        """
        if not self._type:
            self._type = self._spec.type
        return self._type

    @type.setter
    def type(self, newValue):
        self._type = newValue

    @property
    def length(self) -> int:
        """Vector length: the stored override, else the spec's length (may be None)."""
        if self._length is None:
            self._length = self._spec.length
        return self._length

    @length.setter
    def length(self, val):
        if val < 0:
            raise ValueError(f"Vector length can't be assigned a negative value '{val}'")
        self._length = val

    @property
    def lengthTypeId(self) -> int:
        """Type id of the primitive that encodes the vector length, from the spec."""
        return self._spec.lengthTypeId

    @property
    def typename(self) -> str:
        """Type name reported by the spec (used in trace output)."""
        return self._spec.typename

    def deserialize(self):
        """Read and return the field value as a vector, a primitive or an object."""
        if self._spec.isVector():
            return self.readVector()
        if self._spec.isPrimitive():
            val = self.readPrimitive()
            if self.TRACE:
                TraceLogger().debug(f"Field {self.name} = {val}")
            return val
        return self.readObject()

    def readPrimitive(self, typeId=None):
        """Read one primitive value from the buffer.

        Args:
            typeId: key into ``dataReader``; defaults to the spec's ``typeId``.

        Returns:
            Whatever the matching reader method of the buffer returns.

        Raises:
            Exception: when ``typeId`` has no entry in ``dataReader``.
        """
        if typeId is None:
            typeId = self._spec.typeId
        dataReader = NetMsgDataField.dataReader.get(typeId)
        if dataReader is None:
            raise Exception(f"TypeId '{typeId}' not found in known types ids")
        return getattr(self._raw, dataReader)()

    def readObject(self):
        """Read a nested object from the buffer.

        For a dynamically typed field, an unsigned short type id is read first
        and resolved to a type name through ``ProtocolSpec.getTypeSpecById``.

        Returns:
            The object produced by ``NetworkMessageClassDefinition.deserialize``.

        Raises:
            Exception: when no type name is available for the field.
        """
        # Bound up front so the error message below can always be formatted,
        # including for non-dynamic fields where no type id is read.
        typeId = None
        if self._spec.dynamicType:
            typeId = self._raw.readUnsignedShort()
            self.type = ProtocolSpec.getTypeSpecById(typeId).name
        if self.type is None:
            raise Exception(f"Unable to parse dynamic type name of typeid '{typeId}'.")
        obj = nmcd.NetworkMessageClassDefinition(self.type, self._raw).deserialize()
        return obj

    def readVector(self):
        """Read a vector of primitives or objects and return it as a list.

        When neither an override nor the spec provides a length, it is read
        from the buffer as a primitive of type ``TypeEnum(self.lengthTypeId)``.

        Raises:
            ValueError: when the length read from the buffer is negative.
        """
        if self.length is None:
            self.length = self.readPrimitive(TypeEnum(self.lengthTypeId))
            if self.TRACE:
                TraceLogger().debug(f"Read Vector length = {self.length}")
        if self.TRACE:
            TraceLogger().debug(f"==> Deserialising Vector<{self.typename}> of length {self.length}, remaining bytes {self._raw.remaining()}")
        ret = []
        for i in range(self.length):
            if self._spec.isPrimitive():
                val = self.readPrimitive()
                if self.TRACE:
                    TraceLogger().debug(f"Vector value {i} = {val}")
                ret.append(val)
            else:
                if self.TRACE:
                    TraceLogger().debug(f"Reading Vector Object {i}.")
                ret.append(self.readObject())
        return ret