en Linux, Tutoriales

Diseño de servicios: IPC cola de mensajes

Con esta nueva serie de tutoriales vamos a explorar e implementar con diferentes lenguajes de programación la funcionalidad de un poderoso tipo de programas: los servicios. Un servicio es un programa que espera la interacción de otros programas para realizar una tarea en segundo plano con un contexto independiente.

Empezaremos comunicando dos servicios utilizando colas de mensajes POSIX. Como cada servicio utiliza su propio contexto podemos implementar cada servicio con un lenguaje de programación diferente. Para nuestro ejemplo veremos su implementación en C++, Python y Rust.

IPC con colas de mensajes POSIX
IPC y memoria compartida

Vamos a crear una cola para almacenar las peticiones y otra para las respuestas. Con esta estructura podríamos tener multiples trabajadores (Servicio B) por un solo servidor (Servicio A) y distribuir todo el trabajo a través de múltiples procesos diferentes.

Podemos empezar con el módulo que gestiona las colas de mensajes. Su primer cometido es inicializar la cola con los permisos que le corresponden a cada tipo de servicio. Un host tiene que poder escribir en la cola que contiene las peticiones y leer en la que tiene las respuestas, por el contrario un wroker tiene que leer de la cola de peticiones y escribir en la cola de respuestas.

Ambas colas se abren con el flag O_CREAT para crear la cola en el sistema en caso de que no exista, con el flag O_WRONLY la marcamos para que se pueda escribir en ella y con el flag O_RDONLY para que se pueda escribir.

Vamos a reutilizar el mismo manager tanto en el host como en el worker, solo tendremos que intercambiar los nombres de las colas en el worker cuando iniciemos su gestor de colas.

mq_handler.py

from posix_ipc import MessageQueue, O_RDONLY, O_WRONLY, O_CREAT

from pcube.common.enums import EExitCode
from pcube.common.logger import log

class MQHandler:
    MAX_MESSAGE_SIZE = 512

    def __init__(self):
        self.mq_request = None
        self.mq_response = None

    def connect(self, mq_request_name: str, mq_response_name: str) -> EExitCode:
        exit_code = EExitCode.SUCCESS
        self.mq_request = MessageQueue(mq_request_name, 
                                        O_CREAT | O_WRONLY, 
                                        max_message_size=MQHandler.MAX_MESSAGE_SIZE, 
                                        max_messages=1, 
                                        read = False, 
                                        write = True)
        if self.mq_request is None:
            log(f"Error opening with mq_request")
            exit_code = EExitCode.FAIL

        self.mq_response = MessageQueue(mq_response_name, 
                                        O_CREAT | O_RDONLY, 
                                        max_message_size=MQHandler.MAX_MESSAGE_SIZE, 
                                        max_messages=1, 
                                        read = True, 
                                        write = False)
        if self.mq_response is None:
            log(f"Error opening with mq_response")
            exit_code = EExitCode.FAIL
        return exit_code

    def disconnect(self, unlink: bool) -> EExitCode: 
        exit_code = EExitCode.SUCCESS
        if self.mq_request:
            self.mq_request.close()
            if unlink:
                self.mq_request.unlink()
            self.mq_request = None
        if self.mq_response:
            self.mq_response.close()
            if unlink:
                self.mq_response.unlink()
            self.mq_response = None
        return exit_code
    
    def send_wait(self, message: str) -> EExitCode:
        try:
            log(f"Sending message '{message}'")
            self.mq_request.send(message=message, timeout=None)
            return EExitCode.SUCCESS
        except KeyboardInterrupt as ex:
            log(f"Safe KeyboardInterrupt")
            return EExitCode.FAIL
        except Exception as ex:
            log(f"Error mq_send {ex}")
            return EExitCode.FAIL

    def receive_wait(self) -> tuple[str, EExitCode]:
        try:
            message, _ = self.mq_response.receive(timeout=None)
            decoded_message = message.decode()
            log(f"Received message '{decoded_message}'")
            return decoded_message, EExitCode.SUCCESS
        except KeyboardInterrupt as ex:
            log(f"Safe KeyboardInterrupt")
            return f"{ex}", EExitCode.FAIL
        except Exception as ex:
            log(f"Error mq_receive {ex}")
            return f"{ex}", EExitCode.FAIL

versiones cpp y rust: Github: T-Services

El servicio conectará a la cola en su método run y desconectará en su finalización.

service.py – start/stop_listener

    def start_listener(self) -> bool:
        self._listening = True
        exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker)
        if exit_code == EExitCode.SUCCESS:
            log(f"Service start listening : host({self._config.is_host})")
            return True
        return False

    def stop_listener(self):
        self._listening = False
        log("Service stop listening")
        self._mq_handler.disconnect(self._config.is_host)

El método run comienza connectando a ambas colas mediante el gestor de colas, y si es un servicio host enviaremos la primera petición quedando indefinidamente a la espera de su respuesta.

host/service.py – run

    def run(self) -> EExitCode:
        exit_code = EExitCode.SUCCESS
        if self.start_listener():

            self._mq_handler.send_wait("task-1")
            while self._listening:
                message, status = self._mq_handler.receive_wait()
                if status == EExitCode.SUCCESS:
                    self.stop_listener()
                else:
                    exit_code = EExitCode.FAIL
                    self.stop_listener()
        else:
            log("Unable to init listener")
            exit_code = EExitCode.FAIL
        return exit_code

El worker estará esperando indefinidamente en su listener una nueva tarea, cuando la reciba enviará una respuesta al host mediante la cola de respuesta.

worker/service.py – run

    def run(self) -> EExitCode:
        exit_code = EExitCode.SUCCESS
        if self.start_listener():

            while self._listening:
                message, status = self._mq_handler.receive_wait()
                if status == EExitCode.SUCCESS:
                    self._mq_handler.send_wait(f"{message} processed")
                    self.stop_listener()
                else:
                    exit_code = EExitCode.FAIL
                    self.stop_listener()
        else:
            log("Unable to init listener")
            exit_code = EExitCode.FAIL
        return exit_code

Finalmente el código completo de ambos tipos de servicio:

host/service.py

from pcube.common.logger import log
from pcube.common.enums import EExitCode
from pcube.common.mq_handler import MQHandler
from pcube.common.service_config import ServiceConfig

class Service:
    def __init__(self, config: ServiceConfig):
        self._config: ServiceConfig = config
        self._listening = False
        self._mq_handler = MQHandler()

    def start_listener(self) -> bool:
        self._listening = True
        exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker)
        if exit_code == EExitCode.SUCCESS:
            log(f"Service start listening : host({self._config.is_host})")
            return True
        return False

    def stop_listener(self):
        self._listening = False
        log("Service stop listening")
        self._mq_handler.disconnect(self._config.is_host)

    def run(self) -> EExitCode:
        exit_code = EExitCode.SUCCESS
        if self.start_listener():

            self._mq_handler.send_wait("task-1")
            while self._listening:
                message, status = self._mq_handler.receive_wait()
                if status == EExitCode.SUCCESS:
                    self.stop_listener()
                else:
                    exit_code = EExitCode.FAIL
                    self.stop_listener()
        else:
            log("Unable to init listener")
            exit_code = EExitCode.FAIL
        return exit_code

host/worker.py

from pcube.common.logger import log
from pcube.common.enums import EExitCode
from pcube.common.mq_handler import MQHandler
from pcube.common.service_config import ServiceConfig

class Service:
    def __init__(self, config: ServiceConfig):
        self._config: ServiceConfig = config
        self._listening = False
        self._mq_handler = MQHandler()

    def start_listener(self) -> bool:
        self._listening = True
        exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker)
        if exit_code == EExitCode.SUCCESS:
            log(f"Service start listening : host({self._config.is_host})")
            return True
        return False

    def stop_listener(self):
        self._listening = False
        log("Service stop listening")
        self._mq_handler.disconnect(self._config.is_host)

    def run(self) -> EExitCode:
        exit_code = EExitCode.SUCCESS
        if self.start_listener():

            while self._listening:
                message, status = self._mq_handler.receive_wait()
                if status == EExitCode.SUCCESS:
                    self._mq_handler.send_wait(f"{message} processed")
                    self.stop_listener()
                else:
                    exit_code = EExitCode.FAIL
                    self.stop_listener()
        else:
            log("Unable to init listener")
            exit_code = EExitCode.FAIL
        return exit_code

Version cpp y rust: Github: T-Services

Al iniciar un host y un worker podemos ver la interacción que se produce entre ambos antes de su finalización.

service_posix_mq_com

Conclusion

Con este código hemos terminado la primera parte de un servicio, la comunicación con otros servicios. Hemos utilizado una cola de mensajes pero otras primitivas como pipes, semáforos,.. pueden ser igual de válidas para este propósito.

Ahora que tenemos implementada la comunicación básica nos podemos poner a añadir más funciones interesantes como el uso de memoria compartida, pero lo veremos en el próximo tutorial.

Tutorial files

Ayudanos con este blog!

En el último año hemos estado dedicando cada vez más tiempo a la creación de tutoriales, en su mayoria sobre desarrollo de videojuegos. Si crees que estos posts te han ayudado de alguna manera o incluso inspirado, por favor considera ayudarnos a mantener este blog con alguna de estas opciones. Gracias por hacerlo posible!

Escribe un comentario

Comentario