Vamos a extender nuestro tutorial anterior con una nueva característica, un gestor de memoria compartida. Con ella nuestros servicios serán capaces de manejar bloques de datos más grandes que con una cola de mensajes.
IPC con colas de mensajes POSIX
IPC y memoria compartida
En una parte anterior de esta serie de tutoriales vimos como las colas de mensajes podían usarse para comunicar dos servicios e intercambiar pequeños bloques de información. Una cola POSIX tiene un tamaño máximo de mensaje de 8192 bytes. Podemos modificar este tamaño durante la creación de la cola, pero si necesitamos manejar un tamaño de mensaje muy grande es mejor utilizar otro tipo de recurso: memoria compartida
Con este recurso podemos reservar una zona de memoria a la que puede acceder más de un proceso. Almacenar tanto la petición como la respuesta en esta zona y añadir un sistema de sincronización para controlar su escritura y lectura.
Para la sincronización reutilizaremos las colas de mensajes.
Empecemos con la inicialización de nuestro gestor de memoria compartida. De manera similar a otros recursos tendremos que crearlo o abrirlo en el caso de que ya exista, para ello utilizaremos los flags O_CREAT y O_RDWR. Dependiendo de la implementación en algunas funciones se puede añadir y reservar el tamaño inicial del segmento de memoria compartida, pero tambien lo podemos definir más tarde, justo antes de realizar las operatciones de escritura o lectura.
self.sm_segment = SharedMemory(name, O_CREAT | O_RDWR, size=4096) if self.sm_segment is None: log(f"Error opening shared memory")
Para un mejor rendimiento utilizamos múltiplos del tamaño de página del sistema. Calcularemos cuantas páginas necesitamos para almacenar nuestros datos y reajustaremos el tamaño de nuestra memoria mapeada para contener ese número de páginas.
def calculate_best_size(minimal_size: int) -> int: return ((minimal_size + mmap.PAGESIZE - 1) // mmap.PAGESIZE) * mmap.PAGESIZE
Para poder utilizar esta memoria en nuestro programa necesitamos mapearla a un fichero dentro del sistema operativo. Para ello usaremos la función mmap, en caso de utilizar plataformas Unix, uno de sus parámetros es el descriptor de fichero obtenido al inicializar el segmento de memoria compartida. Cada vez que cambiemos el tamaño de nuestro segmento de memoria tendremos que volver a llamar mmap para crear un nuevo mapeo de la memoria. La escritura y lectura mediante este fichero es similar a realizarlas con un fichero almacenado en disco.
Un servicio A debe conocer si el tamaño de la memoria ha sido cambiada por el otro servicio B para remapear y actualizar el puntero a este fichero. La manera más facil es mandar el tamaño por la cola de mensajes cuando una escritura es realizada, el otro servicio recibe el mensaje y extrae el tamaño, comparandolo con el tamaño que tiene almacenado para su segmento, si el tamaño es diferente entonces el otro servicio ha cambiado el tamaño de la memoria compartida y debe actualizar sus variable con un nuevo mapeo
Cuando la memoria compartida deja de ser necesaria se puede liberar mediante munmap para liberar el mapeo del fichero, close para cerrar el descriptor de fichero del segmento y finalmente el último de los servicios deberá llamar a unlink para borrar el recurso
sm_handler.py
from posix_ipc import SharedMemory, O_CREAT, O_RDWR import mmap from pcube.common.enums import EExitCode from pcube.common.logger import log class SMHandler: INITIAL_SIZE = 512 def __init__(self): self.sm_segment = None self.map_file = None self.mapped_size = 0 @staticmethod def calculate_best_size(minimal_size: int) -> int: return ((minimal_size + mmap.PAGESIZE - 1) // mmap.PAGESIZE) * mmap.PAGESIZE def update_map(self) -> EExitCode: if (self.sm_segment is not None and self.mapped_size < self.sm_segment.size): try: self.map_file = mmap.mmap(self.sm_segment.fd, self.sm_segment.size) self.mapped_size = self.sm_segment.size log(f"Shared memory update map {self.map_file.size()} bytes") except Exception: return EExitCode.FAIL return EExitCode.SUCCESS def resize(self, size: int=None) -> EExitCode: optimal_size = SMHandler.calculate_best_size(size) log(f"Shared memory segment resize {optimal_size} bytes") self.map_file.resize(optimal_size) return EExitCode.SUCCESS def connect(self, name: str) -> EExitCode: optimal_size = SMHandler.calculate_best_size(SMHandler.INITIAL_SIZE) self.sm_segment = SharedMemory(name, O_CREAT | O_RDWR, size=optimal_size) if self.sm_segment is None: log(f"Error opening shared memory") return EExitCode.FAIL return self.update_map() def disconnect(self, unlink: bool) -> EExitCode: if self.map_file is not None: try: self.map_file.close() except Exception as ex: log(f"Error map file close {ex}") return EExitCode.FAIL self.map_file = None if self.sm_segment is not None: try: self.sm_segment.close_fd() if unlink: self.sm_segment.unlink() except Exception as ex: log(f"Error sm segment close {ex}") return EExitCode.FAIL self.sm_segment = None return EExitCode.SUCCESS def write(self, payload: str) -> EExitCode: payload_size = len(payload) log(f"Shared memory write '{payload}' {payload_size} bytes") exit_code = EExitCode.SUCCESS if self.sm_segment is None or self.sm_segment.size != SMHandler.calculate_best_size(payload_size): self.resize(payload_size) exit_code = self.update_map() if exit_code == EExitCode.SUCCESS: try: self.map_file.seek(0) _ = self.map_file.write(payload.encode()) return exit_code except Exception as ex: log(f"Error sm write {ex}") return EExitCode.FAIL def read(self, payload_size: int) -> tuple[str, EExitCode]: if self.update_map() == EExitCode.SUCCESS: try: self.map_file.seek(0) payload = self.map_file.read(payload_size).decode() log(f"Shared memory read '{payload}' {payload_size} bytes") return payload, EExitCode.SUCCESS except Exception as ex: log(f"Error sm read {ex}") return None, EExitCode.FAIL
versiones cpp y rust: Github: T-Services
Podemos iniciar un host y un worker para ver la interacción entre ambos mediante meoria compartida y colas de mensajes
Conclusion
Con este código hemos terminado la segunda parte de nuestro diseño de servicios, el intercambio de datos mediante memoria compartida y su comunicación con otro servicio. Hemos utilizado una cola de mensajes pero otras primitivas como pipes o semáforos pueden tambien utilizarse para esta sincronización.
Tutorial files
Te puede interesar:
Ayudanos con este blog!
En el último año hemos estado dedicando cada vez más tiempo a la creación de tutoriales, en su mayoria sobre desarrollo de videojuegos. Si crees que estos posts te han ayudado de alguna manera o incluso inspirado, por favor considera ayudarnos a mantener este blog con alguna de estas opciones. Gracias por hacerlo posible!