Stream from callback to fetcher

Just as a fetcher can stream data to a callback, a callback can stream data back to the fetcher by using yield statements. In this case, the fetcher call returns a generator, and the caller iterates over it to receive the items.
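To make the generator result concrete, here is a minimal plain-Python sketch that does not use daffi at all. The local generate_stream below is only a stand-in for the remote callback, and take is a hypothetical helper introduced for illustration. It shows that a generator hands out items one at a time as the consumer iterates; how daffi transports the items between processes is not modelled here.

```python
from itertools import islice


def generate_stream(end):
    """Local stand-in for the remote callback: yields 0, 1, ..., end - 1."""
    for i in range(end):
        yield i


def take(stream, count):
    """Consume at most `count` items from a generator and return them as a list."""
    return list(islice(stream, count))


stream = generate_stream(1000)
first_three = take(stream, 3)  # pulls 0, 1 and 2 from the generator
next_item = next(stream)       # iteration resumes where it stopped
print(first_three, next_item)
```

The consumer decides how many items to pull, which is why the client examples on this page can simply loop over the result.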

Streams can also be used like events: the fetcher iterates over the stream and waits until the remote process yields an item that signals a specific condition.
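The event-like usage can be sketched in plain Python as follows. This is an illustration under stated assumptions, not daffi API: sensor_readings is a hypothetical stand-in for a remote callback that yields status values, and wait_for is a hypothetical helper that consumes a stream until a condition holds.

```python
def sensor_readings():
    """Hypothetical stand-in for a remote callback that yields status values."""
    for value in (12, 18, 25, 31, 40):
        yield value


def wait_for(stream, predicate):
    """Iterate over the stream until predicate(item) is true and return that item.

    Returns None if the stream ends without a matching item.
    """
    for item in stream:
        if predicate(item):
            return item
    return None


# Wait for the first reading above the threshold, then stop consuming.
first_hot = wait_for(sensor_readings(), lambda value: value > 30)
print(first_hot)
```

With a real fetcher, the generator returned by the fetcher call would take the place of sensor_readings().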

stream_service.py content:

import logging
from daffi import Global
from daffi.registry import Callback

logging.basicConfig(level=logging.INFO)


class StreamerService(Callback):
    auto_init = True

    def generate_stream(self, end):
        """Stream the integers 0..end-1 to the fetcher, one item per yield."""
        for i in range(end):
            yield i


if __name__ == '__main__':
    Global(init_controller=True, host="localhost", port=8888).join()

stream_client.py content:

import logging
from daffi import Global
from daffi.registry import Fetcher

logging.basicConfig(level=logging.INFO)


class StreamerClient(Fetcher):

    def generate_stream(self, end):
        """Placeholder body: the stream is generated by the remote generate_stream callback."""
        pass


if __name__ == '__main__':
    g = Global(host="localhost", port=8888)

    stream_client = StreamerClient()
    result = stream_client.generate_stream(end=1000)

    for item in result:
        print(item)

    g.stop()

Execute in two separate terminals:

python3 stream_service.py
python3 stream_client.py

The same example can be written with the callback and fetcher decorators instead of classes.

stream_service.py content:

import logging
from daffi import Global
from daffi.decorators import callback

logging.basicConfig(level=logging.INFO)


@callback
def generate_stream(end):
    for item in range(end):
        yield item


if __name__ == '__main__':
    Global(init_controller=True, host="localhost", port=8888).join()

stream_client.py content:

import logging
from daffi import Global
from daffi.decorators import fetcher

logging.basicConfig(level=logging.INFO)


@fetcher
def generate_stream(end):
    pass


if __name__ == '__main__':
    g = Global(host="localhost", port=8888)

    result = generate_stream(1000)
    for item in result:
        print(item)

    g.stop()

Execute in two separate terminals:

python3 stream_service.py
python3 stream_client.py

Note

To communicate over a UNIX socket instead of TCP, omit the host and port parameters when initializing the Global object. You can optionally pass the unix_sock_path parameter.