Synchronization of multiple RTSP camera streams using Valkka (Python media streaming lib)

  • Thread starter: Kamoliddin (Guest)
First of all, this is my first question, so I apologize for any misunderstandings or errors.

I'm not that good at multiprocessing/multithreading yet, but I'm trying my best.

Currently, I'm working on streaming and consuming frames from multiple RTSP cameras.

With other approaches like FFmpeg, OpenCV, etc., the necessary performance could not be reached, so I looked further and found an amazing library for real-time data streaming called Valkka (https://github.com/elsampsa/valkka-core). I managed 20 FPS in the worst case with this tool. The consumed frames should be saved to storage, and each frame's path needs to be written to Redis in the following format:

Code:
{
   camera1: /path/to/camera1/frames.jpg,
   camera2: /path/to/camera2/frames.jpg,
   ....
   ....
   cameraN: /path/to/cameraN/frames.jpg,
}
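
Concretely, I plan to push each such mapping as a single JSON entry; here is a minimal sketch of that write using redis-py (the key name and paths are just placeholders):

Code:
import json
from redis import Redis

redis_instance = Redis(host="localhost", port=6379, db=1)
# placeholder snapshot: one frame path per camera
snapshot = {
    "camera1": "/path/to/camera1/frame.jpg",
    "camera2": "/path/to/camera2/frame.jpg",
}
# one synchronized snapshot per Redis list entry
redis_instance.rpush("camerasynchronizer", json.dumps(snapshot))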

Here the problem pops up: in order to save the frames' paths to Redis, the cameras need to be synchronized.
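
By "synchronized" I mean that every mapping pushed to Redis should contain one frame path per camera, all captured at roughly the same moment. The sketch below shows the kind of grouping I'm after, bucketing frame paths by their millisecond timestamps (the 100 ms bucket size and the on_frame helper are only illustrative, not part of my actual code):

Code:
from collections import defaultdict

buckets = defaultdict(dict)  # bucket start time -> {cam_id: frame path}

def on_frame(cam_id, mstimestamp, path, n_cameras):
    # round the frame's millisecond timestamp down to a 100 ms bucket
    bucket = mstimestamp // 100 * 100
    buckets[bucket][cam_id] = path
    # once every camera has contributed, the bucket is a synchronized snapshot
    if len(buckets[bucket]) == n_cameras:
        snapshot = buckets.pop(bucket)
        print("synchronized snapshot:", snapshot)  # this is what would go to Redis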

Here is the code I wrote to solve the problem, following this suggestion from the author of the Valkka lib (https://github.com/elsampsa/valkka-multiprocess/issues/1):

Code:
import time
from valkka.multiprocess import safe_select
from basic.skeleton.multiprocess.rgb import RGB24Process
from basic.skeleton.singleton import event_fd_group_1


class MyProcess(RGB24Process):

    def preRun__(self):
        super().preRun__()
        from redis import Redis
        self.redis_instance = Redis(host="localhost", port=6379, db=1)
        # pending batch: cam_id -> path of that camera's latest saved frame
        self.frames_collection = {}

    def readPipes__(self, timeout):
        rlis = [self.back_pipe]
        print("RLIS:    ", rlis)
        # self.back_pipe is the intercom pipe with the main python process
        # listen to all rgb frame sources
        print("CLIENT_BY_FD:    ", self.client_by_fd.values())

        frame_fds = list(self.client_by_fd.keys())
        rlis += frame_fds

        rs, ws, es = safe_select(rlis, [], [], timeout=timeout)
        print("RS:          ", rs)
        print("FRAME_FDS:   ", frame_fds)

        for fd in rs:
            print("FD:      ", fd)
            if fd == self.back_pipe:
                self.handleBackPipe__(self.back_pipe)
            # observed while debugging: rs never contains any of the fds in frame_fds
            if fd in frame_fds:
                client = self.client_by_fd[fd]
                index, meta = client.pullFrame()
                if index is None:
                    print("RGB24Process: handleFrame__ : weird.. rgb client got none")
                else:
                    print("Not coming to here")
                    data = client.shmem_list[index][0:meta.size]
                    data = data.reshape((meta.height, meta.width, 3))
                    self.handleFrame__(data, meta)

    def handleFrame__(self, frame, meta):
        self.logger.debug("handleFrame__ : rgb client got frame %s from slot %s", frame.shape, meta.slot)
        """metadata has the following members:
        size
        width
        height
        slot
        mstimestamp
        """
        import cv2
        import json
        # identify the camera by its slot number (each camera is registered with a unique slot below)
        cam_id = f"camera{meta.slot}"
        img = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        image = f"/home/kamol/projects/valkka_sample/frames/{cam_id}/{meta.mstimestamp}.jpg"
        cv2.imwrite(image, img)

        # This block somehow skips several frames that were already saved to
        # storage but had not yet been pushed to Redis.
        self.frames_collection[cam_id] = image
        if len(self.frames_collection) == 10:  # push once all 10 cameras have contributed a path
            self.redis_instance.rpush("camerasynchronizerviavalkka", json.dumps(self.frames_collection))
            self.frames_collection.clear()


class LiveStream:

    def __init__(self, shmem_buffers, shmem_name, address, slot, width, height):
        from valkka.core import AVThread, RGBShmemFrameFilter, SwScaleFrameFilter, LiveConnectionContext, \
            LiveConnectionType_rtsp
        self.shmem_buffers = shmem_buffers
        self.shmem_name = shmem_name
        self.address = f"rtsp://user:password@{address}:554"
        self.slot = slot
        self.width = width
        self.height = height

        _, self.event = event_fd_group_1.reserve()

        self.shmem_filter = RGBShmemFrameFilter(self.shmem_name, self.shmem_buffers, self.width, self.height)
        self.shmem_filter.useFd(self.event)

        # SWS Filter
        self.sws_filter = SwScaleFrameFilter(f"sws_{self.shmem_name}", width, height, self.shmem_filter)
        # self.interval_filter = TimeIntervalFrameFilter("interval_filter", 0, self.sws_filter)

        # decoding part
        self.avthread = AVThread("avthread", self.sws_filter)
        # self.av_in_filter = self.avthread.getBlockingFrameFilter()
        self.av_in_filter = self.avthread.getFrameFilter()

        # define connection to camera
        self.ctx = LiveConnectionContext(LiveConnectionType_rtsp, self.address, self.slot, self.av_in_filter)

        self.avthread.startCall()
        self.avthread.decodingOnCall()

    def close(self):
        self.avthread.decodingOffCall()
        self.avthread.stopCall()
        self.event.release()


p = MyProcess()
p.start()

from valkka.core import LiveThread
from config import get_configs

conf = get_configs()
cameras = conf.get("cameras", [0])
width = conf.get("image_width", 1920)
height = conf.get("image_height", 1080)
shmem_buffers = conf.get("shared_memory_buffers", 10)
mstimeout = conf.get("mstimeout", 10)

shmem_names = []
livestreams = {}
livethread = LiveThread("Livethread")
for i, cam in enumerate(cameras, start=1):
    livestreams[i] = LiveStream(
        shmem_buffers=shmem_buffers,
        shmem_name=f"{cam}",
        address=cam,
        slot=i,
        width=width,
        height=height
    )
    print("LIVETHREAD SHMEM: ", livestreams[i].shmem_name)

livethread.startCall()

for livestream in livestreams.values():
    livethread.registerStreamCall(livestream.ctx)
    livethread.playStreamCall(livestream.ctx)
    shmem_names.append(livestream.shmem_name)
    print("LIVESTREAM SHMEM:    ", livestream.shmem_name)
    fd = p.activateRGB24Client(
        name=livestream.shmem_name,
        n_ringbuffer=100,
        width=1920,
        height=1080,
        ipc_index=event_fd_group_1.asIndex(livestream.event)
    )

time.sleep(20)
p.stop()

Thanks in advance for any suggestions.

EDIT

In handleFrame__ I'm trying to do two things:

  • save frames to storage
  • push the saved frames' paths to Redis

Problems

  • FPS drops to 10 when encoding Full HD frames, but it stays at a stable 20 FPS when the resolution is decreased to HD or lower.
  • The frames saved to storage and the frames pushed to Redis differ: not all frames are pushed in the order they were saved, and some frames are skipped entirely when pushing to Redis. For example:

frames in the dirs:

Code:
cam1                cam2               cam3
    --img1.jpg          --img1.jpg         --img1.jpg
    --img2.jpg          --img2.jpg         --img2.jpg
    ...                 ...                ...
    --imgN.jpg          --imgN.jpg         --imgN.jpg


objects in redis array:

Code:
1 -> {
         cam1: "path/to/cam1/img1.jpg",
         cam3: "path/to/cam3/img3.jpg",
         cam2: "path/to/cam2/img5.jpg", 
     }
2 -> {
         cam3: "path/to/cam3/img2.jpg",
         cam1: "path/to/cam1/img8.jpg",
         cam2: "path/to/cam2/img9.jpg",
     }
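
My guess is that the batching dict is the culprit: when one camera delivers more than one frame before every camera has contributed, the earlier path is silently overwritten and never pushed. A toy run with a hypothetical arrival order for 3 cameras reproduces the same pattern:

Code:
frames_collection = {}
pushed = []

# hypothetical arrival order: cam1 delivers three times before cam3 reports
arrivals = [("cam1", "img1"), ("cam1", "img2"), ("cam2", "img1"),
            ("cam1", "img3"), ("cam3", "img1")]

for cam_id, img in arrivals:
    frames_collection[cam_id] = f"path/to/{cam_id}/{img}.jpg"
    if len(frames_collection) == 3:  # 3 cameras in this toy example
        pushed.append(dict(frames_collection))
        frames_collection.clear()

print(pushed)
# [{'cam1': 'path/to/cam1/img3.jpg', 'cam2': 'path/to/cam2/img1.jpg',
#   'cam3': 'path/to/cam3/img1.jpg'}]
# cam1/img1.jpg and cam1/img2.jpg were saved to disk but never pushed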