In this tutorial series we are going to explore, and implement in several programming languages, a powerful type of program: the service. A service is a program that runs in the background with its own independent context and waits for other programs to interact with it before performing a task.
We can start by making two services communicate through POSIX message queues. Because each service has its own context, each one can be implemented in a different programming language. Our examples show the implementation in C++, Python and Rust.
Service IPC: POSIX message queue
We are going to set up one queue to store the requests and another to store the responses. With this structure we can have multiple workers (Service B) for one host (Service A) and parallelize the work across multiple processes.
We start with the module that manages the message queues. Its first job is to open each queue with the correct permissions for each type of service. A host must be able to write to the requests queue and read from the responses queue; the worker, by contrast, must read from the requests queue and write to the responses queue.
Both queues need the O_CREAT flag, which creates the queue in the system if it does not exist yet, plus O_WRONLY on the queue the service writes to and O_RDONLY on the queue it reads from.
We can reuse the same code for the host and the worker: the worker just swaps the names of the request and response queues in the connect function call.
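To make the two-queue layout more concrete, here is a minimal sketch of one host feeding several workers. It is only an illustration: it uses Python's in-process queue.Queue and threads as stand-ins for the POSIX message queues and separate processes, and the names run_host and worker are hypothetical.

```python
import queue
import threading

# In-process stand-ins for the two POSIX queues (assumption: queue.Queue
# replaces the real message queues so the sketch needs no extra packages).
requests = queue.Queue()
responses = queue.Queue()


def worker() -> None:
    """Answer requests until a None sentinel arrives."""
    while True:
        task = requests.get()
        if task is None:
            break
        responses.put(f"{task} processed")


def run_host(tasks: list[str], worker_count: int) -> list[str]:
    """Start the workers, publish every task and collect one reply per task."""
    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for thread in threads:
        thread.start()
    for task in tasks:
        requests.put(task)
    replies = [responses.get() for _ in tasks]
    for _ in threads:
        requests.put(None)  # one sentinel per worker so each one stops
    for thread in threads:
        thread.join()
    # Replies can arrive in any order, so sort them for a stable result.
    return sorted(replies)


results = run_host(["task-1", "task-2", "task-3"], worker_count=2)
print(results)
```

Whichever worker is free takes the next request, which is what lets the work spread across several consumers of the same requests queue.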
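As a small sketch of how these flags combine for each role, the helper below builds them from the constants in Python's standard os module, which use the same names as the C flags. The function queue_flags is hypothetical and only illustrates the rule described above.

```python
import os


def queue_flags(is_host: bool) -> dict[str, int]:
    """Return the open flags for each queue, depending on the service role."""
    write_flags = os.O_CREAT | os.O_WRONLY  # create if missing, write only
    read_flags = os.O_CREAT | os.O_RDONLY   # create if missing, read only
    if is_host:
        # The host writes requests and reads responses.
        return {"requests": write_flags, "responses": read_flags}
    # The worker reads requests and writes responses.
    return {"requests": read_flags, "responses": write_flags}


host_flags = queue_flags(is_host=True)
worker_flags = queue_flags(is_host=False)
print(host_flags, worker_flags)
```

Note that the two roles are mirror images of each other: the flags of the host's requests queue are the flags of the worker's responses queue.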
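The swap can be sketched with a tiny helper. The function queue_names and the queue names "/mq_host" and "/mq_worker" are hypothetical, chosen only for illustration; the returned pair is meant to be passed to connect in that order.

```python
def queue_names(is_host: bool, q_name_host: str, q_name_worker: str) -> tuple[str, str]:
    """Return (queue to write to, queue to read from) for the calling service."""
    if is_host:
        return q_name_host, q_name_worker
    # The worker sees the same two queues with the roles exchanged.
    return q_name_worker, q_name_host


host_names = queue_names(True, "/mq_host", "/mq_worker")
worker_names = queue_names(False, "/mq_host", "/mq_worker")
print(host_names, worker_names)
```

With this arrangement the queue the host writes to is the queue the worker reads from, and vice versa.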
mq_handler.py
from posix_ipc import MessageQueue, O_RDONLY, O_WRONLY, O_CREAT
from pcube.common.enums import EExitCode
from pcube.common.logger import log


class MQHandler:
    MAX_MESSAGE_SIZE = 512

    def __init__(self):
        self.mq_request = None
        self.mq_response = None

    def connect(self, mq_request_name: str, mq_response_name: str) -> EExitCode:
        exit_code = EExitCode.SUCCESS
        self.mq_request = MessageQueue(mq_request_name, O_CREAT | O_WRONLY,
                                       max_message_size=MQHandler.MAX_MESSAGE_SIZE,
                                       max_messages=1, read=False, write=True)
        if self.mq_request is None:
            log(f"Error opening with mq_request")
            exit_code = EExitCode.FAIL
        self.mq_response = MessageQueue(mq_response_name, O_CREAT | O_RDONLY,
                                        max_message_size=MQHandler.MAX_MESSAGE_SIZE,
                                        max_messages=1, read=True, write=False)
        if self.mq_response is None:
            log(f"Error opening with mq_response")
            exit_code = EExitCode.FAIL
        return exit_code

    def disconnect(self, unlink: bool) -> EExitCode:
        exit_code = EExitCode.SUCCESS
        if self.mq_request:
            self.mq_request.close()
            if unlink:
                self.mq_request.unlink()
            self.mq_request = None
        if self.mq_response:
            self.mq_response.close()
            if unlink:
                self.mq_response.unlink()
            self.mq_response = None
        return exit_code

    def send_wait(self, message: str) -> EExitCode:
        try:
            log(f"Sending message '{message}'")
            self.mq_request.send(message=message, timeout=None)
            return EExitCode.SUCCESS
        except KeyboardInterrupt as ex:
            log(f"Safe KeyboardInterrupt")
            return EExitCode.FAIL
        except Exception as ex:
            log(f"Error mq_send {ex}")
            return EExitCode.FAIL

    def receive_wait(self) -> tuple[str, EExitCode]:
        try:
            message, _ = self.mq_response.receive(timeout=None)
            decoded_message = message.decode()
            log(f"Received message '{decoded_message}'")
            return decoded_message, EExitCode.SUCCESS
        except KeyboardInterrupt as ex:
            log(f"Safe KeyboardInterrupt")
            return f"{ex}", EExitCode.FAIL
        except Exception as ex:
            log(f"Error mq_receive {ex}")
            return f"{ex}", EExitCode.FAIL
C++ and Rust versions: GitHub: T-Services
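A POSIX queue rejects messages larger than its configured maximum size, and that limit counts bytes, not characters. As a hedged sketch, a service could validate a message before sending it. The helpers encode_message and decode_message are hypothetical and not part of the handler above; the limit simply mirrors MAX_MESSAGE_SIZE.

```python
MAX_MESSAGE_SIZE = 512  # same value as MQHandler.MAX_MESSAGE_SIZE


def encode_message(message: str, max_size: int = MAX_MESSAGE_SIZE) -> bytes:
    """Encode a message as UTF-8 and check that it fits in one queue slot."""
    payload = message.encode("utf-8")
    if len(payload) > max_size:
        raise ValueError(
            f"message is {len(payload)} bytes, the queue accepts at most {max_size}"
        )
    return payload


def decode_message(payload: bytes) -> str:
    """Decode the bytes received from the queue back into text."""
    return payload.decode("utf-8")


round_trip = decode_message(encode_message("task-1"))
print(round_trip)
```

Non-ASCII characters take more than one byte in UTF-8, so a string shorter than 512 characters can still exceed the limit.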
The service connects to the queues in its run method and disconnects when it finishes.
service.py – start/stop_listener
def start_listener(self) -> bool:
    self._listening = True
    exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker)
    if exit_code == EExitCode.SUCCESS:
        log(f"Service start listening : host({self._config.is_host})")
        return True
    return False

def stop_listener(self):
    self._listening = False
    log("Service stop listening")
    self._mq_handler.disconnect(self._config.is_host)

The run function of the service starts by connecting to both queues through the mq_handler. If the service is a host, it sends the request for the first task and waits indefinitely for the response.
host/service.py – run
def run(self) -> EExitCode:
    exit_code = EExitCode.SUCCESS
    if self.start_listener():
        self._mq_handler.send_wait("task-1")
        while self._listening:
            message, status = self._mq_handler.receive_wait()
            if status == EExitCode.SUCCESS:
                self.stop_listener()
            else:
                exit_code = EExitCode.FAIL
                self.stop_listener()
    else:
        log("Unable to init listener")
        exit_code = EExitCode.FAIL
    return exit_code

The worker waits indefinitely for a request in its listener and, when it receives one, sends the response to the host through the responses queue.
worker/service.py – run
def run(self) -> EExitCode:
    exit_code = EExitCode.SUCCESS
    if self.start_listener():
        while self._listening:
            message, status = self._mq_handler.receive_wait()
            if status == EExitCode.SUCCESS:
                self._mq_handler.send_wait(f"{message} processed")
                self.stop_listener()
            else:
                exit_code = EExitCode.FAIL
                self.stop_listener()
    else:
        log("Unable to init listener")
        exit_code = EExitCode.FAIL
    return exit_code
Finally the full code for both service types:
host/service.py
from pcube.common.logger import log
from pcube.common.enums import EExitCode
from pcube.common.mq_handler import MQHandler
from pcube.common.service_config import ServiceConfig


class Service:
    def __init__(self, config: ServiceConfig):
        self._config: ServiceConfig = config
        self._listening = False
        self._mq_handler = MQHandler()

    def start_listener(self) -> bool:
        self._listening = True
        exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker)
        if exit_code == EExitCode.SUCCESS:
            log(f"Service start listening : host({self._config.is_host})")
            return True
        return False

    def stop_listener(self):
        self._listening = False
        log("Service stop listening")
        self._mq_handler.disconnect(self._config.is_host)

    def run(self) -> EExitCode:
        exit_code = EExitCode.SUCCESS
        if self.start_listener():
            self._mq_handler.send_wait("task-1")
            while self._listening:
                message, status = self._mq_handler.receive_wait()
                if status == EExitCode.SUCCESS:
                    self.stop_listener()
                else:
                    exit_code = EExitCode.FAIL
                    self.stop_listener()
        else:
            log("Unable to init listener")
            exit_code = EExitCode.FAIL
        return exit_code
worker/service.py
from pcube.common.logger import log
from pcube.common.enums import EExitCode
from pcube.common.mq_handler import MQHandler
from pcube.common.service_config import ServiceConfig


class Service:
    def __init__(self, config: ServiceConfig):
        self._config: ServiceConfig = config
        self._listening = False
        self._mq_handler = MQHandler()

    def start_listener(self) -> bool:
        self._listening = True
        exit_code = self._mq_handler.connect(self._config.q_name_host, self._config.q_name_worker)
        if exit_code == EExitCode.SUCCESS:
            log(f"Service start listening : host({self._config.is_host})")
            return True
        return False

    def stop_listener(self):
        self._listening = False
        log("Service stop listening")
        self._mq_handler.disconnect(self._config.is_host)

    def run(self) -> EExitCode:
        exit_code = EExitCode.SUCCESS
        if self.start_listener():
            while self._listening:
                message, status = self._mq_handler.receive_wait()
                if status == EExitCode.SUCCESS:
                    self._mq_handler.send_wait(f"{message} processed")
                    self.stop_listener()
                else:
                    exit_code = EExitCode.FAIL
                    self.stop_listener()
        else:
            log("Unable to init listener")
            exit_code = EExitCode.FAIL
        return exit_code
C++ and Rust versions: GitHub: T-Services
We can run a host and a worker to see both services interacting through POSIX message queues.
Conclusion
With this code we have finished the first part of a service program: the communication with other services. We have used a message queue, but other primitives such as pipes or semaphores can also serve this purpose.
Now that we have the basic communication in place, we can start adding more interesting features, such as a shared memory handler. That is the subject of the next part of this tutorial.
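As a rough illustration of the pipe alternative, the sketch below sends the same "task-1" request to a child process through its standard input and reads the reply from its standard output. It is a simplified stand-in rather than a drop-in replacement for the services above, and WORKER_CODE and request_over_pipe are hypothetical names.

```python
import subprocess
import sys

# Minimal worker: read one request line from stdin and answer on stdout.
WORKER_CODE = (
    "import sys; "
    "task = sys.stdin.readline().strip(); "
    "print(f'{task} processed')"
)


def request_over_pipe(task: str) -> str:
    """Run the worker as a child process and exchange one message over pipes."""
    completed = subprocess.run(
        [sys.executable, "-c", WORKER_CODE],
        input=task + "\n",    # written to the child's stdin pipe
        capture_output=True,  # the child's stdout is read through a pipe
        text=True,
        check=True,
    )
    return completed.stdout.strip()


reply = request_over_pipe("task-1")
print(reply)
```

Unlike a message queue, this pipe only exists while the child process is running, so the host has to start the worker itself.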