OiO.lk Community platform!

Oio.lk is an excellent forum for developers, providing a wide range of resources, discussions, and support for those in the developer community. Join oio.lk today to connect with like-minded professionals, share insights, and stay updated on the latest trends and technologies in the development field.
  You need to log in or register to access the solved answers to this problem.
  • You have reached the maximum number of guest views allowed
  • Please register below to remove this limitation

How can I ensure that real-time telemetry is published to an MQTT broker every second during a DroneKit flight, rather than all at once at the end?

  • Thread starter Thread starter David Vázquez Masero
  • Start date Start date
D

David Vázquez Masero

Guest
I am creating a sort of UTM simulator using python. On one hand, I have a flask server which is supposed to send a flight plan to a drone (built in dronekit and dronekit-sitl), which executes the flight plan. The workflow is the following:

  1. The flask server sends a flight plan to dronekit and dronekit-sitl via MQTT broker in a topic called "flight/plan"
  2. Dronekit-sitl receives the home location from the flight plan and sets up a vehicle. Then, it publishes the connection string to the topic "flight/drone".
  3. The flight manager in dronekit receives the connection string and waypoints from the topics and creates a vehicle which executes the flight according to the flight plan.

Up to this point, it is all good. The problem comes afterwards:

  1. As a final task, the drone should elaborate a telemetry message and publish it every second to the mqtt topic "flight/telemetry".

The loop indicates the message is being generated and supposedly published, but the reality is that instead of publishing the info every second, the message is not published until the flight has ended, time in which the messages are published in bulk.

I have tried threading the processes, using different QoS in mqtt and everything, but I don't seem to find the problem. The logs for mosquitto don't show any issue, they just show the messages being published and received by other clients in bulk instead of every second.

The thing runs in local, although the simulator part is running on a virtual machine in Linux for compatibility reasons.

The setup for the telemetry part is as follows. I omitted everything non related with the problem.

mqtt_manager.py

Code:
class MQTTManager:
    def __init__(self, broker_host, broker_port):
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect
        self.client.connect(broker_host, broker_port, 60)
        self.client.loop_start()
        self.connection_string = None
        self.flight_plan = None

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print(f"Conectado al broker MQTT con código: {rc}")
            client.subscribe("flight/plans")
            client.subscribe("flight/drone",qos=2)
        else:
            print(f"Error al conectar: {rc}")

    def on_message(self, client, userdata, msg):
        print(f"Mensaje recibido: {msg.topic} {msg.payload}")
        if msg.topic == "flight/plans":
            self.flight_plan = json.loads(msg.payload.decode())
            if self.connection_string:
                self.start_flight()
        elif msg.topic == "flight/drone":
            self.connection_string = msg.payload.decode()
            if self.flight_plan:
                self.start_flight()
                
    def start_flight(self):
        process_flight_plan(self.flight_plan, self.connection_string, self)

    def on_disconnect(self, client, userdata, rc):
        print(f"Desconectado con código: {rc}")
        if rc != 0:
            print("Intentando reconectar...")
            try:
                client.reconnect()
            except Exception as e:
                print(f"Error al reconectar: {e}")

    def publish_telemetry(self, topic, telemetry_data):
        try:
            self.client.publish(topic, json.dumps(telemetry_data), qos=2)
            print(f"Telemetría enviada: {telemetry_data}")
        except Exception as e:
            print(f"Error al publicar telemetría: {e}")

mqtt_manager = MQTTManager('localhost', 1883)

flight_manager.py

Code:
def send_telemetry_periodically(mqtt_manager):
    while True:
        send_telemetry(mqtt_manager)
        time.sleep(1) 

def process_flight_plan(flight_plan, connection_string, mqtt_manager):
    vehicle_manager = VehicleManager.get_instance()
    
    waypoints = flight_plan['waypoints']
    if not waypoints[0]:
        print("No hay waypoints en el plan de vuelo recibido.")
        return
    
    try:
        vehicle_manager.connect_vehicle(connection_string)
    
        print("Vehículo armado") 
    
        print("Despegando...")
        vehicle_manager.get_vehicle().simple_takeoff(10)
        
        # Hilo para enviar telemetría
        telemetry_thread = threading.Thread(target=send_telemetry_periodically, args=(mqtt_manager,))
        telemetry_thread.daemon = True
        telemetry_thread.start()

        for point in flight_plan['waypoints']:
            location = LocationGlobalRelative(point['lat'], point['lon'], point['alt'])
            print(f"Navegando a: {location}")
            move_thread = threading.Thread(target=vehicle_manager.move_vehicle_to_location, args=(location,))
            move_thread.start()
            move_thread.join()

        print("Aterrizando...")
        
    except Exception as e:
        print(f"Error en process_flight_plan:{str(e)}")

telemetry_manager.py

Code:
from modules.vehicle_manager import VehicleManager

def send_telemetry(mqtt_manager):
    vehicle_manager = VehicleManager.get_instance()
    vehicle = vehicle_manager.get_vehicle()
    if vehicle and vehicle.mode.name == 'GUIDED':
        telemetry = vehicle_manager.capture_vehicle_state()
        print("Mensaje de telemetría: ", telemetry)
        mqtt_manager.publish_telemetry("flight/telemetry",telemetry)

vehicle.py

Code:
class VehicleManager:
    _instance = None
    vehicle = None
    connection_string = None
    
    @staticmethod
    def get_instance():
        if VehicleManager._instance is None:
            VehicleManager()
        return VehicleManager._instance
    
    def __init__(self):
        if VehicleManager._instance is not None:
            raise Exception("This class is a singleton!")
        else:
            VehicleManager._instance = self
        
    def get_vehicle(self):
        return self.vehicle
    
    def capture_vehicle_state(self):
        if self.vehicle:
            return {
                'altitude': self.vehicle.location.global_relative_frame.alt,
                'latitude': self.vehicle.location.global_relative_frame.lat,
                'longitude': self.vehicle.location.global_relative_frame.lon,
                'armed': self.vehicle.armed,
                'mode': self.vehicle.mode.name
            }
        else: 
            return None
<p>I am creating a sort of UTM simulator using python. On one hand, I have a flask server which is supposed to send a flight plan to a drone (built in dronekit and dronekit-sitl), which executes the flight plan.
The workflow is the following:</p>
<ol>
<li>The flask server sends a flight plan to dronekit and dronekit-sitl via MQTT broker in a topic called "flight/plan"</li>
<li>Dronekit-sitl receives the home location from the flight plan and sets up a vehicle. Then, it publishes the connection string to the topic "flight/drone".</li>
<li>The flight manager in dronekit receives the connection string and waypoints from the topics and creates a vehicle which executes the flight according to the flight plan.</li>
</ol>
<p>Up to this point, it is all good. The problem comes afterwards:</p>
<ol start="4">
<li>As a final task, the drone should elaborate a telemetry message and publish it every second to the mqtt topic "flight/telemetry".</li>
</ol>
<p>The loop indicates the message is being generated and supposedly published, but the reality is that instead of publishing the info every second, the message is not published until the flight has ended, time in which the messages are published in bulk.</p>
<p>I have tried threading the processes, using different QoS in mqtt and everything, but I don't seem to find the problem. The logs for mosquitto don't show any issue, they just show the messages being published and received by other clients in bulk instead of every second.</p>
<p>The thing runs in local, although the simulator part is running on a virtual machine in Linux for compatibility reasons.</p>
<p>The setup for the telemetry part is as follows. I omitted everything non related with the problem.</p>
<p>mqtt_manager.py</p>
<pre><code>class MQTTManager:
def __init__(self, broker_host, broker_port):
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
self.client.connect(broker_host, broker_port, 60)
self.client.loop_start()
self.connection_string = None
self.flight_plan = None

def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print(f"Conectado al broker MQTT con código: {rc}")
client.subscribe("flight/plans")
client.subscribe("flight/drone",qos=2)
else:
print(f"Error al conectar: {rc}")

def on_message(self, client, userdata, msg):
print(f"Mensaje recibido: {msg.topic} {msg.payload}")
if msg.topic == "flight/plans":
self.flight_plan = json.loads(msg.payload.decode())
if self.connection_string:
self.start_flight()
elif msg.topic == "flight/drone":
self.connection_string = msg.payload.decode()
if self.flight_plan:
self.start_flight()

def start_flight(self):
process_flight_plan(self.flight_plan, self.connection_string, self)

def on_disconnect(self, client, userdata, rc):
print(f"Desconectado con código: {rc}")
if rc != 0:
print("Intentando reconectar...")
try:
client.reconnect()
except Exception as e:
print(f"Error al reconectar: {e}")

def publish_telemetry(self, topic, telemetry_data):
try:
self.client.publish(topic, json.dumps(telemetry_data), qos=2)
print(f"Telemetría enviada: {telemetry_data}")
except Exception as e:
print(f"Error al publicar telemetría: {e}")

mqtt_manager = MQTTManager('localhost', 1883)
</code></pre>
<p>flight_manager.py</p>
<pre><code>def send_telemetry_periodically(mqtt_manager):
while True:
send_telemetry(mqtt_manager)
time.sleep(1)

def process_flight_plan(flight_plan, connection_string, mqtt_manager):
vehicle_manager = VehicleManager.get_instance()

waypoints = flight_plan['waypoints']
if not waypoints[0]:
print("No hay waypoints en el plan de vuelo recibido.")
return

try:
vehicle_manager.connect_vehicle(connection_string)

print("Vehículo armado")

print("Despegando...")
vehicle_manager.get_vehicle().simple_takeoff(10)

# Hilo para enviar telemetría
telemetry_thread = threading.Thread(target=send_telemetry_periodically, args=(mqtt_manager,))
telemetry_thread.daemon = True
telemetry_thread.start()

for point in flight_plan['waypoints']:
location = LocationGlobalRelative(point['lat'], point['lon'], point['alt'])
print(f"Navegando a: {location}")
move_thread = threading.Thread(target=vehicle_manager.move_vehicle_to_location, args=(location,))
move_thread.start()
move_thread.join()

print("Aterrizando...")

except Exception as e:
print(f"Error en process_flight_plan:{str(e)}")

</code></pre>
<p>telemetry_manager.py</p>
<pre><code>from modules.vehicle_manager import VehicleManager

def send_telemetry(mqtt_manager):
vehicle_manager = VehicleManager.get_instance()
vehicle = vehicle_manager.get_vehicle()
if vehicle and vehicle.mode.name == 'GUIDED':
telemetry = vehicle_manager.capture_vehicle_state()
print("Mensaje de telemetría: ", telemetry)
mqtt_manager.publish_telemetry("flight/telemetry",telemetry)
</code></pre>
<p>vehicle.py</p>
<pre><code>class VehicleManager:
_instance = None
vehicle = None
connection_string = None

@staticmethod
def get_instance():
if VehicleManager._instance is None:
VehicleManager()
return VehicleManager._instance

def __init__(self):
if VehicleManager._instance is not None:
raise Exception("This class is a singleton!")
else:
VehicleManager._instance = self

def get_vehicle(self):
return self.vehicle

def capture_vehicle_state(self):
if self.vehicle:
return {
'altitude': self.vehicle.location.global_relative_frame.alt,
'latitude': self.vehicle.location.global_relative_frame.lat,
'longitude': self.vehicle.location.global_relative_frame.lon,
'armed': self.vehicle.armed,
'mode': self.vehicle.mode.name
}
else:
return None

</code></pre>
 

Latest posts

Top