init
This commit is contained in:
56
scripts/create_mulog.py
Normal file
56
scripts/create_mulog.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import logging
|
||||
import time
|
||||
|
||||
from scripts import utils
|
||||
from directive.munet.log_record import identify, parse, full_parse
|
||||
|
||||
# from .settings import conf
|
||||
|
||||
LOGGER = logging.LoggerAdapter(
|
||||
logging.getLogger("swmu"), {"unitid": "unassigned"})
|
||||
|
||||
|
||||
def load_log(import_dir, sim_time_check):
|
||||
with open(import_dir) as fin:
|
||||
# take only message even when row is f"{log_address}: {message}"
|
||||
fin_data = [row.split(":")[-1].strip() for row in fin]
|
||||
index = 0
|
||||
list_ts = []
|
||||
list_ts_warning = []
|
||||
five_year = time.time() - 157784630
|
||||
|
||||
for message in fin_data:
|
||||
message_ts = utils.get_timestamp_of_message(
|
||||
bytes.fromhex(message))
|
||||
if sim_time_check == "max_y" and message_ts is not None:
|
||||
if message_ts < five_year:
|
||||
list_ts_warning.append(
|
||||
(index, message, message_ts))
|
||||
continue
|
||||
message_type = identify(bytes.fromhex(message))
|
||||
try:
|
||||
message_type_parse = parse(bytes.fromhex(message))
|
||||
message_type_full_parse = full_parse(bytes.fromhex(message))
|
||||
except Exception:
|
||||
# except Exception as e:
|
||||
# logging.warning(e)
|
||||
continue
|
||||
if message_ts is not None and message_ts <= 423792000:
|
||||
list_ts_warning.append(
|
||||
(index, message, message_ts))
|
||||
list_ts.append(
|
||||
(index, message, message_ts, message_type, message_type_parse,
|
||||
message_type_full_parse, list_ts_warning))
|
||||
index += 1
|
||||
"""
|
||||
if list_ts_warning:
|
||||
LOGGER.warning(
|
||||
f"timestamp in mulog is suspicious ([row in mulog]"
|
||||
f", [message], [timestamp]): {list_ts_warning}")
|
||||
"""
|
||||
return list_ts
|
||||
|
||||
|
||||
def dir_mulog(import_dir, sim_time_check):
|
||||
mulog_data = load_log(import_dir, sim_time_check)
|
||||
return mulog_data
|
||||
139
scripts/utils.py
Normal file
139
scripts/utils.py
Normal file
@@ -0,0 +1,139 @@
|
||||
CHARTOITA2 = {
|
||||
"A": int("11000", 2),
|
||||
"B": int("10011", 2),
|
||||
"C": int("01110", 2),
|
||||
"D": int("10010", 2),
|
||||
"E": int("10000", 2),
|
||||
"F": int("10110", 2),
|
||||
"G": int("01011", 2),
|
||||
"H": int("00101", 2),
|
||||
"I": int("01100", 2),
|
||||
"J": int("11010", 2),
|
||||
"K": int("11110", 2),
|
||||
"L": int("01001", 2),
|
||||
"M": int("00111", 2),
|
||||
"N": int("00110", 2),
|
||||
"O": int("00011", 2),
|
||||
"P": int("01101", 2),
|
||||
"Q": int("11101", 2),
|
||||
"R": int("01010", 2),
|
||||
"S": int("10100", 2),
|
||||
"T": int("00001", 2),
|
||||
"U": int("11100", 2),
|
||||
"V": int("01111", 2),
|
||||
"W": int("11001", 2),
|
||||
"X": int("10111", 2),
|
||||
"Y": int("10101", 2),
|
||||
"Z": int("10001", 2),
|
||||
"<CR>": int("00010", 2),
|
||||
"<LF>": int("01000", 2),
|
||||
"<LS>": int("11111", 2),
|
||||
"<FS>": int("11011", 2),
|
||||
"<SP>": int("00100", 2),
|
||||
"<NULL>": int("00000", 2),
|
||||
}
|
||||
|
||||
# message type: offset (bytes)
|
||||
MESSAGE_TYPE_TIMESTAMP_OFFSET = {
|
||||
0x01: 8,
|
||||
0x02: 4,
|
||||
0x04: 8,
|
||||
0x10: 4,
|
||||
0x14: 4,
|
||||
0x1a: 4,
|
||||
0x1d: 4,
|
||||
0x23: 4,
|
||||
0x0b85: 4,
|
||||
0x1c01: 4,
|
||||
0x1c02: 4,
|
||||
}
|
||||
|
||||
|
||||
def bytes2uint(input_bytes):
|
||||
return int.from_bytes(input_bytes, byteorder="big")
|
||||
|
||||
|
||||
def bytes2uintle(input_bytes):
|
||||
return int.from_bytes(input_bytes, byteorder="little")
|
||||
|
||||
|
||||
def bytes2hex(input_bytes):
|
||||
return input_bytes.hex()
|
||||
|
||||
|
||||
def bytes2str(input_bytes):
|
||||
return input_bytes.decode()
|
||||
|
||||
|
||||
def parse(input_bytes, item_iter):
|
||||
item_dict = {}
|
||||
for item in item_iter:
|
||||
name, a_slice, function = item
|
||||
result = input_bytes[a_slice]
|
||||
if function:
|
||||
result = function(result)
|
||||
item_dict[name] = result
|
||||
return item_dict
|
||||
|
||||
|
||||
def mask_flag_dict(flag_bytes, flag_name_mask_iter):
|
||||
flag_int = int.from_bytes(flag_bytes, byteorder="big")
|
||||
flag_dict = {}
|
||||
for flag_name, mask in flag_name_mask_iter:
|
||||
flag_dict[flag_name] = mask & flag_int
|
||||
return flag_dict
|
||||
|
||||
|
||||
def get_domain(domain_str):
|
||||
if len(domain_str) == 4:
|
||||
country_code = domain_str[0:2]
|
||||
subdomain_code = int(domain_str[2])
|
||||
status = int(domain_str[3]) + int(0x04) # 0x04 to activate
|
||||
domain_int = 0
|
||||
domain_int |= CHARTOITA2[country_code[0]] << 11
|
||||
domain_int |= CHARTOITA2[country_code[1]] << 6
|
||||
domain_int |= subdomain_code << 3
|
||||
domain_int |= status
|
||||
return domain_int.to_bytes(2, byteorder="big")
|
||||
else:
|
||||
raise ValueError(f"domain '{domain_str}' 4 characters expected")
|
||||
|
||||
|
||||
def circular_increment(value, min_value, max_value):
|
||||
value += 1
|
||||
# value must be between 'min_value' and 'max_value' inclusive
|
||||
if value > max_value:
|
||||
value = min_value
|
||||
return value
|
||||
|
||||
|
||||
def get_message_type(message: bytes):
|
||||
try:
|
||||
message_type = message[0]
|
||||
except IndexError as ie:
|
||||
print(f"IndexError occurs in get_message_type: \n\n{ie}")
|
||||
return None
|
||||
if message_type in (0x0b, 0x1c):
|
||||
return int.from_bytes(message[:2], byteorder="big")
|
||||
return message_type
|
||||
|
||||
|
||||
def get_timestamp_of_message(message: bytes):
|
||||
message_type = get_message_type(message)
|
||||
if message_type in MESSAGE_TYPE_TIMESTAMP_OFFSET:
|
||||
index = MESSAGE_TYPE_TIMESTAMP_OFFSET[message_type]
|
||||
ts = int.from_bytes(message[index:index + 4], byteorder="big")
|
||||
return ts
|
||||
return None
|
||||
|
||||
|
||||
def set_timestamp_of_message(message: bytes, ts):
|
||||
message_type = get_message_type(message)
|
||||
if message_type in MESSAGE_TYPE_TIMESTAMP_OFFSET:
|
||||
index = MESSAGE_TYPE_TIMESTAMP_OFFSET[message_type]
|
||||
message = (
|
||||
message[:index]
|
||||
+ int(ts).to_bytes(4, byteorder="big")
|
||||
+ message[index + 4:]
|
||||
)
|
||||
return message
|
||||
Reference in New Issue
Block a user