# Lookup table: character -> 5-bit code. get_domain() uses it to pack the two
# country-code letters of a domain (presumably ITA2 / Baudot codes, judging
# by the name -- confirm against the protocol specification).
# NOTE(review): the six trailing entries all use the empty string as key as
# seen here, so only the last one ("" -> 0) survives at runtime. Presumably
# the original keys were control / non-printable characters that got lost --
# verify against the upstream table before relying on them.
CHARTOITA2 = {
    "A": 0b11000,
    "B": 0b10011,
    "C": 0b01110,
    "D": 0b10010,
    "E": 0b10000,
    "F": 0b10110,
    "G": 0b01011,
    "H": 0b00101,
    "I": 0b01100,
    "J": 0b11010,
    "K": 0b11110,
    "L": 0b01001,
    "M": 0b00111,
    "N": 0b00110,
    "O": 0b00011,
    "P": 0b01101,
    "Q": 0b11101,
    "R": 0b01010,
    "S": 0b10100,
    "T": 0b00001,
    "U": 0b11100,
    "V": 0b01111,
    "W": 0b11001,
    "X": 0b10111,
    "Y": 0b10101,
    "Z": 0b10001,
    "": 0b00010,
    "": 0b01000,
    "": 0b11111,
    "": 0b11011,
    "": 0b00100,
    "": 0b00000,
}

# message type: offset (bytes)
MESSAGE_TYPE_TIMESTAMP_OFFSET = {
    0x01: 8,
    0x02: 4,
    0x04: 8,
    0x10: 4,
    0x14: 4,
    0x1a: 4,
    0x1d: 4,
    0x23: 4,
    0x0b85: 4,
    0x1c01: 4,
    0x1c02: 4,
}

# Width in bytes of the big-endian timestamp field read and written by
# get_timestamp_of_message() / set_timestamp_of_message().
_TIMESTAMP_SIZE = 4


def bytes2uint(input_bytes):
    """Return *input_bytes* decoded as an unsigned big-endian integer."""
    return int.from_bytes(input_bytes, byteorder="big")


def bytes2uintle(input_bytes):
    """Return *input_bytes* decoded as an unsigned little-endian integer."""
    return int.from_bytes(input_bytes, byteorder="little")


def bytes2hex(input_bytes):
    """Return the lowercase hexadecimal string for *input_bytes*."""
    return input_bytes.hex()


def bytes2str(input_bytes):
    """Return *input_bytes* decoded with the default codec (UTF-8)."""
    return input_bytes.decode()


def parse(input_bytes, item_iter):
    """Cut *input_bytes* into named fields.

    Args:
        input_bytes: The buffer to index into.
        item_iter: Iterable of ``(name, a_slice, function)`` triples.
            ``a_slice`` is used directly as the subscript on *input_bytes*;
            ``function`` is applied to the extracted value when truthy,
            otherwise the raw value is stored.

    Returns:
        A dict mapping each ``name`` to its (optionally converted) value.
    """
    item_dict = {}
    for name, a_slice, function in item_iter:
        result = input_bytes[a_slice]
        if function:
            result = function(result)
        item_dict[name] = result
    return item_dict


def mask_flag_dict(flag_bytes, flag_name_mask_iter):
    """Apply each mask to the big-endian integer value of *flag_bytes*.

    Args:
        flag_bytes: Bytes holding the flag field.
        flag_name_mask_iter: Iterable of ``(flag_name, mask)`` pairs.

    Returns:
        A dict mapping each flag name to ``mask & value``. The masked value
        is stored as is (not shifted, not converted to ``bool``).
    """
    flag_int = int.from_bytes(flag_bytes, byteorder="big")
    return {
        flag_name: mask & flag_int
        for flag_name, mask in flag_name_mask_iter
    }


def get_domain(domain_str):
    """Pack a 4-character domain string into two big-endian bytes.

    Bit layout of the 16-bit result (most significant first): 5 bits for the
    code of the first character, 5 bits for the code of the second
    character, 3 bits for the sub-domain digit, 3 bits for the status digit
    plus 0x04.

    Args:
        domain_str: Two characters present in ``CHARTOITA2`` followed by two
            decimal digits (sub-domain, status).

    Returns:
        The packed domain as a 2-byte ``bytes`` object.

    Raises:
        ValueError: If *domain_str* is not exactly 4 characters long, or one
            of the last two characters is not a digit.
        KeyError: If one of the first two characters is not in
            ``CHARTOITA2``.
    """
    if len(domain_str) != 4:
        raise ValueError(f"domain '{domain_str}' 4 characters expected")

    country_code = domain_str[0:2]
    subdomain_code = int(domain_str[2])
    # NOTE(review): both digit fields are only 3 bits wide. A sub-domain
    # digit of 8-9 or a status digit of 4-9 spills into the neighbouring
    # field without any error -- confirm the valid ranges and validate.
    status = int(domain_str[3]) + 0x04  # 0x04 to activate

    domain_int = (
        CHARTOITA2[country_code[0]] << 11
        | CHARTOITA2[country_code[1]] << 6
        | subdomain_code << 3
        | status
    )
    return domain_int.to_bytes(2, byteorder="big")


def circular_increment(value, min_value, max_value):
    """Return ``value + 1``, wrapping to *min_value* past *max_value*."""
    value += 1
    # value must be between 'min_value' and 'max_value' inclusive
    if value > max_value:
        value = min_value
    return value


def get_message_type(message: bytes):
    """Return the type identifier found at the start of *message*.

    The first byte is the type, except when it is 0x0b or 0x1c: then the
    first two bytes, read big-endian, form the type.

    Returns:
        The message type as an ``int``, or ``None`` (after printing a
        diagnostic) when *message* is empty.
    """
    try:
        message_type = message[0]
    except IndexError as ie:
        print(f"IndexError occurs in get_message_type: \n\n{ie}")
        return None
    if message_type in (0x0b, 0x1c):
        return int.from_bytes(message[:2], byteorder="big")
    return message_type


def get_timestamp_of_message(message: bytes):
    """Read the 4-byte big-endian timestamp stored in *message*.

    The field offset is looked up in ``MESSAGE_TYPE_TIMESTAMP_OFFSET`` by
    message type.

    Returns:
        The timestamp as an ``int``, or ``None`` when the message type has
        no known timestamp offset or the message is too short to contain
        the complete field.
    """
    index = MESSAGE_TYPE_TIMESTAMP_OFFSET.get(get_message_type(message))
    if index is None:
        return None
    field = message[index:index + _TIMESTAMP_SIZE]
    # A truncated message yields a short slice; decoding it would silently
    # return a wrong value, so report "no timestamp" instead.
    if len(field) < _TIMESTAMP_SIZE:
        return None
    return int.from_bytes(field, byteorder="big")


def set_timestamp_of_message(message: bytes, ts):
    """Return a copy of *message* with its timestamp field replaced by *ts*.

    Args:
        message: The raw message.
        ts: New timestamp; converted with ``int()`` and written as 4
            big-endian bytes.

    Returns:
        The updated message. *message* is returned unchanged when its type
        has no known timestamp offset or when it is too short to contain
        the complete field (splicing would otherwise put the timestamp at
        the wrong offset and change the message length).

    Raises:
        OverflowError: If ``int(ts)`` does not fit in 4 unsigned bytes.
    """
    index = MESSAGE_TYPE_TIMESTAMP_OFFSET.get(get_message_type(message))
    if index is None or len(message) < index + _TIMESTAMP_SIZE:
        return message
    return (
        message[:index]
        + int(ts).to_bytes(_TIMESTAMP_SIZE, byteorder="big")
        + message[index + _TIMESTAMP_SIZE:]
    )