Skip to content

pipeline.py

Pipeline

__call__(self, *args) special

Alias for to_blocking

Source code in rxpipes/pipeline.py
def __call__(self, *args):
    """
    Alias for to_blocking
    """

    return self.to_blocking(*args)

__init__(self, *args, **kwargs) special

Pipeline

Args and kwargs will be forwarded to user defined setup

Parameters:

Name Type Description Default
args Optional[Any]

args passed to user defined setup

()
kwargs Optional[Any]

kwargs passed to user defined setup

{}
Source code in rxpipes/pipeline.py
def __init__(self, *args: Optional[Any], **kwargs: Optional[Any]):
    """Pipeline

    Args and kwargs will be forwarded to user defined setup

    Args:
        args: args passed to user defined setup
        kwargs: kwargs passed to user defined setup
    """

    # call user setup
    self.setup(*args, **kwargs)

pipe(*pipelines) classmethod

Can be used as a class or instance method to create a new pipeline chain

Parameters:

Name Type Description Default
pipelines Pipeline

variable number of pipelines

()

Returns:

Type Description
Pipeline

newly composed Pipeline instance

Source code in rxpipes/pipeline.py
@class_or_instance_method
def pipe(self, *pipelines: "Pipeline") -> "Pipeline":
    """

    Can be used as a class or instance method to create a new pipeline chain

    Args:
        pipelines: variable number of pipelines

    Returns:
        newly composed Pipeline instance
    """

    # called as instance method
    if not isinstance(self, type):
        parent = self
        return type(
            "Pipeline",
            (Pipeline,),
            {
                "transform": lambda self: rx.pipe(
                    parent.transform(), *[p.transform() for p in pipelines]
                )
            },
        )()

    # called as class method
    else:
        return type(
            "Pipeline",
            (Pipeline,),
            {
                "transform": lambda self: rx.pipe(
                    *[p.transform() for p in pipelines]
                )
            },
        )()

setup(self, *args, **kwargs)

Override this setup function to implement custom functionality when subclassing Pipeline

Parameters:

Name Type Description Default
args Optional[Any]

user defined args

()
kwargs Optional[Any]

user defined kwards

{}
Source code in rxpipes/pipeline.py
def setup(self, *args: Optional[Any], **kwargs: Optional[Any]):
    """
    Override this setup function to implement custom functionality when subclassing Pipeline

    Args:
        args: user defined args
        kwargs: user defined kwards
    """

transform(self)

Override this transform function to implement custom functionality when subclassing Pipeline

Returns:

Type Description
Callable[[rx.core.Observable], rx.core.Observable]

a function mapping an observable to another (default = lambda x: x)

Source code in rxpipes/pipeline.py
def transform(self) -> Callable[[rx.typing.Observable], rx.typing.Observable]:
    """
    Override this transform function to implement custom functionality when subclassing Pipeline

    Returns:
        a function mapping an observable to another (default = lambda x: x)
    """
    return lambda x: x

pipeline_from_operator_jit(op)

Returns a function that when called returns a Pipeline instance which pipes the parent transform to the injected operator

Parameters:

Name Type Description Default
op Callable[[rx.core.Observable], rx.core.Observable]

rx operator

required

Returns:

Type Description

function returning a just-in-time created Pipeline instance

Source code in rxpipes/pipeline.py
def pipeline_from_operator_jit(
    op: Callable[[rx.typing.Observable], rx.typing.Observable]
):
    """
    Returns a function that when called returns a Pipeline instance which
    pipes the parent transform to the injected operator

    Args:
        op: rx operator

    Returns:
        function returning a just-in-time created Pipeline instance
    """

    @class_or_instance_method
    def _f(parent, *args, **kwargs):
        if not isinstance(parent, type):
            return type(
                "Pipeline",
                (Pipeline,),
                {
                    "transform": lambda _: rx.pipe(
                        parent.transform(), op(*args, **kwargs)
                    )
                },
            )()
        else:
            return type(
                "Pipeline",
                (Pipeline,),
                {"transform": lambda _: op(*args, **kwargs)},
            )()

    return _f