stream from fetcher to callback

In Daffi, yield statements have a special meaning: they are used for stream processing. Streams come in two kinds: those that go from a fetcher to a callback and those that go from a callback to a fetcher. As with return statements, a yield statement can be followed by a tuple or by an instance of the special "Args" class to send multiple arguments to the remote.

Streams can also be utilized like events to wait for specific conditions on a remote.

stream_service.py content (class-based approach):

import logging
from daffi import Global
from daffi.registry import Callback

logging.basicConfig(level=logging.INFO)


class StreamerService(Callback):
    """Callback group that collects items streamed by a remote fetcher."""

    # NOTE(review): presumably tells Daffi to instantiate this class
    # automatically -- this script never creates an instance itself.
    # Confirm against the Daffi documentation.
    auto_init = True

    def __post_init__(self):
        """Create the empty list that accumulates received items."""
        self.items = []

    def stream_to_service(self, item, process_name):
        """Store one streamed item and print which process sent it."""
        self.items.append(item)
        message = f"Received item: {item} from process: {process_name}"
        print(message)

    def get_items(self):
        """Return the list of items received so far (not a copy)."""
        return self.items


if __name__ == "__main__":
    # Start Daffi with a controller listening on localhost:8888, then join()
    # so the service process stays alive to handle incoming requests.
    service = Global(init_controller=True, host="localhost", port=8888)
    service.join()

stream_client.py content (class-based approach):

import logging
from daffi import Global
from daffi.registry import Fetcher, Args

logging.basicConfig(level=logging.INFO)

PROCESS_NAME = "streamer client"


class StreamerClient(Fetcher):
    """Fetcher group that streams local items to the remote service."""

    def __post_init__(self):
        """Prepare the items to stream: the integers 0 through 999."""
        self.items = range(1000)

    def stream_to_service(self):
        """Process stream.

        Yields one ``Args`` bundle per item, carrying the item together with
        this process's name as keyword arguments.
        """
        for current in self.items:
            payload = Args(item=current, process_name=PROCESS_NAME)
            yield payload

    def get_items(self):
        """Get all items from service"""
        # Intentionally empty: no local logic is needed for this call.
        pass


if __name__ == "__main__":
    daffi_global = Global(host="localhost", port=8888, process_name=PROCESS_NAME)

    client = StreamerClient()
    # Stream every item to the service.
    client.stream_to_service()

    # get all items from service after stream processing
    print(client.get_items())

    daffi_global.stop()

Execute in two separate terminals:

python3 stream_service.py
python3 stream_client.py

stream_service.py content (decorator-based approach):

import logging
from daffi import Global
from daffi.decorators import callback

logging.basicConfig(level=logging.INFO)

items = []


@callback
def stream_to_service(item, process_name):
    """Store one streamed item in the module-level list and print its sender."""
    items.append(item)
    message = f"Received item: {item} from process: {process_name}"
    print(message)


@callback
def get_items():
    """Return the module-level list of items received so far (not a copy)."""
    return items


if __name__ == "__main__":
    # Start Daffi with a controller listening on localhost:8888, then join()
    # so the service process stays alive to handle incoming requests.
    service = Global(init_controller=True, host="localhost", port=8888)
    service.join()

stream_client.py content (decorator-based approach):

import logging
from daffi import Global
from daffi.decorators import fetcher
from daffi.registry import Args

logging.basicConfig(level=logging.INFO)

PROCESS_NAME = "streamer client"

items = range(1000)


@fetcher
def stream_to_service():
    """Process stream.

    Yields one ``Args`` bundle per item, carrying the item together with
    this process's name as keyword arguments.
    """
    for current in items:
        payload = Args(item=current, process_name=PROCESS_NAME)
        yield payload


@fetcher
def get_items():
    """Get all items from service.

    Takes no arguments: this is a module-level function, not a method, and
    it is invoked as ``get_items()``. The remote ``get_items`` callback
    likewise accepts no parameters.
    """
    # Intentionally empty: no local logic is needed for this call.
    pass


if __name__ == "__main__":
    daffi_global = Global(host="localhost", port=8888, process_name=PROCESS_NAME)

    # Stream every item to the service.
    stream_to_service()

    # Get all items from the service after stream processing.
    print(get_items())

    daffi_global.stop()

Execute in two separate terminals:

python3 stream_service.py
python3 stream_client.py

Note

To use a UNIX socket instead of TCP for communication, remove the host and port parameters from the initialization of the Global object and, optionally, pass the unix_sock_path parameter.