OiO.lk Community platform!

Oio.lk is an excellent forum for developers, providing a wide range of resources, discussions, and support for those in the developer community. Join oio.lk today to connect with like-minded professionals, share insights, and stay updated on the latest trends and technologies in the development field.
  You need to log in or register to access the solved answers to this problem.
  • You have reached the maximum number of guest views allowed
  • Please register below to remove this limitation

prioritize an action for a specific distance

  • Thread starter Thread starter Lara Cuevas
  • Start date Start date
L

Lara Cuevas

Guest
I am trying to write a program in Python in order to compare two camera frames. The idea is checking the distance at which the pixel stands to. Do any of you know how could I prioritize the section where:

Code:
       if 0.1 < d_security_d435 < 0.5 or 0.1 < d_security_l515 < 0.5:
                    cv2.circle(depth_colormap_d435, (cell_center_x_d435, cell_center_y_d435), 3, (0, 0, 255), -1)
                    cv2.circle(depth_colormap_d435, (cell_center_x_l515, cell_center_y_l515), 3, (0, 0, 255), -1)

                    print(f"COLLISION: {d_security_d435:.2f} meters away at pixel ({cell_center_x_d435}, {cell_center_y_d435})")
                    write_to_file('0')
                    time.sleep(0.2)
                    write_to_file('')
                    alert_triggered = True
                    # priority_alert = True
                    break

I want that, if any pixel is at that distance, only write '0' on the txt file until there are no more pixels at that range and then you can write if any pixels are at any other specified range.

My full code is as follows:

Code:
import pyrealsense2 as rs
import numpy as np
import cv2
import time
import pygame
import os
from concurrent.futures import ThreadPoolExecutor

event_occurred = False
file_name = 'distancias.txt'

def initialize_file():
    with open(file_name, 'w') as file:
        file.write(f'\n')

def write_to_file(message):
    with open(file_name, 'w') as file:
        file.write(f'{message}\n')

def stop_measurements(pipeline_l515=None, pipeline_d435=None):
    write_to_file(f'p\n')
    if pipeline_l515:
        pipeline_l515.stop()
    if pipeline_d435:
        pipeline_d435.stop()
    cv2.destroyAllWindows()
    os._exit(0)

def play_beep():
    pygame.init()
    alarm_sound_path = "C:/Users/Lara/Desktop/TFG/pending-notification.wav"
    pygame.mixer.music.load(alarm_sound_path)
    pygame.mixer.music.play()
    pygame.time.wait(1000)
    pygame.mixer.music.stop()
    pygame.quit()

def configure_camera_l515(config):
    config.enable_stream(rs.stream.depth, 1024, 768, rs.format.z16, 30)

def configure_camera_d435(config):
    config.enable_stream(rs.stream.depth, 848, 480, rs.format.z16, 30)

def check_d435_connected():
    ctx = rs.context()
    devices = ctx.query_devices()
    d435_found = False
    
    for dev in devices:
        if 'D435' in dev.get_info(rs.camera_info.name):
            print("Intel RealSense D435 found:", dev.get_info(rs.camera_info.name))
            d435_found = True
    
    if not d435_found:
        print("No Intel RealSense D435 found.")
        return False
    return True

def check_l515_connected():
    ctx = rs.context()
    devices = ctx.query_devices()
    l515_found = False
    
    for dev in devices:
        if 'L515' in dev.get_info(rs.camera_info.name):
            print("Intel RealSense L515 found:", dev.get_info(rs.camera_info.name))
            l515_found = True
    
    if not l515_found:
        print("No Intel RealSense L515 found.")
        return False
    return True       

def get_central_pixel_coordinates(width, height, cell_x, cell_y, cell_width, cell_height):
    cell_center_x = ((cell_x * cell_width) + (cell_width // 2))
    cell_center_y = ((cell_y * cell_height) + (cell_height // 2))
    return cell_center_x, cell_center_y

def calculate_d_security(dS, dMi, dC, alpha_deg, beta_deg, pixel_yi, pixel_y, pixel_ymax_2):
    alpha = np.radians(alpha_deg)
    beta = np.radians(beta_deg)
    cos_alpha = np.cos(alpha)
    sin_alpha = np.sin(alpha)
    tan_beta = np.tan(beta)
    
    d1 = (dMi * cos_alpha) + (pixel_yi - pixel_y) * (dC * tan_beta / (pixel_ymax_2)) * sin_alpha

    d_security = d1 - dS
    return d_security

def process_frames(pipeline_d435, pipeline_l515, width_d435, height_d435, width_l515, height_l515):
    alert_triggered = False
    frames_d435 = pipeline_d435.wait_for_frames()
    frames_l515 = pipeline_l515.wait_for_frames()
    depth_frame_d435 = frames_d435.get_depth_frame()
    depth_frame_l515 = frames_l515.get_depth_frame()
    cell_width_d435, cell_height_d435 = width_d435 // 28, height_d435 // 28
    cell_width_l515, cell_height_l515 = width_l515 // 28, height_l515 // 28

    depth_colormap_d435 = cv2.applyColorMap(cv2.convertScaleAbs(np.asanyarray(depth_frame_d435.get_data()), alpha=0.03), cv2.COLORMAP_JET)
    depth_colormap_l515 = cv2.applyColorMap(cv2.convertScaleAbs(np.asanyarray(depth_frame_l515.get_data()), alpha=0.03), cv2.COLORMAP_JET)

    for cell_x in range(7,23):
        for cell_y in range(5,28):
            cell_center_x_d435, cell_center_y_d435 = get_central_pixel_coordinates(width_d435, height_d435, cell_x, cell_y, cell_width_d435, cell_height_d435)
            distance_d435_value = depth_frame_d435.get_distance(cell_center_x_d435, cell_center_y_d435)
            
            # Process frames from L515 camera
            cell_center_x_l515, cell_center_y_l515 = get_central_pixel_coordinates(width_l515, height_l515, cell_x, cell_y, cell_width_l515, cell_height_l515)
            distance_l515_value = depth_frame_l515.get_distance(cell_center_x_l515, cell_center_y_l515) if depth_frame_l515 else None

            # cv2.circle(depth_colormap_d435, (cell_center_x_d435, cell_center_y_d435), 3, (255, 0, 0), -1)
            # cv2.circle(depth_colormap_l515, (cell_center_x_l515, cell_center_y_l515), 3, (255, 0, 0), -1)

            # Use distance from D435 if L515 distance is None
            d_security_d435 = calculate_d_security(dMi=distance_d435_value, dS=1.35, dC=distance_d435_value, alpha_deg=25, beta_deg=29, pixel_yi=cell_center_y_d435, pixel_y=height_d435 // 2, pixel_ymax_2=240)
            d_security_l515 = calculate_d_security(dMi=distance_l515_value, dS=1.35, dC=distance_l515_value, alpha_deg=25, beta_deg=22.5, pixel_yi=cell_center_y_l515, pixel_y=384, pixel_ymax_2=384)

            if distance_l515_value is None:
                d_security_l515 = d_security_d435

            # Check if the distances are consistent within 0.1 meters
            if abs(d_security_d435 - d_security_l515) <= 0.1:
                if 0.1 < d_security_d435 < 0.5 or 0.1 < d_security_l515 < 0.5:
                    cv2.circle(depth_colormap_d435, (cell_center_x_d435, cell_center_y_d435), 3, (0, 0, 255), -1)
                    cv2.circle(depth_colormap_d435, (cell_center_x_l515, cell_center_y_l515), 3, (0, 0, 255), -1)

                    print(f"COLLISION: {d_security_d435:.2f} meters away at pixel ({cell_center_x_d435}, {cell_center_y_d435})")
                    write_to_file('0')
                    time.sleep(0.2)
                    write_to_file('')
                    alert_triggered = True
                    # priority_alert = True
                    break

                elif 0.51 < d_security_d435 < 1.0 or 0.51 < d_security_l515 < 1.0:
                    # print(f"ALERT: Object at a distance of {d_security_d435:.2f} meters away at pixel ({cell_center_x_d435}, {cell_center_y_d435})")
                   
                    write_to_file('6')
                    time.sleep(0.2)
                    write_to_file('')
                    alert_triggered = True
                    break

                elif 1.01 < d_security_d435 < 1.5 or 1.01 < d_security_l515 < 1.5:
                    # print(f"WARNING: Object at a distance of {d_security_d435:.2f} meters away at pixel ({cell_center_x_d435}, {cell_center_y_d435})")
                    play_beep()
                    alert_triggered = True
                    break

        if alert_triggered:
            break
    return depth_colormap_d435, depth_colormap_l515

def main():
    initialize_file()

    # Configure RealSense pipelines
    pipeline_d435 = rs.pipeline()
    config_d435 = rs.config()
    configure_camera_d435(config_d435)
    pipeline_d435.start(config_d435)

    pipeline_l515 = rs.pipeline()
    config_l515 = rs.config()
    configure_camera_l515(config_l515)
    pipeline_l515.start(config_l515)

    width_d435, height_d435 = 848, 480 
    width_l515, height_l515 = 1024, 768 

    # Executor to run process_frames concurrently
    executor = ThreadPoolExecutor(max_workers=2)
    future = executor.submit(process_frames, pipeline_d435, pipeline_l515, width_d435, height_d435, width_l515, height_l515)

    while True:
        depth_colormap_d435, depth_colormap_l515 = future.result()

        # Display depth_colormap_d435 using cv2.imshow
        cv2.imshow('RealSense D435 Depth Map', depth_colormap_d435)
        # cv2.imshow('RealSense L515 Depth Map', depth_colormap_l515)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        # Reset future for next frame processing
        future = executor.submit(process_frames, pipeline_d435, pipeline_l515, width_d435, height_d435, width_l515, height_l515)

    # Stop the pipelines and close all windows
    stop_measurements(pipeline_l515, pipeline_d435)

if __name__ == "__main__":
    main()

I have tried putting a priority_flag = True for when it is on that section and then doing elif not priority_flag and 0.51 < d_security_d435 < 1.0 or 0.51 < d_security_l515 < 1.0: .....

but nothing seems to be working
<p>I am trying to write a program in Python in order to compare two camera frames. The idea is checking the distance at which the pixel stands to. Do any of you know how could I prioritize the section where:</p>
<pre><code> if 0.1 < d_security_d435 < 0.5 or 0.1 < d_security_l515 < 0.5:
cv2.circle(depth_colormap_d435, (cell_center_x_d435, cell_center_y_d435), 3, (0, 0, 255), -1)
cv2.circle(depth_colormap_d435, (cell_center_x_l515, cell_center_y_l515), 3, (0, 0, 255), -1)

print(f"COLLISION: {d_security_d435:.2f} meters away at pixel ({cell_center_x_d435}, {cell_center_y_d435})")
write_to_file('0')
time.sleep(0.2)
write_to_file('')
alert_triggered = True
# priority_alert = True
break
</code></pre>
<p>I want that, if any pixel is at that distance, only write '0' on the txt file until there are no more pixels at that range and then you can write if any pixels are at any other specified range.</p>
<p>My full code is as follows:</p>
<pre><code>
import pyrealsense2 as rs
import numpy as np
import cv2
import time
import pygame
import os
from concurrent.futures import ThreadPoolExecutor

event_occurred = False
file_name = 'distancias.txt'

def initialize_file():
with open(file_name, 'w') as file:
file.write(f'\n')

def write_to_file(message):
with open(file_name, 'w') as file:
file.write(f'{message}\n')

def stop_measurements(pipeline_l515=None, pipeline_d435=None):
write_to_file(f'p\n')
if pipeline_l515:
pipeline_l515.stop()
if pipeline_d435:
pipeline_d435.stop()
cv2.destroyAllWindows()
os._exit(0)

def play_beep():
pygame.init()
alarm_sound_path = "C:/Users/Lara/Desktop/TFG/pending-notification.wav"
pygame.mixer.music.load(alarm_sound_path)
pygame.mixer.music.play()
pygame.time.wait(1000)
pygame.mixer.music.stop()
pygame.quit()

def configure_camera_l515(config):
config.enable_stream(rs.stream.depth, 1024, 768, rs.format.z16, 30)

def configure_camera_d435(config):
config.enable_stream(rs.stream.depth, 848, 480, rs.format.z16, 30)

def check_d435_connected():
ctx = rs.context()
devices = ctx.query_devices()
d435_found = False

for dev in devices:
if 'D435' in dev.get_info(rs.camera_info.name):
print("Intel RealSense D435 found:", dev.get_info(rs.camera_info.name))
d435_found = True

if not d435_found:
print("No Intel RealSense D435 found.")
return False
return True

def check_l515_connected():
ctx = rs.context()
devices = ctx.query_devices()
l515_found = False

for dev in devices:
if 'L515' in dev.get_info(rs.camera_info.name):
print("Intel RealSense L515 found:", dev.get_info(rs.camera_info.name))
l515_found = True

if not l515_found:
print("No Intel RealSense L515 found.")
return False
return True

def get_central_pixel_coordinates(width, height, cell_x, cell_y, cell_width, cell_height):
cell_center_x = ((cell_x * cell_width) + (cell_width // 2))
cell_center_y = ((cell_y * cell_height) + (cell_height // 2))
return cell_center_x, cell_center_y

def calculate_d_security(dS, dMi, dC, alpha_deg, beta_deg, pixel_yi, pixel_y, pixel_ymax_2):
alpha = np.radians(alpha_deg)
beta = np.radians(beta_deg)
cos_alpha = np.cos(alpha)
sin_alpha = np.sin(alpha)
tan_beta = np.tan(beta)

d1 = (dMi * cos_alpha) + (pixel_yi - pixel_y) * (dC * tan_beta / (pixel_ymax_2)) * sin_alpha

d_security = d1 - dS
return d_security

def process_frames(pipeline_d435, pipeline_l515, width_d435, height_d435, width_l515, height_l515):
alert_triggered = False
frames_d435 = pipeline_d435.wait_for_frames()
frames_l515 = pipeline_l515.wait_for_frames()
depth_frame_d435 = frames_d435.get_depth_frame()
depth_frame_l515 = frames_l515.get_depth_frame()
cell_width_d435, cell_height_d435 = width_d435 // 28, height_d435 // 28
cell_width_l515, cell_height_l515 = width_l515 // 28, height_l515 // 28

depth_colormap_d435 = cv2.applyColorMap(cv2.convertScaleAbs(np.asanyarray(depth_frame_d435.get_data()), alpha=0.03), cv2.COLORMAP_JET)
depth_colormap_l515 = cv2.applyColorMap(cv2.convertScaleAbs(np.asanyarray(depth_frame_l515.get_data()), alpha=0.03), cv2.COLORMAP_JET)

for cell_x in range(7,23):
for cell_y in range(5,28):
cell_center_x_d435, cell_center_y_d435 = get_central_pixel_coordinates(width_d435, height_d435, cell_x, cell_y, cell_width_d435, cell_height_d435)
distance_d435_value = depth_frame_d435.get_distance(cell_center_x_d435, cell_center_y_d435)

# Process frames from L515 camera
cell_center_x_l515, cell_center_y_l515 = get_central_pixel_coordinates(width_l515, height_l515, cell_x, cell_y, cell_width_l515, cell_height_l515)
distance_l515_value = depth_frame_l515.get_distance(cell_center_x_l515, cell_center_y_l515) if depth_frame_l515 else None

# cv2.circle(depth_colormap_d435, (cell_center_x_d435, cell_center_y_d435), 3, (255, 0, 0), -1)
# cv2.circle(depth_colormap_l515, (cell_center_x_l515, cell_center_y_l515), 3, (255, 0, 0), -1)

# Use distance from D435 if L515 distance is None
d_security_d435 = calculate_d_security(dMi=distance_d435_value, dS=1.35, dC=distance_d435_value, alpha_deg=25, beta_deg=29, pixel_yi=cell_center_y_d435, pixel_y=height_d435 // 2, pixel_ymax_2=240)
d_security_l515 = calculate_d_security(dMi=distance_l515_value, dS=1.35, dC=distance_l515_value, alpha_deg=25, beta_deg=22.5, pixel_yi=cell_center_y_l515, pixel_y=384, pixel_ymax_2=384)

if distance_l515_value is None:
d_security_l515 = d_security_d435

# Check if the distances are consistent within 0.1 meters
if abs(d_security_d435 - d_security_l515) <= 0.1:
if 0.1 < d_security_d435 < 0.5 or 0.1 < d_security_l515 < 0.5:
cv2.circle(depth_colormap_d435, (cell_center_x_d435, cell_center_y_d435), 3, (0, 0, 255), -1)
cv2.circle(depth_colormap_d435, (cell_center_x_l515, cell_center_y_l515), 3, (0, 0, 255), -1)

print(f"COLLISION: {d_security_d435:.2f} meters away at pixel ({cell_center_x_d435}, {cell_center_y_d435})")
write_to_file('0')
time.sleep(0.2)
write_to_file('')
alert_triggered = True
# priority_alert = True
break

elif 0.51 < d_security_d435 < 1.0 or 0.51 < d_security_l515 < 1.0:
# print(f"ALERT: Object at a distance of {d_security_d435:.2f} meters away at pixel ({cell_center_x_d435}, {cell_center_y_d435})")

write_to_file('6')
time.sleep(0.2)
write_to_file('')
alert_triggered = True
break

elif 1.01 < d_security_d435 < 1.5 or 1.01 < d_security_l515 < 1.5:
# print(f"WARNING: Object at a distance of {d_security_d435:.2f} meters away at pixel ({cell_center_x_d435}, {cell_center_y_d435})")
play_beep()
alert_triggered = True
break

if alert_triggered:
break
return depth_colormap_d435, depth_colormap_l515

def main():
initialize_file()

# Configure RealSense pipelines
pipeline_d435 = rs.pipeline()
config_d435 = rs.config()
configure_camera_d435(config_d435)
pipeline_d435.start(config_d435)

pipeline_l515 = rs.pipeline()
config_l515 = rs.config()
configure_camera_l515(config_l515)
pipeline_l515.start(config_l515)

width_d435, height_d435 = 848, 480
width_l515, height_l515 = 1024, 768

# Executor to run process_frames concurrently
executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(process_frames, pipeline_d435, pipeline_l515, width_d435, height_d435, width_l515, height_l515)

while True:
depth_colormap_d435, depth_colormap_l515 = future.result()

# Display depth_colormap_d435 using cv2.imshow
cv2.imshow('RealSense D435 Depth Map', depth_colormap_d435)
# cv2.imshow('RealSense L515 Depth Map', depth_colormap_l515)

if cv2.waitKey(1) & 0xFF == ord('q'):
break

# Reset future for next frame processing
future = executor.submit(process_frames, pipeline_d435, pipeline_l515, width_d435, height_d435, width_l515, height_l515)

# Stop the pipelines and close all windows
stop_measurements(pipeline_l515, pipeline_d435)

if __name__ == "__main__":
main()

</code></pre>
<p>I have tried putting a priority_flag = True for when it is on that section and then doing elif not priority_flag and 0.51 < d_security_d435 < 1.0 or 0.51 < d_security_l515 < 1.0: .....</p>
<p>but nothing seems to be working</p>
 

Latest posts

Top