I am trying to connect to a PT06 GPS device (https://pictortelematics.com/products/bike-gps-tracker/pt06-gps-tracker
) over a socket in Python. I am able to establish a successful GPRS connection with the GPS device, and it sends me the login details (Landing Packet), from which I can extract all the values correctly. As the acknowledgement, I am sending a server response packet (described in section 5.1.2 of https://pictortelematics.com/images/1690288584_PT06%20protocol.pdf
) but I am not receiving any data after sending the server response (if all goes well, the device should then start sending GPS data). I have also deployed the socket code to AWS Elastic Beanstalk (which acts as the server in this setup).
Things that I have already checked:
- I have verified the CRC that the device sent in the login packet using the CRC-ITU calculation method described in the documentation.
- The IMEI is also correct (the value the device sends matches the number printed on the device box).
- The login packet and response packet formats are also correct.
Code:
import socket
import struct
import binascii
from dotenv import load_dotenv
import logging
import os
# Run dotenv's loader before the settings below are read from the environment.
load_dotenv()

# Address and port the server binds to; each falls back to a built-in default.
HOST = os.environ.get('HOSTIP', '0.0.0.0')
PORT = int(os.environ.get('PORT', 6789))

# INFO-level logging with timestamp and level name, plus the module logger
# used by every function below.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def _build_crc16_table() -> list:
    """Generate the 256-entry CRC-ITU lookup table.

    Entry ``i`` is the CRC register obtained by pushing the single byte ``i``
    through eight shift steps with the reflected polynomial 0x8408.  The
    result starts 0x0000, 0x1189, 0x2312, 0x329B, ... exactly like the
    hand-written table, without having to paste (or abbreviate) 256 literals.
    """
    table = []
    for value in range(256):
        crc = value
        for _ in range(8):
            # Shift right; fold the polynomial in whenever a 1 bit drops out.
            crc = (crc >> 1) ^ 0x8408 if crc & 1 else crc >> 1
        table.append(crc)
    return table


# Lookup table consumed by get_crc16(); indexed by (fcs ^ byte) & 0xff.
crc16_table = _build_crc16_table()
def get_crc16(pData: bytes, nLength: int, crc_table: list) -> int:
    """Compute the table-driven 16-bit CRC of the first nLength bytes of pData.

    Args:
        pData: Bytes to checksum.
        nLength: Number of leading bytes of pData to include.
        crc_table: 256-entry lookup table indexed by the low byte of the
            running register XOR-ed with the input byte.

    Returns:
        The 16-bit checksum: the final register, inverted and masked to 16 bits.
    """
    register = 0xFFFF  # the register starts with all bits set
    for position in range(nLength):
        table_index = (register ^ pData[position]) & 0xFF
        register = crc_table[table_index] ^ (register >> 8)
    # XOR with 0xFFFF flips the low 16 bits, i.e. the masked bitwise complement.
    return (register ^ 0xFFFF) & 0xFFFF
# Parse login packet and send response
def parse_login_packet(data: bytes, client_socket):
    """Validate a login packet (protocol number 0x01) and acknowledge it.

    Layout assumed by the offsets below (22 bytes in total):
    start bits (2), length (1), protocol number (1), terminal ID (8),
    type identification number (2), time zone/language (2),
    serial number (2), CRC (2), stop bits (2).

    Args:
        data: One complete framed packet, start and stop bits included.
        client_socket: Connected socket the response packet is written to.

    Returns:
        None in every case; the outcome is reported through the log.
    """
    login_packet_size = 22

    logger.info("Start parse_login_packet")
    if data[:2] != b'\x78\x78' or data[-2:] != b'\x0D\x0A':
        logger.info("Invalid start/stop bits")
        return None
    logger.info("Valid start and stop bits")

    # Distinct start and stop bits cannot both match in fewer than four bytes,
    # so index 3 is always present here.
    agreement_number = data[3]
    if agreement_number != 0x01:
        logger.info(f"Unexpected agreement number: {agreement_number}")
        return None
    logger.info("This is a login packet")

    # Without this guard a truncated packet raises struct.error further down.
    if len(data) < login_packet_size:
        logger.info(f"Login packet too short: {len(data)} bytes, expected {login_packet_size}")
        return None

    # Terminal ID (8 bytes)
    terminal_id = data[4:12]
    logger.info(f"Terminal ID: {binascii.hexlify(terminal_id)}")

    # Type Identification Number (2 bytes)
    type_id = data[12:14]
    logger.info(f"Type Identification Number: {binascii.hexlify(type_id)}")

    # Time Zone and Language (2 bytes)
    # NOTE(review): only the high nibble of the first byte is reported as the
    # zone -- confirm against the protocol's time zone/language bit layout.
    timezone_language = data[14:16]
    timezone = (timezone_language[0] >> 4) & 0x0F
    language = timezone_language[1]
    logger.info(f"Time Zone: +{timezone}, Language: {language}")

    # Serial Number (2 bytes); echoed back unchanged in the response.
    serial_number = data[16:18]

    # The CRC covers everything from the length byte through the serial number.
    crc_received = struct.unpack('>H', data[18:20])[0]
    crc_calculated = get_crc16(data[2:18], 16, crc16_table)
    if crc_received != crc_calculated:
        logger.info(f"CRC check failed: Received {crc_received}, Calculated {crc_calculated}")
        return None
    logger.info("CRC check passed")

    # Response: start bits, length 5, protocol 0x01, echoed serial, CRC, stop bits.
    response_body = struct.pack('>B', 5) + b'\x01' + serial_number
    crc = get_crc16(response_body, len(response_body), crc16_table)
    server_response = b'\x78\x78' + response_body + struct.pack('>H', crc) + b'\x0D\x0A'

    # sendall() retries until every byte is written; send() may stop early.
    client_socket.sendall(server_response)
    logger.info(f"Sent response: {binascii.hexlify(server_response)}")
    return None
# Parse GPS data packet (Agreement Number: 0x22)
def parse_gps_data(data: bytes):
    """Decode the positioning fields of a GPS packet and log them.

    Fields are read at fixed offsets: date/time from bytes 4-9, latitude from
    10-13, longitude from 14-17, speed from 18 and course/status from 19-20.

    NOTE(review): confirm these offsets and the /30000 scaling against the
    protocol's 0x22 packet layout -- neither is verified here.

    Args:
        data: One complete framed packet, start and stop bits included.

    Returns:
        None; decoded values are only written to the log.
    """
    min_packet_size = 21  # the last byte read below is data[20]

    logger.info("Parsing GPS data...")
    # Without this guard a truncated packet raises struct.error.
    if len(data) < min_packet_size:
        logger.info(f"GPS packet too short: {len(data)} bytes, need at least {min_packet_size}")
        return None

    # Date and Time (6 bytes)
    year, month, day, hour, minute, second = struct.unpack_from('>6B', data, 4)
    date_time = f'20{year:02d}-{month:02d}-{day:02d} {hour:02d}:{minute:02d}:{second:02d}'
    logger.info(f"Date/Time: {date_time}")

    # Latitude and Longitude (4 bytes each, big-endian unsigned)
    raw_latitude, raw_longitude = struct.unpack_from('>II', data, 10)
    latitude = raw_latitude / 30000.0
    longitude = raw_longitude / 30000.0
    logger.info(f"Latitude: {latitude}, Longitude: {longitude}")

    # Speed (1 byte)
    speed = data[18]
    logger.info(f"Speed: {speed} km/h")

    # Course (2 bytes): the low 10 bits carry the heading.
    course_status = struct.unpack_from('>H', data, 19)[0]
    course = course_status & 0x03FF
    logger.info(f"Course: {course} degrees")
    return None
# Handle various packet types
def _acknowledge(protocol_number: int, serial_number: bytes, client_socket) -> bytes:
    """Send the 10-byte server response echoing protocol_number and serial_number."""
    body = struct.pack('>BB', 5, protocol_number) + serial_number
    crc = get_crc16(body, len(body), crc16_table)
    response = b'\x78\x78' + body + struct.pack('>H', crc) + b'\x0D\x0A'
    client_socket.sendall(response)
    return response


def handle_packet(data: bytes, client_socket):
    """Dispatch one framed packet according to its protocol number (byte 3).

    - 0x01 (login): delegated to parse_login_packet().
    - 0x22 (GPS positioning): delegated to parse_gps_data().
    - 0x13 (status): acknowledged with a response of the same shape as the
      login reply -- start bits, length 5, echoed protocol number, echoed
      serial number, CRC, stop bits.  For serial 0x0011 that is
      78 78 05 13 00 11 F9 70 0D 0A.
      NOTE(review): confirm against the protocol document whether other
      packet types also require a reply.
    - anything else: logged as unknown.

    Args:
        data: One complete framed packet, start and stop bits included.
        client_socket: Connected socket used for any response.

    Returns:
        Whatever parse_login_packet() returns for login packets, otherwise None.
    """
    # A packet shorter than four bytes has no protocol number to inspect.
    if len(data) < 4:
        logger.info(f"Packet too short to dispatch: {binascii.hexlify(data)}")
        return None

    agreement_number = data[3]
    if agreement_number == 0x01:  # Login packet
        return parse_login_packet(data, client_socket)

    if agreement_number == 0x22:  # GPS positioning data
        parse_gps_data(data)
    elif agreement_number == 0x13:  # Status packet
        # The serial number sits just before the CRC (2 bytes) and stop bits
        # (2 bytes); ten bytes is the smallest frame that can contain it.
        if len(data) < 10:
            logger.info(f"Status packet too short: {binascii.hexlify(data)}")
            return None
        response = _acknowledge(agreement_number, data[-6:-4], client_socket)
        logger.info(f"Status packet acknowledged: {binascii.hexlify(response)}")
    else:
        logger.info(f"Unknown packet type: {agreement_number}")
    return None
# Buffer and extract complete packets
def extract_complete_packets(buffer: bytes):
    """Split the receive buffer into complete 0x78 0x78-framed packets.

    A frame is: start bits (2), length byte (1), ``length`` bytes running
    from the protocol number through the CRC, stop bits (2) -- ``length + 5``
    bytes in total.  The same size is used for the availability check, the
    slice and the amount consumed, so packets that arrive back-to-back in one
    read are each extracted intact.

    Args:
        buffer: Bytes received so far that have not yet been consumed.

    Returns:
        A ``(packets, remaining)`` tuple: the complete packets found, in
        order, and the unconsumed tail to prepend to the next read.
    """
    start_bits = b'\x78\x78'
    stop_bits = b'\x0D\x0A'
    frame_overhead = 5  # start bits + length byte + stop bits

    packets = []
    logger.info(f"Buffer before extraction: {binascii.hexlify(buffer)}")
    while True:
        if len(buffer) < 2:
            logger.info("Not enough data to check start bits")
            break

        # Resynchronise one byte at a time until a start marker is found.
        if buffer[:2] != start_bits:
            logger.info(f"Invalid start bits: {binascii.hexlify(buffer[:2])}. Skipping one byte.")
            buffer = buffer[1:]
            continue

        if len(buffer) < frame_overhead:
            logger.info("Not enough data to get packet length")
            break

        # The 3rd byte is the packet length, excluding start bits, itself and stop bits.
        logger.info(f"buffer[2] = {buffer[2]}")
        packet_length = buffer[2]
        total_packet_length = packet_length + frame_overhead
        logger.info(f"Packet length from buffer: {packet_length}, Total expected length: {total_packet_length}) {len(buffer)}")

        if len(buffer) < total_packet_length:
            logger.info(f"Not enough data for the full packet, waiting for more. Current buffer size: {len(buffer)}")
            break  # Wait for more data

        packet = buffer[:total_packet_length]
        if packet[-2:] == stop_bits:
            packets.append(packet)
            buffer = buffer[total_packet_length:]  # Remove the processed packet
            logger.info(f"Packet successfully extracted. Remaining buffer: {binascii.hexlify(buffer)}")
        else:
            # Drop only the start marker and rescan from the following byte.
            logger.info(f"Invalid stop bits: {binascii.hexlify(packet[-2:])}. Discarding packet.")
            buffer = buffer[2:]
    return packets, buffer
# Initialize the socket
def _serve_client(client_socket):
    """Read from one connection until the peer closes it, handling each packet."""
    buffer = b''  # Holds bytes that do not yet form a complete packet
    while True:
        data = client_socket.recv(1024)
        if not data:
            logger.info("No data received, closing connection")
            break
        buffer += data
        # Extract and process complete packets
        packets, buffer = extract_complete_packets(buffer)
        for packet in packets:
            logger.info(f"Complete packet: {binascii.hexlify(packet)}")
            handle_packet(packet, client_socket)


def start_server():
    """Listen on (HOST, PORT) and serve device connections one after another.

    Both the listening socket and each client socket are managed by ``with``
    so they are closed on every exit path.  An error while serving a client
    (connection reset, malformed packet, ...) is logged and ends only that
    connection; the server keeps accepting.

    NOTE(review): connections are served strictly one at a time with no read
    timeout, so a stalled connection delays the next accept() -- consider a
    timeout or per-connection threads if devices reconnect frequently.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((HOST, PORT))
        server_socket.listen(1)
        logger.info("Waiting for GPS device connection...")
        while True:
            client_socket, addr = server_socket.accept()
            logger.info(f"Connection established from {addr}")
            with client_socket:
                try:
                    _serve_client(client_socket)
                except Exception:
                    # Top-level boundary: record the traceback, keep serving.
                    logger.exception(f"Error while serving {addr}, closing connection")
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    start_server()
Also, these are the sample login packets exchanged between the terminal and the server (terminal to server, then server to terminal):
terminal to server: 78 78 08 13 4B 04 03 00 01 00 11 06 1F 0D 0A
server to terminal: 78 78 05 13 00 11 F9 70 0D 0A
[Given in section 5.4.4 of the protocol document of this device.]
You need to sign in to view these answers
Leave feedback about this