Source code for feldera.output_handler

import pandas as pd
from typing import Optional

from queue import Queue
from feldera import FelderaClient
from feldera._callback_runner import CallbackRunner


class OutputHandler:
    """
    Collects the output of one view of a pipeline into an in-memory buffer of
    pandas DataFrames and exposes it as a single DataFrame or a list of dicts.
    """

    def __init__(self, client: FelderaClient, pipeline_name: str, view_name: str, queue: Optional[Queue]):
        """
        Initializes the output handler, but doesn't start it.
        To start the output handler, call the :meth:`.OutputHandler.start` method.

        :param client: The client that is handed to the `CallbackRunner`.
        :param pipeline_name: The name of the pipeline that is handed to the `CallbackRunner`.
        :param view_name: The name of the view that is handed to the `CallbackRunner`.
        :param queue: An optional queue that is handed to the `CallbackRunner` as is.
        """

        self.client: FelderaClient = client
        self.pipeline_name: str = pipeline_name
        self.view_name: str = view_name
        self.queue: Optional[Queue] = queue

        # non-empty chunks received so far, in arrival order
        self.buffer: list[pd.DataFrame] = []

        # the callback that is passed to the `CallbackRunner`
        def callback(df: pd.DataFrame, _: int):
            # empty chunks carry no rows, so they are not worth buffering
            if not df.empty:
                self.buffer.append(df)

        # sets up the callback runner
        self.handler = CallbackRunner(self.client, self.pipeline_name, self.view_name, callback, queue)

    def start(self) -> None:
        """
        Starts the output handler in a separate thread
        """

        self.handler.start()

    def to_pandas(self, clear_buffer: bool = True) -> pd.DataFrame:
        """
        Returns the output of the pipeline as a pandas DataFrame

        :param clear_buffer: Whether to clear the buffer after getting the output.
        :return: All buffered chunks concatenated with a fresh index, or an empty
            DataFrame when nothing has been buffered.
        """

        # Work on a snapshot: the callback may append to `self.buffer` while this
        # method is running (the handler is started in a separate thread), so the
        # live list must not be concatenated and then blindly cleared.
        frames = list(self.buffer)

        if not frames:
            return pd.DataFrame()

        res = pd.concat(frames, ignore_index=True)

        if clear_buffer:
            # Drop only the chunks that made it into `res`. The callback only ever
            # appends, so the snapshot is always a prefix of the live buffer;
            # anything that arrived in the meantime stays for the next call.
            del self.buffer[:len(frames)]

        return res

    def to_dict(self, clear_buffer: bool = True) -> list[dict]:
        """
        Returns the output of the pipeline as a list of python dictionaries

        :param clear_buffer: Whether to clear the buffer after getting the output.
        :return: One dictionary per row, as produced by `DataFrame.to_dict(orient='records')`.
        """

        return self.to_pandas(clear_buffer).to_dict(orient='records')