We are going to extend our previous service with a new feature: a shared memory handler. With it, the service will be able to share blocks of data bigger than a queue message.
Service IPC: POSIX message queue
Service IPC: POSIX shared memory
In the previous part of this series we saw how a message queue can be used to communicate between two services and transfer a small block of data. The POSIX queue has a default message size of 8192 bytes. We can change the message size when creating the queue, but if we need to deal with much bigger sizes it is better to use another resource to store the payload: shared memory.
With this resource we can reserve a region of memory that can be accessed by more than one process. We can store the request and response payloads in this region and use a synchronization mechanism to coordinate their writes and reads.
For the synchronization method we are going to reuse the previous message queues.
We can start with the initialization of the shared memory handler. As in the previous part, we need to open an existing resource or create it, so we need the flags O_CREAT and O_RDWR. Some functions let us pass the initial size reserved for the shared memory segment, but we can also set this size later, before the first I/O operation.
self.sm_segment = SharedMemory(name, O_CREAT | O_RDWR, size=4096)
if self.sm_segment is None:
    log(f"Error opening shared memory")
For the best performance we can use multiples of the system page size. We need to calculate how many pages are needed to fit the data and resize our memory-mapped file to contain that number of pages.
def calculate_best_size(minimal_size: int) -> int:
    return ((minimal_size + mmap.PAGESIZE - 1) // mmap.PAGESIZE) * mmap.PAGESIZE
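To make the rounding concrete, here is a small worked example of the same formula. It only assumes the standard `mmap` module; the requested sizes are arbitrary values chosen for illustration.

```python
import mmap


def calculate_best_size(minimal_size: int) -> int:
    """Round minimal_size up to the next multiple of the system page size."""
    return ((minimal_size + mmap.PAGESIZE - 1) // mmap.PAGESIZE) * mmap.PAGESIZE


page = mmap.PAGESIZE  # often 4096 bytes, but it depends on the platform

# Illustrative sizes: below one page, exactly one page, and just over a page.
for requested in (1, page, page + 1, 3 * page - 1):
    reserved = calculate_best_size(requested)
    print(f"{requested} bytes requested -> {reserved} bytes reserved")
```

A request of a single byte still reserves a whole page, and one byte over a page boundary reserves the next full page.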
To be able to use the memory in our program we need to map it into our process as a memory-mapped file. On Unix platforms we can do this with mmap; one of the parameters of this call is the file descriptor obtained during the shared memory segment initialization. Each time we resize the memory segment we need to call mmap again to create a new mapping of the memory. Writing to and reading from this memory is similar to using a file.
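The map, use, resize and remap cycle can be sketched with the standard library alone. This is a minimal sketch, not the handler's code: a temporary file stands in for the shared memory segment (an assumption made so the example runs without `posix_ipc`), and `os.ftruncate` plays the role of the resize.

```python
import mmap
import os
import tempfile

# Assumption: a temporary file replaces the shared memory segment's descriptor.
fd, path = tempfile.mkstemp()
try:
    os.ftruncate(fd, mmap.PAGESIZE)           # reserve one page
    mapping = mmap.mmap(fd, mmap.PAGESIZE)    # map the descriptor into memory

    mapping.seek(0)
    mapping.write(b"hello")                   # write as if it were a file
    mapping.seek(0)
    first_read = mapping.read(5)              # read as if it were a file

    # Grow the backing object, then create a new mapping covering the new size.
    os.ftruncate(fd, 2 * mmap.PAGESIZE)
    mapping.close()
    mapping = mmap.mmap(fd, 2 * mmap.PAGESIZE)
    remapped_len = len(mapping)
    second_read = mapping[:5]                 # data written earlier is still there
    mapping.close()
finally:
    os.close(fd)
    os.remove(path)

print(first_read, second_read, remapped_len)
```

The old mapping is closed before the new one is created, so only one mapping of the descriptor is alive at a time.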
Service A must know whether the memory has been resized by service B so that it can remap its file. The easiest way is to send the size of the memory to the other service once the write is done. The other service receives the message, extracts the size, and compares it with the size of its current mapping; if the sizes differ, the shared memory has been resized and a new mapping must be created.
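The size check described above can be sketched as follows. The JSON message layout and the names `build_notification` and `needs_remap` are hypothetical, chosen only for this illustration; the real services may encode the size differently in their queue messages.

```python
import json


def build_notification(payload_size: int, segment_size: int) -> str:
    """Writer side: message sent through the queue after a write (hypothetical format)."""
    return json.dumps({"payload_size": payload_size, "segment_size": segment_size})


def needs_remap(message: str, mapped_size: int) -> bool:
    """Reader side: compare the announced segment size with the local mapping size."""
    return json.loads(message)["segment_size"] != mapped_size


# The writer stored 5000 bytes and had to grow the segment to 8192 bytes.
message = build_notification(payload_size=5000, segment_size=8192)

print(needs_remap(message, mapped_size=4096))  # True: the reader still maps the old size
print(needs_remap(message, mapped_size=8192))  # False: the mapping is already up to date
```

Sending the payload size in the same message also tells the reader how many bytes to read from the start of the segment.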
When the shared memory is no longer needed we can unmap the mapped file with munmap and close the file descriptor of the segment; the last service to finish must also unlink the segment.
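The teardown order can be tried out in a single process. Note that this sketch swaps in the standard `multiprocessing.shared_memory` module instead of `posix_ipc`, so it runs without extra packages; `owner` and `peer` are hypothetical stand-ins for the two services, and the behaviour shown is the POSIX one.

```python
from multiprocessing import shared_memory

# "Service A" creates the segment; "service B" attaches to it by name.
owner = shared_memory.SharedMemory(create=True, size=4096)
peer = shared_memory.SharedMemory(name=owner.name)

owner.buf[:4] = b"ping"
received = bytes(peer.buf[:4])

# Each side releases its own mapping and file descriptor...
peer.close()
owner.close()
# ...and only the last one removes the segment name from the system.
owner.unlink()

# After unlink, the name can no longer be opened.
try:
    shared_memory.SharedMemory(name=owner.name)
    still_exists = True
except FileNotFoundError:
    still_exists = False

print(received, still_exists)
```

If a service unlinked while the other one still needed to open the segment by name, that open would fail, which is why the unlink is left to the last service.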
sm_handler.py
from posix_ipc import SharedMemory, O_CREAT, O_RDWR
import mmap

from pcube.common.enums import EExitCode
from pcube.common.logger import log


class SMHandler:
    INITIAL_SIZE = 512

    def __init__(self):
        self.sm_segment = None
        self.map_file = None
        self.mapped_size = 0

    @staticmethod
    def calculate_best_size(minimal_size: int) -> int:
        # Round up to a whole number of memory pages.
        return ((minimal_size + mmap.PAGESIZE - 1) // mmap.PAGESIZE) * mmap.PAGESIZE

    def update_map(self) -> EExitCode:
        # Remap only when the segment is bigger than the current mapping.
        if (self.sm_segment is not None and self.mapped_size < self.sm_segment.size):
            try:
                self.map_file = mmap.mmap(self.sm_segment.fd, self.sm_segment.size)
                self.mapped_size = self.sm_segment.size
                log(f"Shared memory update map {self.map_file.size()} bytes")
            except Exception:
                return EExitCode.FAIL
        return EExitCode.SUCCESS

    def resize(self, size: int=None) -> EExitCode:
        optimal_size = SMHandler.calculate_best_size(size)
        log(f"Shared memory segment resize {optimal_size} bytes")
        self.map_file.resize(optimal_size)
        return EExitCode.SUCCESS

    def connect(self, name: str) -> EExitCode:
        # Open the segment, creating it if it does not exist yet.
        optimal_size = SMHandler.calculate_best_size(SMHandler.INITIAL_SIZE)
        self.sm_segment = SharedMemory(name, O_CREAT | O_RDWR, size=optimal_size)
        if self.sm_segment is None:
            log(f"Error opening shared memory")
            return EExitCode.FAIL
        return self.update_map()

    def disconnect(self, unlink: bool) -> EExitCode:
        # Close the mapping first, then the descriptor; unlink only if requested.
        if self.map_file is not None:
            try:
                self.map_file.close()
            except Exception as ex:
                log(f"Error map file close {ex}")
                return EExitCode.FAIL
            self.map_file = None
        if self.sm_segment is not None:
            try:
                self.sm_segment.close_fd()
                if unlink:
                    self.sm_segment.unlink()
            except Exception as ex:
                log(f"Error sm segment close {ex}")
                return EExitCode.FAIL
            self.sm_segment = None
        return EExitCode.SUCCESS

    def write(self, payload: str) -> EExitCode:
        # len() counts characters; it matches the byte count only for ASCII payloads.
        payload_size = len(payload)
        log(f"Shared memory write '{payload}' {payload_size} bytes")
        exit_code = EExitCode.SUCCESS
        if self.sm_segment is None or self.sm_segment.size != SMHandler.calculate_best_size(payload_size):
            self.resize(payload_size)
            exit_code = self.update_map()
        if exit_code == EExitCode.SUCCESS:
            try:
                self.map_file.seek(0)
                _ = self.map_file.write(payload.encode())
                return exit_code
            except Exception as ex:
                log(f"Error sm write {ex}")
        return EExitCode.FAIL

    def read(self, payload_size: int) -> tuple[str, EExitCode]:
        if self.update_map() == EExitCode.SUCCESS:
            try:
                self.map_file.seek(0)
                payload = self.map_file.read(payload_size).decode()
                log(f"Shared memory read '{payload}' {payload_size} bytes")
                return payload, EExitCode.SUCCESS
            except Exception as ex:
                log(f"Error sm read {ex}")
        return None, EExitCode.FAIL
C++ and Rust versions: GitHub: T-Services
We can run a host and a worker to see both services interact using shared memory and POSIX message queues.
Conclusion
With this code we have finished the second part of a service program: data exchange using shared memory and its communication with other services. We have used a message queue, but other primitives such as pipes or semaphores can be used for the synchronization.
Tutorial files