import threading import sys import os import logging #import time import socket import struct import zlib import binascii from collections import namedtuple import csv from time import gmtime, strftime import reverse_geocoder as rg from datetime import datetime, timezone import pytz SOCK_BUFF_SIZE = 100000 STA = namedtuple('STA', ['time', 'lat', 'lon', 'velocity', 'heading', 'nav', 'nsat', 'alt', 'age', 'active', 'pwr', 'csq']) def expb_expand(x): if x <= 32: return x return ((x & 0x0f) + 0x10) << ((x >> 4) - 1) def convert_STA(sta_txt): time = int(sta_txt[2:10], 16) lat = int(sta_txt[10:16], 16) if lat == 0x800000: lat = None lon = None velocity = None heading = None else: lat = lat * 1.0728837339 * 0.00001 # sirka ve [st.] lon = int(sta_txt[16:22], 16) * 2.1457672119 * 0.00001 # delka ve [st.] if lon > 180: lon = lon - 360 velocity = int(sta_txt[23:25], 16) # rychlost v [km/h] heading = int(sta_txt[25:27], 16) * 2 # smer ve stupnich 0=Sever rr = int(sta_txt[27:29], 16) nsat = rr & 0x3f # Pocet satelitu GNSS nav = rr >> 6 # Navigace 0=ne, 1=ano, 2=ano+SBAS alt = int(sta_txt[29:33], 16) # nadm. vyska v [m] if alt >= 0x8000: alt = alt - 0x10000 age = expb_expand(int(sta_txt[33:35], 16)) # stari polohy v [s] ss = int(sta_txt[38:40], 16) active = ss & 2 == 2 # jednotka detekuje jizdy pwr = ss & 1 == 1 # jednotka je v napajeni csq = int(sta_txt[43:44], 16) # kvalita signalu GSM 0-15 return STA(time, lat, lon, velocity, heading, nav, nsat, alt, age, active, pwr, csq) def generateMap(): #Gather all needed data for key in dictionary.keys(): # Get the actual OBUID if not key in helper: obuid = "N/A" if key in helper: obuid = helper[unitid]['obuid'] msgtype = dictionary[key]['msgtype'] payload = dictionary[key]['payload'] datasta = convert_STA(payload) timefromsta = datasta[0] timefromsta = datetime.fromtimestamp(timefromsta, tz) timefromsta = timefromsta.strftime("%Y-%m-%d %H:%M:%S") lat = datasta[1] lon = datasta[2] velocity = datasta[3] heading = datasta[4] nav = datasta[5] nsat = datasta[6] alt = datasta[7] age = datasta[8] active = datasta[9] pwr = datasta[10] csq = datasta[11] rtime = datasta[12] rtime = datetime.now(tz) rtime = rtime.strftime("%Y-%m-%d %H:%M:%S") # Get the state from location coordinates = (lat, lon) if datasta[1] and datasta[2] is not None: results = rg.search(coordinates, mode=1) country = results[0]['cc'] # And finally just feed the file with open('/var/www/html/maps.princip.cz/map3/data/csv_list.csv', 'w') as f: f.write("lat|lng|Name|SN|Unit time|STA received|Lat|Lon|Country|Velocity|Heading|Nav|Nsat|Age|Active|Pwr|Csq|sta\n") for key in dictionary.keys(): if lat and lon is not None: wr = f"{lat}|{lon}|{key}|{obuid}|{time}|{rtime}|{lat}|{lon}|{country}|{velocity}|{heading}|{nav}|{nsat}|{age}|{active}|{pwr}|{csq}|{sta}\n # TODO: needs rewrite to trigger this function on request time.sleep(200) # This is dictionary to be able to convert OBUID to unitID def helper(): helper = {} with open('/var/www/html/certificates.princip.cz/mapa/data/unitinfo.csv', 'r') as file: reader = csv.reader(file) for row in reader: print(row[0]) print(row[1]) helper[int(row[0])] = {'obuid': row[1]} def main(): from argparse import ArgumentParser class YArgumentParser(ArgumentParser): def format_help(self): return ArgumentParser.format_help(self) + \ getattr(self, "help_tail", "") parser = YArgumentParser(usage="", epilog="") parser.add_argument("-a", "--ip-addr", type=str, default="localhost", help="") parser.add_argument("-p", "--ip-port", type=int, default=54337, help="") parser.add_argument("-l", "--log-level", type=int, default=20, help="Logging level") parser.help_tail = """ Example: python script.py -f file ??? """ setup = parser.parse_args() logging.basicConfig(stream=sys.stderr, level=setup.log_level) logging.root.setLevel(setup.log_level) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind((setup.ip_addr, setup.ip_port)) thread_generateMap = threading.Thread(name=generateMap,target=generateMap) thread_generateMap.start() while True: pack_compress, addr = sock.recvfrom(SOCK_BUFF_SIZE) pack = zlib.decompress(pack_compress) while len(pack) > 0: size, obuid, msg_type = struct.unpack(">HIb", pack[:7]) pack = pack[7:] payload = pack[:size] pack = pack[size:] print("obuid:{} >{}{}".format( obuid, chr(msg_type), binascii.hexlify(payload))) tz = pytz.timezone("Europe/Prague") rtime = datetime.now(tz) rtime = rdatetime.strftime("%Y-%m-%d %H:%M:%S") if obuid not in dictionary: print(f"Creating dictionary for {obuid}") dictionary[obuid] = {'msgtype': chr(msg_type), 'payload': binascii.hexlify(payload), 'rtime': rtime} else: print(f"we already have this unit, just update the data") dictionary[obuid]["msgtype"] = chr(msg_type) dictionary[obuid]["payload"] = binascii.hexlify(payload) dictionary[obuid]["rtime"] = rtime #obuid: 2409 > Ab #'62c04b4647352b0ab781071e0400de7c00014b7b00c3ff0000000e3031354c4330722f' if __name__ == '__main__': main()