broadcasting
You can register callbacks with the same name on multiple processes, allowing for simultaneous execution.
In addition to one-to-one communication, there is a specific execution modifier called BROADCAST
that can be utilized in fetchers.
The BROADCAST
modifier ensures that the result is only returned after all callbacks have completed their work.
The resulting output takes on a dictionary structure, with keys representing the process names and corresponding values representing the computed results.
{
"process-1": "<result-1>",
"process-2": "<result-2>",
"process-N": "<result-N>"
}
Warning
Make sure you initialized only one process with init_controller=True
argument
burger_menu_service.py
content:
import logging
from daffi import Global
from daffi.registry import Callback
logging.basicConfig(level=logging.INFO)
class BurgerMenu(Callback):
auto_init = True
def get_menu(self):
return ["The IceBurg", "The Grill Thrill", "Burger Mania", "Chicha Burger"]
if __name__ == '__main__':
Global(init_controller=True, host="localhost", port=8888, process_name="burger menu").join()
hotdog_menu_service.py
content:
import logging
from daffi import Global
from daffi.registry import Callback
logging.basicConfig(level=logging.INFO)
class HotDogMenu(Callback):
def get_menu(self):
return ["Wiener", "Weenie", "Coney", "Red Hot"]
if __name__ == '__main__':
Global(host="localhost", port=8888, process_name="hotdog menu").join()
menu_client.py
content:
import logging
from daffi import Global, BROADCAST
from daffi.registry import Fetcher
logging.basicConfig(level=logging.INFO)
class MenuFetcher(Fetcher):
exec_modifier = BROADCAST
def get_menu(self):
pass
if __name__ == '__main__':
menu_fetcher = MenuFetcher()
g = Global(host="localhost", port=8888)
# Make sure all processes started
for proc in ("burger menu", "hotdog menu"):
g.wait_process(proc)
menus = menu_fetcher.get_menu()
print(menus)
# {'burger menu': ['The IceBurg', 'The Grill Thrill', 'Burger Mania', 'Chicha Burger'], 'hotdog menu': ['Wiener', 'Weenie', 'Coney', 'Red Hot']}
g.stop()
Execute in three separate terminals:
python3 burger_menu_service.py
python3 hotdog_menu_service.py
python3 menu_client.py
burger_menu_service.py
content:
import logging
from daffi import Global
from daffi.decorators import callback
logging.basicConfig(level=logging.INFO)
@callback
def get_menu():
return ["The IceBurg", "The Grill Thrill", "Burger Mania", "Chicha Burger"]
if __name__ == '__main__':
Global(init_controller=True, host="localhost", port=8888, process_name="burger menu").join()
hotdog_menu_service.py
content:
import logging
from daffi import Global
from daffi.decorators import callback
logging.basicConfig(level=logging.INFO)
@callback
def get_menu():
return ["Wiener", "Weenie", "Coney", "Red Hot"]
if __name__ == '__main__':
Global(host="localhost", port=8888, process_name="hotdog menu").join()
menu_client.py
content:
import logging
from daffi import Global, BROADCAST
from daffi.decorators import fetcher
logging.basicConfig(level=logging.INFO)
@fetcher(exec_modifier=BROADCAST)
def get_menu():
pass
if __name__ == '__main__':
g = Global(host="localhost", port=8888)
# Make sure all processes started
for proc in ("burger menu", "hotdog menu"):
g.wait_process(proc)
menus = get_menu()
print(menus)
# {'burger menu': ['The IceBurg', 'The Grill Thrill', 'Burger Mania', 'Chicha Burger'], 'hotdog menu': ['Wiener', 'Weenie', 'Coney', 'Red Hot']}
g.stop()
Execute in three separate terminals:
python3 burger_menu_service.py
python3 hotdog_menu_service.py
python3 menu_client.py
Note
To use UNIX socket instead of TCP for communication, you should remove the host
and port
parameters from
the initialization of the Global object, and optionally include the unix_sock_path
parameter.