Extracting temperature metadata from a FLIR Hadron camera using Python3 and GStreamer

Thread starter: Ihssane Oubari (Guest)
I'm working with a FLIR Hadron 640R camera, which provides thermal and RGB data, on a Jetson Nano B01.

I'm trying to extract metadata from the thermal frames so I can display hot spot/cold spot markers indicating the highest and lowest temperature detected in each frame, and I want this overlay present in the display, the saved file, and the stream. I'm also streaming, saving, and displaying the RGB data (I'm using GStreamer in Python3). I have been trying to implement this temperature extraction for a long time, but nothing seems to work. I would be grateful for any advice, thank you.
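To make the goal concrete, here is the per-frame logic I'm after in isolation, as a minimal self-contained sketch; the 0.01 K-per-count (TLinear) scaling is an assumption that still needs checking against the Hadron/Boson documentation, as the comment in Code 1 below also notes:

Code:
import numpy as np

def hot_cold_spots(frame_u16):
    """Return ((x, y), temp_C) for the coldest and hottest pixel of a Y16 frame."""
    min_idx = np.unravel_index(np.argmin(frame_u16), frame_u16.shape)
    max_idx = np.unravel_index(np.argmax(frame_u16), frame_u16.shape)
    # Assumption: TLinear output, 1 count = 0.01 K (centikelvin)
    to_celsius = lambda raw: raw * 0.01 - 273.15
    cold = ((min_idx[1], min_idx[0]), to_celsius(int(frame_u16[min_idx])))
    hot = ((max_idx[1], max_idx[0]), to_celsius(int(frame_u16[max_idx])))
    return cold, hot

# Dummy 512x640 frame spanning roughly 0..100 C in centikelvin:
frame = np.random.randint(27315, 37315, size=(512, 640), dtype=np.uint16)
(cold_xy, cold_c), (hot_xy, hot_c) = hot_cold_spots(frame)
print(f"Cold: {cold_c:.2f}C at {cold_xy}  Hot: {hot_c:.2f}C at {hot_xy}")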

When I execute Code 1, this is the error I get; the RGB window attempts to start displaying but shuts down immediately, and the IR doesn't display at all:

Error received from element v4l2src1: Internal data stream error. Debugging information: gstbasesrc.c(3055): gst_base_src_loop(): /GstPipeline1/GstV4l2src:v4l2src1: Streaming stopped, reason not-negotiated (-4)
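For context, "not-negotiated" means caps negotiation failed somewhere in the chain. A bare capture test like the sketch below (device path and caps copied from my setup, otherwise untested) can at least confirm whether the thermal source accepts GRAY16_LE at 640x512/30fps on its own:

Code:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)

# Bare-bones negotiation test: source + caps filter + fakesink, nothing else.
pipeline = Gst.parse_launch(
    "v4l2src device=/dev/video1 ! "
    "video/x-raw,format=GRAY16_LE,width=640,height=512,framerate=30/1 ! "
    "fakesink"
)
pipeline.set_state(Gst.State.PLAYING)
# Wait up to 5 seconds for an error; silence suggests the caps were accepted.
msg = pipeline.get_bus().timed_pop_filtered(5 * Gst.SECOND, Gst.MessageType.ERROR)
if msg:
    err, debug = msg.parse_error()
    print("Negotiation failed:", err.message)
else:
    print("No error within 5s: GRAY16_LE caps look accepted by the source")
pipeline.set_state(Gst.State.NULL)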

NOTE: I have working code that starts the streaming, saving, and displaying of the RGB/IR data, controlled by GPIO.

Code 1 (the non-working code, with the temperature extraction):

Code:
import RPi.GPIO as GPIO
import time
import datetime
import numpy as np
import gi  # type: ignore
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib  # type: ignore

# Initialize GStreamer
Gst.init(None)

# Pin Definitions
input_pin = 24  # Pin 24 as input

# Variables to manage the video capture
is_recording = False
rgb_pipeline = None
ir_pipeline = None
loop = GLib.MainLoop()

def convert_y16_to_8bit(frame):
    """Converts a Y16 16-bit thermal frame to 8-bit; also returns the raw min/max."""
    min_val = np.min(frame)
    max_val = np.max(frame)
    span = max(int(max_val) - int(min_val), 1)  # guard against division by zero on flat frames
    frame_8bit = ((frame - min_val) / span * 255).astype(np.uint8)
    return frame_8bit, min_val, max_val

def raw_to_celsius(raw_val):
    """Converts raw thermal camera value to degrees Celsius."""
    kelvin = raw_val * 0.01  # Example conversion factor, replace with correct one
    celsius = kelvin - 273.15
    return celsius

def create_pipeline(device, width, height, stream_url, file_prefix, timestamp, thermal=False):
    if thermal:
        pipeline_description = (
            f"v4l2src io-mode=4 device={device} do-timestamp=true ! "
            f"video/x-raw,format=GRAY16_LE,width={width},height={height},framerate=30/1 ! "
            "videoconvert ! "
            "tee name=i "
            "i. ! "
            "queue ! appsink name=sink emit-signals=True sync=false "
            "i. ! "
            "videoconvert ! "
            "video/x-raw,format=GRAY8 ! "
            "textoverlay name=overlay valignment=top halignment=left text='Temp: ' ! "
            "clockoverlay halignment=left valignment=bottom time-format=\"%Y-%m-%d %H:%M:%S\" font-desc='Sans, 36' ! "
            "timeoverlay halignment=right valignment=bottom text=\"Stream time:\" font-desc='Sans, 24' ! "
            "tee name=t ! "
            "queue ! nvvidconv ! nvv4l2h264enc bitrate=5000000 ! h264parse ! "
            "tee name=l ! "
            f"queue ! flvmux ! rtmpsink location='{stream_url} live=1' "
            "l. ! "
            f"queue ! qtmux ! filesink location=/home/nvidia/Desktop/{file_prefix}_{timestamp}.mp4 "
            "t. ! "
            "queue ! xvimagesink sync=false "
        )
    else:
        pipeline_description = (
            f"v4l2src io-mode=4 device={device} do-timestamp=true ! "
            f"video/x-raw,width={width},height={height},framerate=30/1 ! "
            "clockoverlay halignment=left valignment=bottom time-format=\"%Y-%m-%d %H:%M:%S\" font-desc='Sans, 36' ! "
            "timeoverlay halignment=right valignment=bottom text=\"Stream time:\" font-desc='Sans, 24' ! "
            "tee name=t ! "
            "queue ! nvvidconv ! nvv4l2h264enc bitrate=5000000 ! h264parse ! "
            "tee name=l ! "
            f"queue ! flvmux ! rtmpsink location='{stream_url} live=1' "
            "l. ! "
            f"queue ! qtmux ! filesink location=/home/nvidia/Desktop/{file_prefix}_{timestamp}.mp4 "
            "t. ! "
            "queue leaky=1 ! xvimagesink sync=false"
        )
    return Gst.parse_launch(pipeline_description)

def on_message_rgb(bus, message, loop):
    msg_type = message.type
    if msg_type == Gst.MessageType.ERROR:
        err, debug_info = message.parse_error()
        print(f"Error received from element {message.src.get_name()}: {err.message}")
        print(f"Debugging information: {debug_info if debug_info else 'none'}")
        rgb_pipeline.set_state(Gst.State.NULL)
        loop.quit()
    elif msg_type == Gst.MessageType.EOS:
        print("End-Of-Stream reached")
        rgb_pipeline.set_state(Gst.State.NULL)

def on_message_ir(bus, message, loop):
    msg_type = message.type
    if msg_type == Gst.MessageType.ERROR:
        err, debug_info = message.parse_error()
        print(f"Error received from element {message.src.get_name()}: {err.message}")
        print(f"Debugging information: {debug_info if debug_info else 'none'}")
        ir_pipeline.set_state(Gst.State.NULL)
        loop.quit()
    elif msg_type == Gst.MessageType.EOS:
        print("End-Of-Stream reached")
        ir_pipeline.set_state(Gst.State.NULL)

def new_sample(sink, data):
    sample = sink.emit('pull-sample')
    buf = sample.get_buffer()
    caps = sample.get_caps()
    array = np.ndarray(
        (caps.get_structure(0).get_value('height'),
         caps.get_structure(0).get_value('width')),
        buffer=buf.extract_dup(0, buf.get_size()),
        dtype=np.uint16)
    
    # Convert the frame and get min/max temperatures
    frame_8bit, min_raw, max_raw = convert_y16_to_8bit(array)
    min_temp = raw_to_celsius(min_raw)
    max_temp = raw_to_celsius(max_raw)

    # Find min and max locations
    min_loc = np.unravel_index(np.argmin(array), array.shape)
    max_loc = np.unravel_index(np.argmax(array), array.shape)
    
    # Update the textoverlay element with the min/max temperature and locations
    overlay = ir_pipeline.get_by_name('overlay')
    if overlay:
        overlay.set_property("text", f"Cold: {min_temp:.2f}C ({min_loc[1]},{min_loc[0]})  Hot: {max_temp:.2f}C ({max_loc[1]},{max_loc[0]})")
    
    return Gst.FlowReturn.OK

def start_rgb_recording():
    global rgb_pipeline, loop
    current_timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    rgb_pipeline = create_pipeline('/dev/video0', 1920, 1080, 'rtmp://google.de/live/rgb', 'RGB', current_timestamp)
    bus_rgb = rgb_pipeline.get_bus()
    bus_rgb.add_signal_watch()
    bus_rgb.connect("message", on_message_rgb, loop)
    rgb_pipeline.set_state(Gst.State.PLAYING)
    print("Starting RGB pipeline...")

def start_ir_recording():
    global ir_pipeline, loop
    current_timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    ir_pipeline = create_pipeline('/dev/video1', 640, 512, 'rtmp://google.de/live_lowres/ir', 'IR', current_timestamp, thermal=True)
    bus_ir = ir_pipeline.get_bus()
    bus_ir.add_signal_watch()
    bus_ir.connect("message", on_message_ir, loop)

    sink = ir_pipeline.get_by_name('sink')
    sink.connect('new-sample', new_sample, None)

    ir_pipeline.set_state(Gst.State.PLAYING)
    print("Starting IR pipeline...")



def stop_rgb_recording():
    global rgb_pipeline
    if rgb_pipeline:
        rgb_pipeline.send_event(Gst.Event.new_eos())
        time.sleep(1)
        rgb_pipeline.set_state(Gst.State.NULL)
        print("RGB stopped")

def stop_ir_recording():
    global ir_pipeline
    if ir_pipeline:
        ir_pipeline.send_event(Gst.Event.new_eos())
        time.sleep(1)
        ir_pipeline.set_state(Gst.State.NULL)
        print("IR stopped")



def pin_callback(channel):
    global is_recording

    if GPIO.input(channel):
        print("Rising edge detected on pin", channel)
        if not is_recording:
            start_rgb_recording()
            start_ir_recording()
            is_recording = True
    else:
        print("Falling edge detected on pin", channel)
        if is_recording:
            stop_rgb_recording()
            stop_ir_recording()
            is_recording = False

def main():
    GPIO.setmode(GPIO.BCM)  # Use BCM pin numbering
    GPIO.setup(input_pin, GPIO.IN)  # Set pin 24 as input

    GPIO.add_event_detect(input_pin, GPIO.BOTH, callback=pin_callback, bouncetime=300)

    print("Starting demo now! Press CTRL+C to exit")
    try:
        #GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGINT, handle_keyboard_interrupt, loop)
        loop.run()
    except KeyboardInterrupt:
        print("stopping pipelines...")
        loop.quit()
        print("Pipelines stopped.")   
if __name__ == '__main__':
    main()
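
One detail I'm unsure about in Code 1: the appsink branch after the first tee has no caps filter, yet new_sample() reinterprets the buffer as uint16, so the numpy view only matches if that branch really negotiates GRAY16_LE. Here is a sketch of how that branch could pin its caps (element names as in Code 1; an untested assumption, not something I've confirmed works):

Code:
# Sketch (untested): explicit caps on the appsink branch so the uint16
# numpy view in new_sample() matches the buffer layout; max-buffers/drop
# keep a slow callback from stalling the pipeline.
thermal_tee_branch = (
    "tee name=i "
    "i. ! queue ! video/x-raw,format=GRAY16_LE ! "
    "appsink name=sink emit-signals=true sync=false max-buffers=1 drop=true "
    "i. ! queue ! videoconvert ! video/x-raw,format=GRAY8 "
    # ...followed by the textoverlay/clockoverlay/encode chain from Code 1
)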

Here is my working code, without the temperature extraction:

Code:
import RPi.GPIO as GPIO
import time
import datetime

import gi  # type: ignore
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib  # type: ignore

# Initialize GStreamer
Gst.init(None)

# Pin Definitions
input_pin = 24  # Pin 24 as input

# Variables to manage the video capture
is_recording = False
rgb_pipeline = None
ir_pipeline = None
loop = GLib.MainLoop()

def create_pipeline(device, width, height, stream_url, file_prefix, timestamp):
    pipeline_description = (
        f"v4l2src io-mode=4 device={device} do-timestamp=true ! "
        f"video/x-raw, width={width}, height={height}, framerate=30/1 ! clockoverlay halignment=left valignment=bottom time-format= \"%Y-%m-%d %H:%M:%S\" font-desc='Sans, 36'! timeoverlay halignment=right valignment=bottom text= \"Stream time:\" font-desc='Sans, 24' ! "
        "tee name=t ! "
        "queue ! nvvidconv ! nvv4l2h264enc bitrate=5000000 ! h264parse ! "
        "tee name=l ! "
        f"queue ! flvmux ! rtmpsink location='{stream_url} live=1' "
        "l. ! "
        f"queue ! qtmux ! filesink location=/home/nvidia/Desktop/{file_prefix}_{timestamp}.mp4 "
        "t. ! "
        "queue leaky=1 ! xvimagesink sync=false"
    )
    return Gst.parse_launch(pipeline_description)

def on_message_rgb(bus, message, loop):
    msg_type = message.type
    if msg_type == Gst.MessageType.ERROR:
        err, debug_info = message.parse_error()
        print(f"Error received from element {message.src.get_name()}: {err.message}")
        print(f"Debugging information: {debug_info if debug_info else 'none'}")
        rgb_pipeline.set_state(Gst.State.NULL)
        loop.quit()
    elif msg_type == Gst.MessageType.EOS:
        print("End-Of-Stream reached")
        rgb_pipeline.set_state(Gst.State.NULL)

def on_message_ir(bus, message, loop):
    msg_type = message.type
    if msg_type == Gst.MessageType.ERROR:
        err, debug_info = message.parse_error()
        print(f"Error received from element {message.src.get_name()}: {err.message}")
        print(f"Debugging information: {debug_info if debug_info else 'none'}")
        ir_pipeline.set_state(Gst.State.NULL)
        loop.quit()
    elif msg_type == Gst.MessageType.EOS:
        print("End-Of-Stream reached")
        ir_pipeline.set_state(Gst.State.NULL)


def start_recording():
    global is_recording, rgb_pipeline, ir_pipeline, loop

    current_timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    rgb_pipeline = create_pipeline('/dev/video0', 1920, 1080, 'rtmp://google/live/rgb', 'RGB', current_timestamp)
    ir_pipeline = create_pipeline('/dev/video1', 640, 512, 'rtmp://google.waldwaechter.de/live_lowres/ir', 'IR', current_timestamp)
    loop = GLib.MainLoop()
    bus_rgb = rgb_pipeline.get_bus()
    bus_ir = ir_pipeline.get_bus()
    bus_rgb.add_signal_watch()
    bus_ir.add_signal_watch()
    bus_rgb.connect("message", on_message_rgb, loop)
    bus_ir.connect("message", on_message_ir, loop)

    rgb_pipeline.set_state(Gst.State.PLAYING)
    ir_pipeline.set_state(Gst.State.PLAYING)
    print("Starting RGB and IR pipelines...")


def stop_recording():
    global is_recording, rgb_pipeline, ir_pipeline, loop

    if rgb_pipeline:
        rgb_pipeline.send_event(Gst.Event.new_eos())
        time.sleep(1)
        rgb_pipeline.set_state(Gst.State.NULL)
        print("RGB stopped")

    if ir_pipeline:
        ir_pipeline.send_event(Gst.Event.new_eos())
        time.sleep(1)
        ir_pipeline.set_state(Gst.State.NULL)
        print("IR stopped")



def pin_callback(channel):
    global is_recording

    if GPIO.input(channel):
        print("Rising edge detected on pin", channel)
        if not is_recording:
            start_recording()
            is_recording = True
    else:
        print("Falling edge detected on pin", channel)
        if is_recording:
            stop_recording()
            is_recording = False

def main():
    GPIO.setmode(GPIO.BCM)  # Use BCM pin numbering
    GPIO.setup(input_pin, GPIO.IN)  # Set pin 24 as input

    GPIO.add_event_detect(input_pin, GPIO.BOTH, callback=pin_callback, bouncetime=300)

    print("Starting demo now! Press CTRL+C to exit")
    try:
        #GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGINT, handle_keyboard_interrupt, loop)
        loop.run()
    except KeyboardInterrupt:
        print("stopping pipelines...")
        loop.quit()
        print("Pipelines stopped.")   
if __name__ == '__main__':
    main()