Pythonic Way To Chain Python Generator Function To Form A Pipeline

I'm refactoring some pipeline code in Python. Assume we have a series of generator functions and we want to chain them to form a data processing pipeline.
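
The question's definitions of foo1, foo2, and foo3 are not reproduced here. Judging from the outputs shown in the answers below, they might look like the following sketch; the bodies are an assumption inferred from those outputs, not the asker's actual code.

```python
# Assumed stage definitions, inferred from the outputs quoted in the answers.
def foo1(items):
    """Yield each item incremented by 1."""
    for x in items:
        yield x + 1


def foo2(items):
    """Yield each item incremented by 10."""
    for x in items:
        yield x + 10


def foo3(items):
    """Yield each item as a string prefixed with 'foo3:'."""
    for x in items:
        yield 'foo3:' + str(x)


# The nested call that the question wants to avoid for long pipelines.
result = list(foo3(foo2(foo1(range(0, 5)))))
print(result)  # ['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']
```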

Solution 1:

I sometimes like to use a left fold (called reduce in Python) for this type of situation:

from functools import reduce
def pipeline(*steps):
    return reduce(lambda x, y: y(x), list(steps))

res = pipeline(range(0, 5), foo1, foo2, foo3)

Or even better:

def compose(*funcs):
    return lambda x: reduce(lambda f, g: g(f), list(funcs), x)

p = compose(foo1, foo2, foo3)
res = p(range(0, 5))
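
One property worth noting is that a compose-style pipeline stays lazy: nothing runs until the result is iterated, so it also works on an unbounded source. A minimal sketch, where add_one and times_ten are hypothetical stages invented for illustration:

```python
from functools import reduce
from itertools import count, islice


def compose(*funcs):
    """Return a function that feeds its argument through funcs left to right."""
    return lambda x: reduce(lambda acc, f: f(acc), funcs, x)


def add_one(items):  # hypothetical stage
    for x in items:
        yield x + 1


def times_ten(items):  # hypothetical stage
    for x in items:
        yield x * 10


p = compose(add_one, times_ten)

# count() never ends, yet building the pipeline returns immediately;
# islice pulls only the first three results through the stages.
first_three = list(islice(p(count()), 3))
print(first_three)  # [10, 20, 30]
```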

Solution 2:

Following up on your runner.run approach, let's define this utility function:

def recur(ops):
    return ops[0](recur(ops[1:])) if len(ops) > 1 else ops[0]

As an example:

>>> ops = foo3, foo2, foo1, range(0, 5)
>>> list( recur(ops) )
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']

Alternative: backward ordering

def backw(ops):
    return ops[-1](backw(ops[:-1])) if len(ops) > 1 else ops[0]

For example:

>>> list( backw([range(0, 5), foo1, foo2, foo3]) )
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']

Solution 3:

You can compose curried generator functions using PyMonad:

# Assumes curry and Just are imported from PyMonad,
# and Iterable and Sized from typing.
def main():
    odds = list * \
         non_divisibles(2) * \
         lengths * \
         Just(["1", "22", "333", "4444", "55555"])
    print(odds.getValue())    # prints [1, 3, 5]


@curry
def lengths(words: Iterable[Sized]) -> Iterable[int]:
    return map(len, words)


@curry
def non_divisibles(div: int, numbers: Iterable[int]) -> Iterable[int]:
    return (n for n in numbers if n % div)

Another alternative is to start with a Monad and compose the generators using fmap calls - this syntax is familiar to Java 8 Stream users:

def main():
    odds = Just(["1", "22", "333", "4444", "55555"]) \
        .fmap(lengths) \
        .fmap(non_divisibles(2)) \
        .fmap(list) \
        .getValue()
    print(odds)   # prints [1, 3, 5]


def lengths(words: Iterable[Sized]) -> Iterable[int]:
    return map(len, words)


@curry
def non_divisibles(div: int, numbers: Iterable[int]) -> Iterable[int]:
    return (n for n in numbers if n % div)

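Note that single-argument functions such as lengths don't need to be decorated with @curry in this case; non_divisibles still does, because it is partially applied as non_divisibles(2). The intermediate steps return lazy iterators, so no elements are actually processed until list consumes them at the end of the chain.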
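
For comparison, the same chain can be sketched with the standard library only, which makes the laziness easy to see. This swaps PyMonad's @curry for functools.partial; it is an illustration of the idea, not a description of how PyMonad works internally.

```python
from functools import partial
from typing import Iterable, Sized


def lengths(words: Iterable[Sized]) -> Iterable[int]:
    return map(len, words)


def non_divisibles(div: int, numbers: Iterable[int]) -> Iterable[int]:
    return (n for n in numbers if n % div)


words = ["1", "22", "333", "4444", "55555"]

# partial(non_divisibles, 2) plays the role of the curried non_divisibles(2).
lazy_odds = partial(non_divisibles, 2)(lengths(words))

# Nothing has been computed yet: lazy_odds is a generator object.
odds = list(lazy_odds)  # list() drives the whole chain
print(odds)  # [1, 3, 5]
```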

Solution 4:

"I do not think foo3(foo2(foo1(range(0, 5)))) is a pythonic way to achieve my pipeline goal, especially when the number of stages in the pipeline is large."

There is a fairly trivial, and in my opinion clear, way of chaining generators: assigning the result of each to a variable, where each can have a descriptive name.

range_iter = range(0, 5)
foo1_iter = foo1(range_iter)
foo2_iter = foo2(foo1_iter)
foo3_iter = foo3(foo2_iter)

for i in foo3_iter:
    print(i)

I prefer this to something that uses a higher-order function, such as reduce:

  • In my real cases, often each foo* generator function needs its own other parameters, which is tricky if using a reduce.

  • In my real cases, the steps in the pipeline are not dynamic at runtime: it seems a bit odd/unexpected (to me) to have a pattern that seems more appropriate for a dynamic case.

  • A reduce-based pipeline is a bit inconsistent with how regular functions are typically written, where each is called explicitly and the result of each is passed to the call of the next. Yes, explicit calls mean a bit of duplication, but I'm happy with "calling a function" being duplicated since (to me) it's really clear.

  • No need for an import: it uses core language features.
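
To make the first point concrete, here is a sketch with two hypothetical stages, scale and keep_above, that each take an extra parameter. Explicit chaining passes the arguments directly, while a reduce-based pipeline needs each stage adapted first, for example with functools.partial.

```python
from functools import partial, reduce


def scale(items, factor):  # hypothetical stage with its own parameter
    for x in items:
        yield x * factor


def keep_above(items, threshold):  # hypothetical stage with its own parameter
    for x in items:
        if x > threshold:
            yield x


# Explicit chaining: each stage's arguments sit next to its call.
scaled_iter = scale(range(0, 5), factor=3)
large_iter = keep_above(scaled_iter, threshold=4)
explicit = list(large_iter)

# reduce-based: every stage must first be adapted to take one argument.
steps = [partial(scale, factor=3), partial(keep_above, threshold=4)]
reduced = list(reduce(lambda acc, step: step(acc), steps, range(0, 5)))

print(explicit, reduced)  # [6, 9, 12] [6, 9, 12]
```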

Solution 5:

Here is another answer, in case the functions in your example are one-time (single-use) functions. Good variable naming and generator expressions can be helpful for small operations.

>>> g = range(0, 5)
>>> foo1 = (x + 1 for x in g)
>>> foo2 = (x + 10 for x in foo1)
>>> foo3 = ('foo3:' + str(x) for x in foo2)
>>> for x in foo3:
...     print(x)
...
foo3:11
foo3:12
foo3:13
foo3:14
foo3:15
