en Linux, Tutoriales

Diseño de servicios: Memoria compartida

Vamos a extender nuestro tutorial anterior con una nueva característica, un gestor de memoria compartida. Con ella nuestros servicios serán capaces de manejar bloques de datos más grandes que con una cola de mensajes.

IPC con colas de mensajes POSIX
IPC y memoria compartida

En una parte anterior de esta serie de tutoriales vimos como las colas de mensajes podían usarse para comunicar dos servicios e intercambiar pequeños bloques de información. Una cola POSIX tiene un tamaño máximo de mensaje de 8192 bytes. Podemos modificar este tamaño durante la creación de la cola, pero si necesitamos manejar un tamaño de mensaje muy grande es mejor utilizar otro tipo de recurso: memoria compartida

Con este recurso podemos reservar una zona de memoria a la que puede acceder más de un proceso. Almacenar tanto la petición como la respuesta en esta zona y añadir un sistema de sincronización para controlar su escritura y lectura.

Para la sincronización reutilizaremos las colas de mensajes.

services_shared_memory_diagram

Empecemos con la inicialización de nuestro gestor de memoria compartida. De manera similar a otros recursos tendremos que crearlo o abrirlo en el caso de que ya exista, para ello utilizaremos los flags O_CREAT y O_RDWR. Dependiendo de la implementación en algunas funciones se puede añadir y reservar el tamaño inicial del segmento de memoria compartida, pero tambien lo podemos definir más tarde, justo antes de realizar las operatciones de escritura o lectura.

self.sm_segment = SharedMemory(name, O_CREAT | O_RDWR, size=4096)
if self.sm_segment is None:
    log(f"Error opening shared memory")

Para un mejor rendimiento utilizamos múltiplos del tamaño de página del sistema. Calcularemos cuantas páginas necesitamos para almacenar nuestros datos y reajustaremos el tamaño de nuestra memoria mapeada para contener ese número de páginas.

def calculate_best_size(minimal_size: int) -> int:
    return ((minimal_size + mmap.PAGESIZE - 1) // mmap.PAGESIZE) * mmap.PAGESIZE

Para poder utilizar esta memoria en nuestro programa necesitamos mapearla a un fichero dentro del sistema operativo. Para ello usaremos la función mmap, en caso de utilizar plataformas Unix, uno de sus parámetros es el descriptor de fichero obtenido al inicializar el segmento de memoria compartida. Cada vez que cambiemos el tamaño de nuestro segmento de memoria tendremos que volver a llamar mmap para crear un nuevo mapeo de la memoria. La escritura y lectura mediante este fichero es similar a realizarlas con un fichero almacenado en disco.

Un servicio A debe conocer si el tamaño de la memoria ha sido cambiada por el otro servicio B para remapear y actualizar el puntero a este fichero. La manera más facil es mandar el tamaño por la cola de mensajes cuando una escritura es realizada, el otro servicio recibe el mensaje y extrae el tamaño, comparandolo con el tamaño que tiene almacenado para su segmento, si el tamaño es diferente entonces el otro servicio ha cambiado el tamaño de la memoria compartida y debe actualizar sus variable con un nuevo mapeo

Cuando la memoria compartida deja de ser necesaria se puede liberar mediante munmap para liberar el mapeo del fichero, close para cerrar el descriptor de fichero del segmento y finalmente el último de los servicios deberá llamar a unlink para borrar el recurso

sm_handler.py

from posix_ipc import SharedMemory, O_CREAT, O_RDWR
import mmap

from pcube.common.enums import EExitCode
from pcube.common.logger import log

class SMHandler:
    INITIAL_SIZE = 512

    def __init__(self):
        self.sm_segment = None
        self.map_file = None
        self.mapped_size = 0

    @staticmethod
    def calculate_best_size(minimal_size: int) -> int:
        return ((minimal_size + mmap.PAGESIZE - 1) // mmap.PAGESIZE) * mmap.PAGESIZE
    
    def update_map(self) -> EExitCode:
        if (self.sm_segment is not None and self.mapped_size < self.sm_segment.size):
            try:
                self.map_file = mmap.mmap(self.sm_segment.fd, self.sm_segment.size)
                self.mapped_size = self.sm_segment.size
                log(f"Shared memory update map {self.map_file.size()} bytes")
            except Exception:
                return EExitCode.FAIL
        return EExitCode.SUCCESS

    def resize(self, size: int=None) -> EExitCode:
        optimal_size = SMHandler.calculate_best_size(size)
        log(f"Shared memory segment resize {optimal_size} bytes")
        self.map_file.resize(optimal_size)
        return EExitCode.SUCCESS

    def connect(self, name: str) -> EExitCode:
        optimal_size = SMHandler.calculate_best_size(SMHandler.INITIAL_SIZE)
        self.sm_segment = SharedMemory(name, O_CREAT | O_RDWR, size=optimal_size)
        if self.sm_segment is None:
            log(f"Error opening shared memory")
            return EExitCode.FAIL
        return self.update_map()

    def disconnect(self, unlink: bool) -> EExitCode:
        if self.map_file is not None:
            try:
                self.map_file.close()
            except Exception as ex:
                log(f"Error map file close {ex}")
                return EExitCode.FAIL
            self.map_file = None

        if self.sm_segment is not None:
            try:
                self.sm_segment.close_fd()
                if unlink:
                    self.sm_segment.unlink()
            except Exception as ex:
                log(f"Error sm segment close {ex}")
                return EExitCode.FAIL
            self.sm_segment = None
        return EExitCode.SUCCESS

    def write(self, payload: str) -> EExitCode:
        payload_size = len(payload)
        log(f"Shared memory write '{payload}' {payload_size} bytes")
        exit_code = EExitCode.SUCCESS
        if self.sm_segment is None or self.sm_segment.size != SMHandler.calculate_best_size(payload_size):
            self.resize(payload_size)
            exit_code = self.update_map()
        
        if exit_code == EExitCode.SUCCESS:
            try:
                self.map_file.seek(0)
                _ = self.map_file.write(payload.encode())
                return exit_code
            except Exception as ex:
                log(f"Error sm write {ex}")
        return EExitCode.FAIL
    
    def read(self, payload_size: int) -> tuple[str, EExitCode]:
        if self.update_map() == EExitCode.SUCCESS:
            try:
                self.map_file.seek(0)
                payload = self.map_file.read(payload_size).decode()
                log(f"Shared memory read '{payload}' {payload_size} bytes")
                return payload, EExitCode.SUCCESS
            except Exception as ex:
                log(f"Error sm read {ex}")
        return None, EExitCode.FAIL

versiones cpp y rust: Github: T-Services

Podemos iniciar un host y un worker para ver la interacción entre ambos mediante meoria compartida y colas de mensajes

service_posix_shared_mem_com

Conclusion

Con este código hemos terminado la segunda parte de nuestro diseño de servicios, el intercambio de datos mediante memoria compartida y su comunicación con otro servicio. Hemos utilizado una cola de mensajes pero otras primitivas como pipes o semáforos pueden tambien utilizarse para esta sincronización.

Tutorial files

Ayudanos con este blog!

En el último año hemos estado dedicando cada vez más tiempo a la creación de tutoriales, en su mayoria sobre desarrollo de videojuegos. Si crees que estos posts te han ayudado de alguna manera o incluso inspirado, por favor considera ayudarnos a mantener este blog con alguna de estas opciones. Gracias por hacerlo posible!

Escribe un comentario

Comentario