Services design: shared memory

We are going to extend our previous service with a new feature: a shared memory handler. The service will be able to share bigger blocks of data than fit in a queue message.

Service IPC: POSIX message queue
Service IPC: POSIX shared memory

In the previous part of this series we saw how a message queue can be used to communicate between two services and transfer a small block of data. On Linux, a POSIX queue has a default message size of 8192 bytes. We can change the message size when creating the queue, but if we need to deal with much bigger sizes it is better to use another resource to store the payload: shared memory.

With this resource we can reserve a zone of memory that can be accessed by more than one process. We can store the request and response payloads in this zone and use a synchronization system to coordinate the writes and reads.

For the synchronization method we are going to reuse the previous message queues.

services_shared_memory_diagram

We can start with the initialization of the shared memory handler. As in the previous part, we need to open an existing resource or create it, so we need the flags O_CREAT and O_RDWR. Some functions let us pass the initial size reserved for the shared memory segment, but we can also set this size before the first I/O operation.

try:
    self.sm_segment = SharedMemory(name, O_CREAT | O_RDWR, size=4096)
except Exception as ex:
    # posix_ipc raises on failure; the constructor never returns None
    log(f"Error opening shared memory {ex}")

For the best performance we can use multiples of the system page size. We calculate how many pages we need to fit the data and resize our mapped memory file to contain that number of pages.

def calculate_best_size(minimal_size: int) -> int:
    return ((minimal_size + mmap.PAGESIZE - 1) // mmap.PAGESIZE) * mmap.PAGESIZE
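As a quick check of the rounding, the sketch below repeats the function and evaluates it for a few sizes. The results are expressed in terms of mmap.PAGESIZE because the page size depends on the system (4096 bytes is common, but that is an assumption and not a guarantee).

```python
import mmap


def calculate_best_size(minimal_size: int) -> int:
    # Round up to the next multiple of the system page size.
    return ((minimal_size + mmap.PAGESIZE - 1) // mmap.PAGESIZE) * mmap.PAGESIZE


page = mmap.PAGESIZE
print(calculate_best_size(1) == page)             # one byte still needs a full page
print(calculate_best_size(page) == page)          # an exact fit stays at one page
print(calculate_best_size(page + 1) == 2 * page)  # one byte over needs a second page
```

Each comparison should print True, whatever the page size of the machine is.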

To use the memory in our program we need to map it into the address space of the process. On Unix platforms we can do this with mmap; one of the parameters of this call is the file descriptor obtained when the shared memory segment was initialized. Each time we resize the memory segment we need to call mmap again to create a new mapping. Writing to and reading from this memory is similar to using a file.
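The file-like behaviour of the mapping can be tried without a shared memory segment. The sketch below uses a temporary file as a stand-in for the segment's file descriptor (an assumption made only to keep the example self-contained); with posix_ipc the descriptor would come from the segment's fd attribute instead.

```python
import mmap
import os
import tempfile

# A temporary file stands in for the shared memory descriptor (illustration only).
fd, path = tempfile.mkstemp()
try:
    os.ftruncate(fd, mmap.PAGESIZE)          # reserve one page
    mapped = mmap.mmap(fd, mmap.PAGESIZE)    # shared mapping by default on Unix

    mapped.seek(0)
    mapped.write(b"hello")                   # write like a file
    mapped.seek(0)
    first_read = mapped.read(5)              # read like a file

    # After growing the backing store we create a new mapping of the new size.
    mapped.close()
    os.ftruncate(fd, 2 * mmap.PAGESIZE)
    mapped = mmap.mmap(fd, 2 * mmap.PAGESIZE)
    second_read = mapped.read(5)             # the data is still there after the remap
    new_size = mapped.size()
    mapped.close()
finally:
    os.close(fd)
    os.unlink(path)
```

The important detail is that the old mapping is closed before the new one is created, so the process does not keep a stale view of the old size.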

Service A must know whether the memory has been resized by the other service, B, so that it can remap it. The easiest way is for the writer to send the size of the memory to the other service once the write is done. The receiver extracts the size from the message and compares it with the size of its current mapping. If the sizes differ, the writer has resized the shared memory and a new mapping must be created.

When the shared memory is no longer needed we can munmap the mapped file and close the file descriptor of the segment; the last service to finish must also unlink it.
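One way to picture this protocol is a small notification message that carries the segment size together with the payload size. The field layout and the names SegmentNotice, pack_notice, unpack_notice and needs_remap below are hypothetical and chosen for illustration; the original services may encode their queue messages differently.

```python
import struct
from typing import NamedTuple

# Hypothetical wire format: two unsigned 64-bit integers in network byte order.
NOTICE_FORMAT = "!QQ"


class SegmentNotice(NamedTuple):
    segment_size: int   # size of the shared memory segment after the write
    payload_size: int   # number of payload bytes stored at offset 0


def pack_notice(notice: SegmentNotice) -> bytes:
    # Serialize the notice so it can travel as a queue message body.
    return struct.pack(NOTICE_FORMAT, notice.segment_size, notice.payload_size)


def unpack_notice(message: bytes) -> SegmentNotice:
    return SegmentNotice(*struct.unpack(NOTICE_FORMAT, message))


def needs_remap(current_mapped_size: int, notice: SegmentNotice) -> bool:
    # A different size means the peer resized the segment.
    return current_mapped_size != notice.segment_size


message = pack_notice(SegmentNotice(segment_size=8192, payload_size=5000))
received = unpack_notice(message)
print(needs_remap(4096, received))   # True: the peer grew the segment
print(needs_remap(8192, received))   # False: the mapping is still valid
```

Sending the payload size in the same message also tells the reader how many bytes to read, which matches the payload_size parameter of the read method in the handler.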

sm_handler.py

from posix_ipc import SharedMemory, O_CREAT, O_RDWR
import mmap

from pcube.common.enums import EExitCode
from pcube.common.logger import log

class SMHandler:
    INITIAL_SIZE = 512

    def __init__(self):
        self.sm_segment = None
        self.map_file = None
        self.mapped_size = 0

    @staticmethod
    def calculate_best_size(minimal_size: int) -> int:
        return ((minimal_size + mmap.PAGESIZE - 1) // mmap.PAGESIZE) * mmap.PAGESIZE
    
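    def update_map(self) -> EExitCode:
        # Remap whenever the segment size differs from the mapped size (grown or shrunk)
        if (self.sm_segment is not None and self.mapped_size != self.sm_segment.size):
            try:
                if self.map_file is not None:
                    self.map_file.close()  # release the stale mapping first
                self.map_file = mmap.mmap(self.sm_segment.fd, self.sm_segment.size)
                self.mapped_size = self.sm_segment.size
                log(f"Shared memory update map {self.map_file.size()} bytes")
            except Exception as ex:
                log(f"Error sm update map {ex}")
                return EExitCode.FAIL
        return EExitCode.SUCCESS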

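    def resize(self, size: int) -> EExitCode:
        optimal_size = SMHandler.calculate_best_size(size)
        log(f"Shared memory segment resize {optimal_size} bytes")
        try:
            # mmap.resize resizes both the map and the underlying segment
            self.map_file.resize(optimal_size)
            self.mapped_size = optimal_size
        except Exception as ex:
            log(f"Error sm resize {ex}")
            return EExitCode.FAIL
        return EExitCode.SUCCESS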

    def connect(self, name: str) -> EExitCode:
        optimal_size = SMHandler.calculate_best_size(SMHandler.INITIAL_SIZE)
        try:
            # posix_ipc raises on failure instead of returning None
            self.sm_segment = SharedMemory(name, O_CREAT | O_RDWR, size=optimal_size)
        except Exception as ex:
            log(f"Error opening shared memory {ex}")
            return EExitCode.FAIL
        return self.update_map()

    def disconnect(self, unlink: bool) -> EExitCode:
        if self.map_file is not None:
            try:
                self.map_file.close()
            except Exception as ex:
                log(f"Error map file close {ex}")
                return EExitCode.FAIL
            self.map_file = None
            self.mapped_size = 0  # force a fresh mapping on the next connect

        if self.sm_segment is not None:
            try:
                self.sm_segment.close_fd()
                if unlink:
                    self.sm_segment.unlink()
            except Exception as ex:
                log(f"Error sm segment close {ex}")
                return EExitCode.FAIL
            self.sm_segment = None
        return EExitCode.SUCCESS

    def write(self, payload: str) -> EExitCode:
        # Encode first: the byte length can exceed len(payload) for non-ASCII text
        data = payload.encode()
        payload_size = len(data)
        log(f"Shared memory write '{payload}' {payload_size} bytes")
        if self.sm_segment is None or self.map_file is None:
            return EExitCode.FAIL
        exit_code = EExitCode.SUCCESS
        if self.sm_segment.size != SMHandler.calculate_best_size(payload_size):
            exit_code = self.resize(payload_size)
            if exit_code == EExitCode.SUCCESS:
                exit_code = self.update_map()

        if exit_code == EExitCode.SUCCESS:
            try:
                self.map_file.seek(0)
                self.map_file.write(data)
                return exit_code
            except Exception as ex:
                log(f"Error sm write {ex}")
        return EExitCode.FAIL
    
    def read(self, payload_size: int) -> tuple[str, EExitCode]:
        if self.update_map() == EExitCode.SUCCESS:
            try:
                self.map_file.seek(0)
                payload = self.map_file.read(payload_size).decode()
                log(f"Shared memory read '{payload}' {payload_size} bytes")
                return payload, EExitCode.SUCCESS
            except Exception as ex:
                log(f"Error sm read {ex}")
        return None, EExitCode.FAIL

C++ and Rust versions: GitHub: T-Services

We can run a host and a worker to see the interaction of both services using shared memory and POSIX message queues.

service_posix_shared_mem_com

Conclusion

With this code we have finished the second part of a service program: the data exchange using shared memory and its communication with other services. We have used a message queue, but other primitives such as pipes or semaphores can be used for the synchronization.

Tutorial files
