pipeline.py
Pipeline
__call__(self, *args)
special
Alias for to_blocking
Source code in rxpipes/pipeline.py
def __call__(self, *args):
    """
    Make the pipeline callable: shorthand for ``to_blocking``.

    Args:
        args: positional arguments handed unchanged to ``to_blocking``

    Returns:
        whatever ``to_blocking`` returns for the given arguments
    """
    blocking_result = self.to_blocking(*args)
    return blocking_result
__init__(self, *args, **kwargs)
special
Pipeline
Args and kwargs will be forwarded to user defined setup
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
args |
Optional[Any] |
args passed to user defined setup |
() |
kwargs |
Optional[Any] |
kwargs passed to user defined setup |
{} |
Source code in rxpipes/pipeline.py
def __init__(self, *args: Optional[Any], **kwargs: Optional[Any]):
    """Create a Pipeline.

    Construction does nothing except delegate to the user defined
    ``setup`` hook, passing every argument through untouched.

    Args:
        args: positional arguments forwarded to ``setup``
        kwargs: keyword arguments forwarded to ``setup``
    """
    # hand all constructor arguments over to the subclass hook
    self.setup(*args, **kwargs)
pipe(*pipelines)
classmethod
Can be used as a class or instance method to create a new pipeline chain
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pipelines |
Pipeline |
variable number of pipelines |
() |
Returns:
| Type | Description |
|---|---|
Pipeline |
newly composed Pipeline instance |
Source code in rxpipes/pipeline.py
@class_or_instance_method
def pipe(self, *pipelines: "Pipeline") -> "Pipeline":
    """
    Compose pipelines into a new pipeline chain.

    Usable on the class as well as on an instance. When invoked on an
    instance, that instance's transform is placed at the head of the chain;
    when invoked on the class, only the given pipelines are chained.

    Args:
        pipelines: variable number of pipelines

    Returns:
        newly composed Pipeline instance
    """
    # `self` is the class itself when invoked as a class method; only a real
    # instance contributes its own transform as the first stage.
    if isinstance(self, type):
        stages = pipelines
    else:
        stages = (self, *pipelines)

    def _chained(_):
        # each stage's transform is looked up when the composed transform is
        # requested, not when the chain is built
        return rx.pipe(*[stage.transform() for stage in stages])

    # build a throwaway Pipeline subclass carrying the composed transform
    # and instantiate it without arguments
    return type("Pipeline", (Pipeline,), {"transform": _chained})()
setup(self, *args, **kwargs)
Override this setup function to implement custom functionality when subclassing Pipeline
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
args |
Optional[Any] |
user defined args |
() |
kwargs |
Optional[Any] |
user defined kwargs |
{} |
Source code in rxpipes/pipeline.py
def setup(self, *args: Optional[Any], **kwargs: Optional[Any]):
"""
Override this setup function to implement custom functionality when subclassing Pipeline
Args:
args: user defined args
kwargs: user defined kwards
"""
transform(self)
Override this transform function to implement custom functionality when subclassing Pipeline
Returns:
| Type | Description |
|---|---|
Callable[[rx.core.Observable], rx.core.Observable] |
a function mapping an observable to another (default = lambda x: x) |
Source code in rxpipes/pipeline.py
def transform(self) -> Callable[[rx.typing.Observable], rx.typing.Observable]:
    """
    Hook for subclasses: override to supply the pipeline's observable mapping.

    Returns:
        a function taking an observable and returning an observable; the
        base implementation returns its input unchanged
    """

    def _identity(source):
        # default behaviour: pass the observable through untouched
        return source

    return _identity
pipeline_from_operator_jit(op)
Returns a function that, when called, returns a Pipeline instance which pipes the parent transform to the injected operator
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
op |
Callable[[rx.core.Observable], rx.core.Observable] |
rx operator |
required |
Returns:
| Type | Description |
|---|---|
|
function returning a just-in-time created Pipeline instance |
Source code in rxpipes/pipeline.py
def pipeline_from_operator_jit(
    op: Callable[[rx.typing.Observable], rx.typing.Observable]
):
    """
    Wrap an rx operator so that calling the wrapper yields a Pipeline.

    The returned function can be invoked on a Pipeline instance or on the
    class. Its arguments are forwarded to ``op`` to obtain the operator.
    On an instance, the new pipeline pipes the instance's transform into
    that operator; on the class, the new pipeline consists of the operator
    alone.

    Args:
        op: rx operator; it is called with the arguments given to the
            returned function

    Returns:
        function returning a just-in-time created Pipeline instance
    """

    @class_or_instance_method
    def _f(parent, *args, **kwargs):
        if isinstance(parent, type):
            # invoked on the class: there is no parent transform to prepend

            def _transform(_):
                return op(*args, **kwargs)

        else:
            # invoked on an instance: run the parent's transform first

            def _transform(_):
                return rx.pipe(parent.transform(), op(*args, **kwargs))

        # build a throwaway Pipeline subclass and instantiate it right away
        return type("Pipeline", (Pipeline,), {"transform": _transform})()

    return _f