OiO.lk Blog python Not receiving gps data from PT06 gps tracker device
python

Not receiving gps data from PT06 gps tracker device


I am trying to connect to a PT06 GPS device (https://pictortelematics.com/products/bike-gps-tracker/pt06-gps-tracker) using a socket in Python. I am able to establish a successful GPRS connection with the device, and it sends me the login details (landing packet), from which I can extract all the values correctly. As the acknowledgement, I send a server response packet (described in section 5.1.2 of https://pictortelematics.com/images/1690288584_PT06%20protocol.pdf), but I do not receive any data after sending it (if all goes well, the device should then start sending GPS data). I have also deployed the socket code to AWS Elastic Beanstalk, which acts as the server in this setup.

Things that I have already checked:

  1. I have verified the CRC that the device sent in the login packet using the CRC-ITU check
    method that is described in the documentation.
  2. The IMEI is also correct (the value the device sends matches the number printed on the device box).
  3. The login packet and response packet formats are also correct.

Code:

import socket
import struct
import binascii
from dotenv import load_dotenv
import logging
import os

# Load environment variables first so the lookups below can see them, then
# resolve the address and port the listener binds to.
load_dotenv()
HOST = os.environ.get('HOSTIP', '0.0.0.0')
PORT = int(os.environ.get('PORT', 6789))

# Module-wide logger: timestamped records at INFO level and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)


# Lookup table used by get_crc16(), which indexes it with a single byte
# value (0-255). The rows shown as "....." are elided in this listing, so
# the literal is incomplete as written.
crc16_table = [ 0X0000, 0X1189, 0X2312, 0X329B, 0X4624, 0X57AD, 0X6536, 0X74BF,
                0X8C48, 0X9DC1, 0XAF5A, 0XBED3, 0XCA6C, 0XDBE5, 0XE97E, 0XF8F7,
                .....
                .....
            ]



def get_crc16(pData: bytes, nLength: int, crc_table: list) -> int:
    """Return the table-driven 16-bit CRC of the first ``nLength`` bytes.

    Args:
        pData: Buffer to checksum.
        nLength: Number of leading bytes of ``pData`` to include.
        crc_table: Lookup table indexed by a byte value (0-255).

    Returns:
        The bitwise complement of the running checksum, masked to 16 bits.
    """
    checksum = 0xFFFF  # running value starts with all 16 bits set
    for offset in range(nLength):
        # The low byte of (checksum XOR input byte) selects the table entry.
        table_index = (checksum ^ pData[offset]) & 0xFF
        checksum = (checksum >> 8) ^ crc_table[table_index]

    # XOR with 0xFFFF inverts the low 16 bits; the mask drops anything above.
    return (checksum ^ 0xFFFF) & 0xFFFF



# Parse login packet and send response
def parse_login_packet(data: bytes, client_socket):
    """Validate a login packet and acknowledge it on ``client_socket``.

    Fields are read at the fixed offsets this parser assumes for a login
    frame: start bits (2), length (1), agreement number (1), terminal ID (8),
    type identification number (2), time zone/language (2), serial number (2),
    CRC (2), stop bits (2) -- 22 bytes in total.

    Args:
        data: One complete frame, including start and stop bits.
        client_socket: Connected socket the acknowledgement is written to.

    Returns:
        None in every case. A response is sent only when the frame is a
        login packet of sufficient length whose CRC matches.
    """
    logger.info("Start parse_login_packet")

    if data[:2] != b'\x78\x78' or data[-2:] != b'\x0D\x0A':  # Start and stop bits
        logger.info("Invalid start/stop bits")
        return None
    logger.info("Valid start and stop bits")

    agreement_number = data[3]
    if agreement_number != 0x01:  # Not a login packet
        logger.info(f"Unexpected agreement number: {agreement_number}")
        return None
    logger.info("This is a login packet")

    # Every field below is read at a fixed offset; reject short frames up
    # front instead of failing with IndexError/struct.error part-way through.
    expected_size = 22
    if len(data) < expected_size:
        logger.warning(
            f"Login packet too short: expected at least {expected_size} bytes, got {len(data)}"
        )
        return None

    # Terminal ID (8 bytes)
    terminal_id = data[4:12]
    logger.info(f"Terminal ID: {binascii.hexlify(terminal_id)}")

    # Type Identification Number (2 bytes)
    type_id = data[12:14]
    logger.info(f"Type Identification Number: {binascii.hexlify(type_id)}")

    # Time Zone and Language (2 bytes)
    # NOTE(review): only the high nibble of the first byte is used as the
    # time zone here; GT06-family documents usually describe a wider bit
    # field with an east/west flag -- confirm against the PT06 protocol.
    timezone_language = data[14:16]
    timezone = (timezone_language[0] >> 4) & 0x0F
    language = timezone_language[1]
    logger.info(f"Time Zone: +{timezone}, Language: {language}")

    # Serial Number (2 bytes), echoed back in the acknowledgement
    serial_number = data[16:18]

    # CRC covers everything from the length byte through the serial number.
    crc_received = struct.unpack('>H', data[18:20])[0]
    crc_calculated = get_crc16(data[2:18], 16, crc16_table)
    if crc_received != crc_calculated:
        logger.warning(
            f"CRC check failed: Received {crc_received}, Calculated {crc_calculated}"
        )
        return None
    logger.info("CRC check passed")

    # Acknowledgement body: length (5 = agreement number + serial + CRC),
    # agreement number 0x01, then the serial number taken from the request.
    body = struct.pack('>BB', 5, 0x01) + serial_number
    crc = get_crc16(body, len(body), crc16_table)
    server_response = b'\x78\x78' + body + struct.pack('>H', crc) + b'\x0D\x0A'

    # sendall() keeps writing until the whole frame is out; send() may stop
    # after a partial write and leave the device with a truncated response.
    client_socket.sendall(server_response)
    logger.info(f"Sent response: {binascii.hexlify(server_response)}")
    return None



# Parse GPS data packet (Agreement Number: 0x22)
def parse_gps_data(data: bytes):
    """Log the date/time, position, speed and course of a GPS data packet.

    Args:
        data: One complete frame, including start and stop bits. Fields are
            read from offset 4 onward.

    Returns:
        None. Decoded values are only written to the log; a frame too short
        to contain every field is reported and skipped.
    """
    logger.info("Parsing GPS data...")

    # The last field read (course/status) ends at offset 21. Bail out on a
    # truncated frame instead of letting struct raise and abort the caller.
    min_size = 21
    if len(data) < min_size:
        logger.warning(
            f"GPS packet too short: expected at least {min_size} bytes, got {len(data)}"
        )
        return None

    # NOTE(review): GT06-family protocols usually place a satellite-count
    # byte between the timestamp and the latitude, and encode coordinates in
    # units of 1/30000 minute (degrees = raw / 1_800_000). The offsets and
    # scaling below are kept exactly as originally written -- confirm them
    # against the PT06 protocol document.
    (year, month, day, hour, minute, second,
     raw_latitude, raw_longitude, speed, course_status) = struct.unpack_from(
        '>6BIIBH', data, 4
    )

    # Date and Time (6 bytes)
    date_time = f'20{year:02d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}'
    logger.info(f"Date/Time: {date_time}")

    # Latitude and Longitude (4 bytes each)
    latitude = raw_latitude / 30000.0
    longitude = raw_longitude / 30000.0
    logger.info(f"Latitude: {latitude}, Longitude: {longitude}")

    # Speed (1 byte)
    logger.info(f"Speed: {speed} km/h")

    # Course (2 bytes): the low 10 bits carry the heading
    course = course_status & 0x03FF
    logger.info(f"Course: {course} degrees")
    return None


# Handle various packet types
def handle_packet(data: bytes, client_socket):
    """Route one complete frame to the parser for its agreement number.

    Args:
        data: One complete frame; the agreement number is the byte at index 3.
        client_socket: Socket handed to the login parser for its reply.

    Returns:
        Whatever ``parse_login_packet`` returns for a login packet (0x01);
        None for GPS data (0x22) and for unrecognised packet types.
    """
    agreement_number = data[3]

    # GPS positioning data: parsed for logging only, nothing to return.
    if agreement_number == 0x22:
        parse_gps_data(data)
        return None

    # Anything that is neither GPS data nor a login packet is just reported.
    if agreement_number != 0x01:
        logger.info(f"Unknown packet type: {agreement_number}")
        return None

    # Login packet
    return parse_login_packet(data, client_socket)



# Buffer and extract complete packets
def extract_complete_packets(buffer: bytes):
    """Split ``buffer`` into complete frames and return the unconsumed tail.

    A frame is laid out as: start bits ``78 78`` (2 bytes), one length byte,
    ``length`` bytes (agreement number through CRC), and stop bits ``0D 0A``
    (2 bytes) -- ``length + 5`` bytes in total.

    Args:
        buffer: Bytes received so far that have not been consumed yet.

    Returns:
        A ``(packets, remaining)`` tuple: ``packets`` is the list of complete
        frames in arrival order, ``remaining`` is whatever is left in the
        buffer (typically the beginning of a frame still being received).
    """
    start_bits = b'\x78\x78'
    stop_bits = b'\x0D\x0A'
    # Bytes surrounding the length-counted part: start (2) + length (1) + stop (2).
    framing_overhead = 5

    packets = []
    logger.info(f"Buffer before extraction: {binascii.hexlify(buffer)}")
    while True:
        if len(buffer) < 2:
            logger.info("Not enough data to check start bits")
            break

        # Resynchronise one byte at a time until a frame start is found.
        if buffer[:2] != start_bits:
            logger.info(f"Invalid start bits: {binascii.hexlify(buffer[:2])}. Skipping one byte.")
            buffer = buffer[1:]
            continue

        if len(buffer) < framing_overhead:
            logger.info("Not enough data to get packet length")
            break

        # The 3rd byte counts the bytes between itself and the stop bits.
        packet_length = buffer[2]
        total_packet_length = packet_length + framing_overhead
        logger.info(
            f"Packet length from buffer: {packet_length}, "
            f"Total expected length: {total_packet_length}, Buffer size: {len(buffer)}"
        )

        if len(buffer) < total_packet_length:
            logger.info(f"Not enough data for the full packet, waiting for more. Current buffer size: {len(buffer)}")
            break  # Wait for more data

        # Slice exactly one frame so a following frame in the same buffer
        # is left untouched for the next iteration.
        packet = buffer[:total_packet_length]
        if packet[-2:] == stop_bits:
            packets.append(packet)
            buffer = buffer[total_packet_length:]  # Remove the processed packet
            logger.info(f"Packet successfully extracted. Remaining buffer: {binascii.hexlify(buffer)}")
        else:
            logger.info(f"Invalid stop bits: {binascii.hexlify(packet[-2:])}. Discarding packet.")
            # Drop only the start bits and rescan from there.
            buffer = buffer[2:]

    return packets, buffer

# Initialize the socket
def _serve_client(client_socket):
    """Read from one connection until the peer closes it, handling each complete packet."""
    buffer = b''  # Holds bytes of a frame that has not fully arrived yet

    while True:
        data = client_socket.recv(1024)
        if not data:
            logger.info("No data received, closing connection")
            break

        buffer += data

        # Extract and process complete packets
        packets, buffer = extract_complete_packets(buffer)
        for packet in packets:
            logger.info(f"Complete packet: {binascii.hexlify(packet)}")
            handle_packet(packet, client_socket)


def start_server():
    """Listen on ``(HOST, PORT)`` and serve tracker connections one at a time.

    Runs until the process is stopped. Each accepted connection is served to
    completion before the next one is accepted. An error while serving a
    connection is logged and that connection is closed; the listener keeps
    running.
    """
    # `with` closes the listening socket if the accept loop is ever left.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((HOST, PORT))
        server_socket.listen(1)
        logger.info("Waiting for GPS device connection...")

        while True:
            client_socket, addr = server_socket.accept()
            logger.info(f"Connection established from {addr}")

            # `with` closes the client socket even when handling raises.
            with client_socket:
                try:
                    _serve_client(client_socket)
                except Exception:
                    # Top-level boundary: one failing connection (socket
                    # reset, malformed packet) must not stop the server.
                    logger.exception(f"Error while serving {addr}; closing connection")

# Start the listener only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    start_server()

Also, these are the sample login packets exchanged from terminal to server and from server to terminal:

terminal to server: 78 78 08 13 4B 04 03 00 01 00 11 06 1F 0D 0A
server to terminal: 78 78 05 13 00 11 F9 70 0D 0A

[Given in section 5.4.4 of the protocol document of this device.]



You need to sign in to view this answers

Exit mobile version