Con esta nueva serie de tutoriales vamos a explorar e implementar con diferentes lenguajes de programación la funcionalidad de un poderoso tipo de programas: los servicios. Un servicio es un programa que espera la interacción de otros programas para realizar una tarea en segundo plano con un contexto independiente.
Empezaremos comunicando dos servicios utilizando colas de mensajes POSIX. Como cada servicio utiliza su propio contexto podemos implementar cada servicio con un lenguaje de programación diferente. Para nuestro ejemplo veremos su implementación en C++, Python y Rust.
IPC con colas de mensajes POSIX
IPC y memoria compartida
Vamos a crear una cola para almacenar las peticiones y otra para las respuestas. Con esta estructura podríamos tener multiples trabajadores (Servicio B) por un solo servidor (Servicio A) y distribuir todo el trabajo a través de múltiples procesos diferentes.
Podemos empezar con el módulo que gestiona las colas de mensajes. Su primer cometido es inicializar la cola con los permisos que le corresponden a cada tipo de servicio. Un host tiene que poder escribir en la cola que contiene las peticiones y leer en la que tiene las respuestas, por el contrario un wroker tiene que leer de la cola de peticiones y escribir en la cola de respuestas.
Ambas colas se abren con el flag O_CREAT para crear la cola en el sistema en caso de que no exista, con el flag O_WRONLY la marcamos para que se pueda escribir en ella y con el flag O_RDONLY para que se pueda escribir.
Vamos a reutilizar el mismo manager tanto en el host como en el worker, solo tendremos que intercambiar los nombres de las colas en el worker cuando iniciemos su gestor de colas.
mq_handler.py
from posix_ipc import MessageQueue, O_RDONLY, O_WRONLY, O_CREAT from pcube.common.enums import EExitCode from pcube.common.logger import log class MQHandler: MAX_MESSAGE_SIZE = 512 def __init__(self): self.mq_request = None self.mq_response = None def connect(self, mq_request_name: str, mq_response_name: str) -> EExitCode: exit_code = EExitCode.SUCCESS self.mq_request = MessageQueue(mq_request_name, O_CREAT | O_WRONLY, max_message_size=MQHandler.MAX_MESSAGE_SIZE, max_messages=1, read = False, write = True) if self.mq_request is None: log(f"Error opening with mq_request") exit_code = EExitCode.FAIL self.mq_response = MessageQueue(mq_response_name, O_CREAT | O_RDONLY, max_message_size=MQHandler.MAX_MESSAGE_SIZE, max_messages=1, read = True, write = False) if self.mq_response is None: log(f"Error opening with mq_response") exit_code = EExitCode.FAIL return exit_code def disconnect(self, unlink: bool) -> EExitCode: exit_code = EExitCode.SUCCESS if self.mq_request: self.mq_request.close() if unlink: self.mq_request.unlink() self.mq_request = None if self.mq_response: self.mq_response.close() if unlink: self.mq_response.unlink() self.mq_response = None return exit_code def send_wait(self, message: str) -> EExitCode: try: log(f"Sending message '{message}'") self.mq_request.send(message=message, timeout=None) return EExitCode.SUCCESS except KeyboardInterrupt as ex: log(f"Safe KeyboardInterrupt") return EExitCode.FAIL except Exception as ex: log(f"Error mq_send {ex}") return EExitCode.FAIL def receive_wait(self) -> tuple[str, EExitCode]: try: message, _ = self.mq_response.receive(timeout=None) decoded_message = message.decode() log(f"Received message '{decoded_message}'") return decoded_message, EExitCode.SUCCESS except KeyboardInterrupt as ex: log(f"Safe KeyboardInterrupt") return f"{ex}", EExitCode.FAIL except Exception as ex: log(f"Error mq_receive {ex}") return f"{ex}", EExitCode.FAIL
versiones cpp y rust: Github: T-Services
El servicio conectará a la cola en su método run y desconectará en su finalización.
service.py – start/stop_listener
def start_listener(self) -> bool: self._listening = True exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker) if exit_code == EExitCode.SUCCESS: log(f"Service start listening : host({self._config.is_host})") return True return False def stop_listener(self): self._listening = False log("Service stop listening") self._mq_handler.disconnect(self._config.is_host)
El método run comienza connectando a ambas colas mediante el gestor de colas, y si es un servicio host enviaremos la primera petición quedando indefinidamente a la espera de su respuesta.
host/service.py – run
def run(self) -> EExitCode: exit_code = EExitCode.SUCCESS if self.start_listener(): self._mq_handler.send_wait("task-1") while self._listening: message, status = self._mq_handler.receive_wait() if status == EExitCode.SUCCESS: self.stop_listener() else: exit_code = EExitCode.FAIL self.stop_listener() else: log("Unable to init listener") exit_code = EExitCode.FAIL return exit_code
El worker estará esperando indefinidamente en su listener una nueva tarea, cuando la reciba enviará una respuesta al host mediante la cola de respuesta.
worker/service.py – run
def run(self) -> EExitCode: exit_code = EExitCode.SUCCESS if self.start_listener(): while self._listening: message, status = self._mq_handler.receive_wait() if status == EExitCode.SUCCESS: self._mq_handler.send_wait(f"{message} processed") self.stop_listener() else: exit_code = EExitCode.FAIL self.stop_listener() else: log("Unable to init listener") exit_code = EExitCode.FAIL return exit_code
Finalmente el código completo de ambos tipos de servicio:
host/service.py
from pcube.common.logger import log from pcube.common.enums import EExitCode from pcube.common.mq_handler import MQHandler from pcube.common.service_config import ServiceConfig class Service: def __init__(self, config: ServiceConfig): self._config: ServiceConfig = config self._listening = False self._mq_handler = MQHandler() def start_listener(self) -> bool: self._listening = True exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker) if exit_code == EExitCode.SUCCESS: log(f"Service start listening : host({self._config.is_host})") return True return False def stop_listener(self): self._listening = False log("Service stop listening") self._mq_handler.disconnect(self._config.is_host) def run(self) -> EExitCode: exit_code = EExitCode.SUCCESS if self.start_listener(): self._mq_handler.send_wait("task-1") while self._listening: message, status = self._mq_handler.receive_wait() if status == EExitCode.SUCCESS: self.stop_listener() else: exit_code = EExitCode.FAIL self.stop_listener() else: log("Unable to init listener") exit_code = EExitCode.FAIL return exit_code
host/worker.py
from pcube.common.logger import log from pcube.common.enums import EExitCode from pcube.common.mq_handler import MQHandler from pcube.common.service_config import ServiceConfig class Service: def __init__(self, config: ServiceConfig): self._config: ServiceConfig = config self._listening = False self._mq_handler = MQHandler() def start_listener(self) -> bool: self._listening = True exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker) if exit_code == EExitCode.SUCCESS: log(f"Service start listening : host({self._config.is_host})") return True return False def stop_listener(self): self._listening = False log("Service stop listening") self._mq_handler.disconnect(self._config.is_host) def run(self) -> EExitCode: exit_code = EExitCode.SUCCESS if self.start_listener(): while self._listening: message, status = self._mq_handler.receive_wait() if status == EExitCode.SUCCESS: self._mq_handler.send_wait(f"{message} processed") self.stop_listener() else: exit_code = EExitCode.FAIL self.stop_listener() else: log("Unable to init listener") exit_code = EExitCode.FAIL return exit_code
Version cpp y rust: Github: T-Services
Al iniciar un host y un worker podemos ver la interacción que se produce entre ambos antes de su finalización.
Conclusion
Con este código hemos terminado la primera parte de un servicio, la comunicación con otros servicios. Hemos utilizado una cola de mensajes pero otras primitivas como pipes, semáforos,.. pueden ser igual de válidas para este propósito.
Ahora que tenemos implementada la comunicación básica nos podemos poner a añadir más funciones interesantes como el uso de memoria compartida, pero lo veremos en el próximo tutorial.
Tutorial files
Te puede interesar:
Ayudanos con este blog!
El último año he estado dedicando cada vez más tiempo a la creación de tutoriales, en su mayoria sobre desarrollo de videojuegos. Si crees que estos posts te han ayudado de alguna manera o incluso inspirado, por favor considera ayudarnos a mantener este blog con alguna de estas opciones. Gracias por hacerlo posible!