diff --git a/16-coroutine/coroaverager2.py b/16-coroutine/coroaverager2.py index 8f52300..3b9197c 100644 --- a/16-coroutine/coroaverager2.py +++ b/16-coroutine/coroaverager2.py @@ -30,7 +30,7 @@ >>> try: ... coro_avg.send(None) ... except StopIteration as exc: - ... result = exc.value + ... result = exc.value # 异常对象的value属性保存着返回的值 ... >>> result Result(count=3, average=15.5) diff --git a/16-coroutine/taxi_sim.py b/16-coroutine/taxi_sim.py index e9c4cc1..03057df 100644 --- a/16-coroutine/taxi_sim.py +++ b/16-coroutine/taxi_sim.py @@ -66,8 +66,13 @@ # BEGIN TAXI_PROCESS def taxi_process(ident, trips, start_time=0): # <1> """Yield to simulator issuing event at each state change""" + """每次改变状态时候创建事件,把控制权让给仿真器 + ident: 出租车的编号 + trips: 出租车回家之前的行程数 + start_time: 离开车库的时间 + """ time = yield Event(start_time, ident, 'leave garage') # <2> - for i in range(trips): # <3> + for i in range(trips): # <3> 每次行程都会执行如下代码 time = yield Event(time, ident, 'pick up passenger') # <4> time = yield Event(time, ident, 'drop off passenger') # <5> @@ -80,17 +85,21 @@ def taxi_process(ident, trips, start_time=0): # <1> class Simulator: def __init__(self, procs_map): - self.events = queue.PriorityQueue() - self.procs = dict(procs_map) + self.events = queue.PriorityQueue() # 排序队列,按照时间排序 + self.procs = dict(procs_map) # 复制参数 def run(self, end_time): # <1> - """Schedule and display events until time is up""" + """Schedule and display events until time is up + 排定并显示事件,直到事件结束 + """ # schedule the first event for each cab + # 排定各个出租车的第一个事件 for _, proc in sorted(self.procs.items()): # <2> first_event = next(proc) # <3> self.events.put(first_event) # <4> # main loop of the simulation + # 仿真系统的主循环 sim_time = 0 # <5> while sim_time < end_time: # <6> if self.events.empty(): # <7> diff --git a/16-extend/FinalGenerator.pdf b/16-extend/FinalGenerator.pdf new file mode 100644 index 0000000..22859aa Binary files /dev/null and b/16-extend/FinalGenerator.pdf differ diff --git a/16-extend/actor1.py b/16-extend/actor1.py new file mode 100644 index 0000000..aaa4b28 --- /dev/null +++ b/16-extend/actor1.py @@ -0,0 +1,28 @@ +# actor1.py +# +# Simple attempt at actors + +_registry = { } + +def send(name, msg): + _registry[name].send(msg) + +def actor(func): + def wrapper(*args, **kwargs): + gen = func(*args, **kwargs) + next(gen) + _registry[func.__name__] = gen + return wrapper + +if __name__ == '__main__': + @actor + def printer(): + while True: + msg = yield + print('printer:', msg) + + printer() + n = 10 + while n > 0: + send('printer', n) + n -= 1 diff --git a/16-extend/actor2.py b/16-extend/actor2.py new file mode 100644 index 0000000..97fe82d --- /dev/null +++ b/16-extend/actor2.py @@ -0,0 +1,33 @@ +# actor2.py +# +# Stackless recursive ping-pong + +_registry = { } + +def send(name, msg): + _registry[name].send(msg) + +def actor(func): + def wrapper(*args, **kwargs): + gen = func(*args, **kwargs) + next(gen) + _registry[func.__name__] = gen + return wrapper + +@actor +def ping(): + while True: + n = yield + print('ping %d' % n) + send('pong', n + 1) + +@actor +def pong(): + while True: + n = yield + print('pong %d' % n) + send('ping', n + 1) + +ping() +pong() +send('ping', 0) diff --git a/16-extend/actor3.py b/16-extend/actor3.py new file mode 100644 index 0000000..a226f62 --- /dev/null +++ b/16-extend/actor3.py @@ -0,0 +1,42 @@ +# actor3.py +# +# Stackless recursive ping-pong + +from collections import deque + +_registry = { } +_msg_queue = deque() + +def send(name, msg): + _msg_queue.append((name, msg)) + +def run(): + while _msg_queue: + name, msg = _msg_queue.popleft() + _registry[name].send(msg) + +def actor(func): + def wrapper(*args, **kwargs): + gen = func(*args, **kwargs) + next(gen) + _registry[func.__name__] = gen + return wrapper + +@actor +def ping(): + while True: + n = yield + print('ping %d' % n) + send('pong', n + 1) + +@actor +def pong(): + while True: + n = yield + print('pong %d' % n) + send('ping', n + 1) + +ping() +pong() +send('ping', 0) +run() diff --git a/16-extend/async1.py b/16-extend/async1.py new file mode 100644 index 0000000..fd78ba2 --- /dev/null +++ b/16-extend/async1.py @@ -0,0 +1,14 @@ +import asyncio + +def func(x, y): + return x + y + +@asyncio.coroutine +def do_func(x, y): + yield from asyncio.sleep(1) + return func(x, y) + +loop = asyncio.get_event_loop() +result = loop.run_until_complete(do_func(2,3)) +print("Got:", result) + diff --git a/16-extend/async2.py b/16-extend/async2.py new file mode 100644 index 0000000..66327ff --- /dev/null +++ b/16-extend/async2.py @@ -0,0 +1,17 @@ +import asyncio + +@asyncio.coroutine +def echo_client(reader, writer): + print("Client starting") + while True: + line = yield from reader.readline() + if not line: + break + resp = b'Got:' + line + writer.write(resp) + print("Client closed") + writer.close() + +loop = asyncio.get_event_loop() +loop.run_until_complete(asyncio.start_server(echo_client, host='', port=25000)) +loop.run_forever() diff --git a/16-extend/asyncecho.py b/16-extend/asyncecho.py new file mode 100644 index 0000000..1b0eff5 --- /dev/null +++ b/16-extend/asyncecho.py @@ -0,0 +1,29 @@ +# echoasyncio.py + +from socket import socket, AF_INET, SOCK_STREAM +import asyncio + +loop = asyncio.get_event_loop() + +@asyncio.coroutine +def echo_client(sock): + while True: + data = yield from loop.sock_recv(sock, 8192) + if not data: + break + yield from loop.sock_sendall(sock, data) + sock.close() + +def echo_server(address): + sock = socket(AF_INET, SOCK_STREAM) + sock.bind(address) + sock.listen(5) + sock.setblocking(False) + while True: + client_sock, addr = yield from loop.sock_accept(sock) + asyncio.async(echo_client(client_sock)) + +if __name__ == '__main__': + asyncio.async(echo_server(('',25000))) + loop.run_forever() + diff --git a/16-extend/cm.py b/16-extend/cm.py new file mode 100644 index 0000000..cea9203 --- /dev/null +++ b/16-extend/cm.py @@ -0,0 +1,49 @@ +# cm.py +# +# Context-manager example. See also contextlib.py in the standard library which +# addresses some rather tricky corner cases. + +import sys + +class GeneratorCM(object): + def __init__(self, gen): + self.gen = gen + + def __enter__(self): + return next(self.gen) + + def __exit__(self, etype, val, tb): + try: + if etype is None: + next(self.gen) + else: + self.gen.throw(etype, val, tb) + raise RuntimeError("Generator didn't stop") + except StopIteration: + return True + except: + if sys.exc_info()[1] is not val: raise + +def contextmanager(func): + def run(*args, **kwargs): + return GeneratorCM(func(*args, **kwargs)) + return run + +# Simple Example +if __name__ == '__main__': + import time + @contextmanager + def timethis(): + start = time.time() + try: + yield + finally: + end = time.time() + print(end-start) + + with timethis(): + n = 1000000 + while n > 0: + n -= 1 + + diff --git a/16-extend/compiler1.py b/16-extend/compiler1.py new file mode 100644 index 0000000..cbb602a --- /dev/null +++ b/16-extend/compiler1.py @@ -0,0 +1,112 @@ +# compiler1.py +# +# Compiler that builds a simple AST and evaluates it using the visitor pattern + +import re +from collections import namedtuple + +# ---- Tokenizer + +tokens = [ + r'(?P\d+)', + r'(?P\+)', + r'(?P-)', + r'(?P\*)', + r'(?P/)', + r'(?P\s+)', + ] + +master_re = re.compile('|'.join(tokens)) +Token = namedtuple('Token', ['type','value']) +def tokenize(text): + scan = master_re.scanner(text) + return (Token(m.lastgroup, m.group()) + for m in iter(scan.match, None) if m.lastgroup != 'WS') + +# ---- AST Nodes + +class Node: + _fields = [] + def __init__(self, *args): + for name, value in zip(self._fields, args): + setattr(self, name, value) + +class BinOp(Node): + _fields = ['op', 'left', 'right'] + +class Number(Node): + _fields = ['value'] + +# ---- Simple recursive descent parser + +def parse(toks): + lookahead, current = next(toks, None), None + def accept(*toktypes): + nonlocal lookahead, current + if lookahead and lookahead.type in toktypes: + current, lookahead = lookahead, next(toks, None) + return True + + def expr(): + left = term() + while accept('PLUS','MINUS'): + left = BinOp(current.value, left) + left.right = term() + return left + + def term(): + left = factor() + while accept('TIMES','DIVIDE'): + left = BinOp(current.value, left) + left.right = factor() + return left + + def factor(): + if accept('NUM'): + return Number(int(current.value)) + else: + raise SyntaxError() + return expr() + +# ---- Visitor pattern + +class NodeVisitor: + def visit(self, node): + return getattr(self, 'visit_' + type(node).__name__)(node) + +class Evaluator(NodeVisitor): + def visit_Number(self, node): + return node.value + + def visit_BinOp(self, node): + leftval = self.visit(node.left) + rightval = self.visit(node.right) + if node.op == '+': + return leftval + rightval + elif node.op == '-': + return leftval - rightval + elif node.op == '*': + return leftval * rightval + elif node.op == '/': + return leftval * rightval + +# ---- Examples +if __name__ == '__main__': + text = '2 + 3*4 - 5' + toks = tokenize(text) + tree = parse(toks) + + print('---- Evaluation') + result = Evaluator().visit(tree) + print('Result:', result) + + def explosion(): + 'Run me to see a spectacular fail' + text = '+'.join(str(x) for x in range(1000)) # Make '0+1+2+3+...+999' + toks = tokenize(text) + tree = parse(toks) + val = Evaluator().visit(tree) + print('Result:', val) + + + diff --git a/16-extend/compiler2.py b/16-extend/compiler2.py new file mode 100644 index 0000000..4e9d7aa --- /dev/null +++ b/16-extend/compiler2.py @@ -0,0 +1,125 @@ +import re +from collections import namedtuple +import types + +# ---- Tokenizer + +tokens = [ + r'(?P\d+)', + r'(?P\+)', + r'(?P-)', + r'(?P\*)', + r'(?P/)', + r'(?P\s+)', + ] + +master_re = re.compile('|'.join(tokens)) +Token = namedtuple('Token', ['type','value']) +def tokenize(text): + scan = master_re.scanner(text) + return (Token(m.lastgroup, m.group()) + for m in iter(scan.match, None) if m.lastgroup != 'WS') + +# ---- AST Nodes + +class Node: + _fields = [] + def __init__(self, *args): + for name, value in zip(self._fields, args): + setattr(self, name, value) + +class BinOp(Node): + _fields = ['op', 'left', 'right'] + +class Number(Node): + _fields = ['value'] + +# ---- Simple recursive descent parser + +def parse(toks): + lookahead, current = next(toks, None), None + def accept(*toktypes): + nonlocal lookahead, current + if lookahead and lookahead.type in toktypes: + current, lookahead = lookahead, next(toks, None) + return True + + def expr(): + left = term() + while accept('PLUS','MINUS'): + left = BinOp(current.value, left) + left.right = term() + return left + + def term(): + left = factor() + while accept('TIMES','DIVIDE'): + left = BinOp(current.value, left) + left.right = factor() + return left + + def factor(): + if accept('NUMBER'): + return Number(int(current.value)) + else: + raise SyntaxError() + return expr() + +# ---- Nonrecursive visitor pattern using generators + +class NodeVisitor: + def visit(self, node): + stack = [ self.genvisit(node) ] + result = None + while stack: + try: + node = stack[-1].send(result) + stack.append(self.genvisit(node)) + result = None + except StopIteration as exc: + stack.pop() + result = exc.value + return result + + def genvisit(self, node): + result = getattr(self, 'visit_' + type(node).__name__)(node) + return (yield from result) if isinstance(result, types.GeneratorType) else result + +class Evaluator(NodeVisitor): + def visit_Number(self, node): + return node.value + + def visit_BinOp(self, node): + leftval = yield node.left + rightval = yield node.right + if node.op == '+': + return leftval + rightval + elif node.op == '-': + return leftval - rightval + elif node.op == '*': + return leftval * rightval + elif node.op == '/': + return leftval * rightval + +# ---- Example + +if __name__ == '__main__': + text = '2 + 3*4 - 5' + toks = tokenize(text) + tree = parse(toks) + + print('---- Evaluation') + result = Evaluator().visit(tree) + print('Result:', result) + + + def explosion(): + 'Run me to see a spectacular fail' + text = '+'.join(str(x) for x in range(1000)) # Make '0+1+2+3+...+999' + toks = tokenize(text) + tree = parse(toks) + val = Evaluator().visit(tree) + print('Result:', val) + + print('---- Evil Evaluation') + explosion() diff --git a/16-extend/fib1.py b/16-extend/fib1.py new file mode 100644 index 0000000..e97c41d --- /dev/null +++ b/16-extend/fib1.py @@ -0,0 +1,22 @@ +# fib1.py +# +# Initial fibonacci example using inline futures + +from inline_future import inlined_future, run_inline_future +from concurrent.futures import ProcessPoolExecutor + +def fib(n): + return 1 if n <= 2 else (fib(n-1) + fib(n-2)) + +@inlined_future +def compute_fibs(n): + result = [] + for i in range(n): + val = yield from pool.submit(fib, i) + result.append(val) + return result + + +pool = ProcessPoolExecutor(4) +result = run_inline_future(compute_fibs(35)) +print(result) diff --git a/16-extend/fib2.py b/16-extend/fib2.py new file mode 100644 index 0000000..b6b6714 --- /dev/null +++ b/16-extend/fib2.py @@ -0,0 +1,36 @@ +# fib2.py +# +# Performance test (GIL!) + +from inline_future import inlined_future, run_inline_future, start_inline_future +from concurrent.futures import ProcessPoolExecutor +import threading +import time + +def fib(n): + return 1 if n <= 2 else (fib(n-1) + fib(n-2)) + +@inlined_future +def compute_fibs(n): + result = [] + for i in range(n): + # print(threading.current_thread()) # Uncomment to see weird thread switching + val = yield from pool.submit(fib, i) + result.append(val) + return result + +pool = ProcessPoolExecutor(4) + +start = time.time() +result1 = run_inline_future(compute_fibs(34)) +result2 = run_inline_future(compute_fibs(34)) +end = time.time() +print("Sequential:", end-start) + +start = time.time() +t1 = start_inline_future(compute_fibs(34)) +t2 = start_inline_future(compute_fibs(34)) +result1 = t1.result() +result2 = t2.result() +end = time.time() +print("Parallel:", end-start) diff --git a/16-extend/fib3.py b/16-extend/fib3.py new file mode 100644 index 0000000..44db2bd --- /dev/null +++ b/16-extend/fib3.py @@ -0,0 +1,58 @@ +# fib3.py +# +# Different thread execution model. Here, the inlined future is constrained to a single +# execution thread. You get the same performance, but control flow doesn't change threads. + +from inline_future import inlined_future, run_inline_future +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +import threading +import time + +def run_inline_thread(gen): + value = None + exc = None + while True: + try: + if exc: + fut = gen.throw(exc) + else: + fut = gen.send(value) + try: + value = fut.result() + exc = None + except Exception as e: + exc = e + except StopIteration as exc: + return exc.value + +def fib(n): + return 1 if n <= 2 else (fib(n-1) + fib(n-2)) + +@inlined_future +def compute_fibs(n): + result = [] + for i in range(n): +# print(threading.current_thread()) # Uncomment to see weirdness + val = yield from pool.submit(fib, i) + result.append(val) + return result + +pool = ProcessPoolExecutor(4) + +start = time.time() +result = run_inline_future(compute_fibs(34)) +result = run_inline_future(compute_fibs(34)) +end = time.time() +print("Sequential:", end-start) + +tpool = ThreadPoolExecutor(8) +start = time.time() +t1 = tpool.submit(run_inline_thread, compute_fibs(34)) +t2 = tpool.submit(run_inline_thread, compute_fibs(34)) +result1 = t1.result() +result2 = t2.result() +end = time.time() +print("Parallel:", end-start) + + + diff --git a/16-extend/index.html b/16-extend/index.html new file mode 100644 index 0000000..26238cc --- /dev/null +++ b/16-extend/index.html @@ -0,0 +1,188 @@ + + +Generators: The Final Frontier + + + + + + + +
+ + + + +
+ +Advanced Python Training
+In Chicago with David Beazley
+ +

+

  • Practical + Python Programming
  • +
  • Advanced + Python Mastery
  • +
  • Write a Compiler
  • +

    +

    + Click here for + more details! +

    +
    +

    +
    +
    + +

    Generators: The Final Frontier

    + +Copyright (C) 2014
    +David M. Beazley
    +http://www.dabeaz.com
    + +

    +Presented at PyCon'14, April 10, 2014, Montreal. + +

    Introduction

    + +

    +This tutorial discusses advanced uses of using generators to alter program control flow, +explode brains, and exponentially increase your job security. Topics include context managers, +inlined futures, concurrency, asyncio, actors, compilers, and more. +

    + + + +

    Support Data Files

    + +The following file contains some supporting data files that are used by the various +code samples. Download this to your machine to work the examples that follow. + + + +This download includes the presentation slides and all of the code sample below. + +

    Background Material

    + +

    +This tutorial is part 3 of a trilogy of tutorials involving generators and coroutines. Although it +stands on its own, you may want to review the first two parts to get the bigger picture: +

    + + + +

    Code Samples

    + +Here are various code samples that are used in the course. You can +cut and paste these to your own machine to try them out. The order in +which these are listed follow the course outline. You'll need Python 3.4. + +

    +Part 2 : And Now For Something Completely Different + +

      +
    • cm.py. Sample implementation of context managers using generators. +Similar to contextlib standard library. +
    + +Part 3 : Call me Maybe + + + +

    +Part 4: Yield From Yield From Yield From Future +

    + +
      +

      +

    • inline_puzzle.py. Various attempts to make library functions.
    • +

      + +

      +

    • inline_iter.py. Near complete solution involving iterable Futures.
    • +

      + +

      +

    • inline_future.py. Final solution with tasks, inlined futures, proper result handling.
    • +

      + +

      +

    • async1.py. Simple example from asyncio library.
    • +

      + +

      +

    • async2.py. Echo server example from asyncio
    • +

      +
    + +

    +Part 5: GIL +

    + +
      +

      +

    • fib1.py. Fibonacci numbers in a process pool and inlined futures.
    • +

      + +

      +

    • fib2.py. Performance test, side-stepping the GIL (it works!)
    • +

      + +

      +

    • fib3.py. Performance test with a different thread execution model. +

      + +
    + + +

    +Part 6 : Fake it Until You Make It (Actors) +

    + +
      +

    • actor1.py. Simple actor example.
    • + +

    • actor2.py. Recursive ping-pong (broken).
    • + +

    • tactor.py. Recursive ping-pong using threads.
    • + +

    • actor3.py. Recursive ping-pong with simple message queue and runner.
    • +
    + +

    +Part 7: A Terrifying Visitor +

    + +
      +

    • compiler1.py. A simple compiler and tree traversal using the visitor pattern.
    • + +

    • compiler2.py. Non-recursive tree traversal using generators (insane).
    • +
    + +
    + + + diff --git a/16-extend/inline1.py b/16-extend/inline1.py new file mode 100644 index 0000000..bce7cfd --- /dev/null +++ b/16-extend/inline1.py @@ -0,0 +1,48 @@ +# inline1.py +# +# Simple inline future formulation + +class Task: + def __init__(self, gen): + self._gen = gen + + def step(self, value=None): + try: + fut = self._gen.send(value) + fut.add_done_callback(self._wakeup) + except StopIteration as exc: + pass + + def _wakeup(self, fut): + result = fut.result() + self.step(result) + +# Example +if __name__ == '__main__': + from concurrent.futures import ThreadPoolExecutor + import time + + pool = ThreadPoolExecutor(max_workers=8) + + def func(x, y): + time.sleep(1) + return x + y + + def do_func(x, y): + result = yield pool.submit(func, x, y) + print('Got:', result) + + t = Task(do_func(2,3)) + t.step() + + # Example of a function that makes repeated requests to the pool + def do_many(n): + while n > 0: + result = yield pool.submit(func, n, n) + print('Got:', result) + n -= 1 + + t2 = Task(do_many(10)) + t2.step() + + diff --git a/16-extend/inline2.py b/16-extend/inline2.py new file mode 100644 index 0000000..6e94b4c --- /dev/null +++ b/16-extend/inline2.py @@ -0,0 +1,48 @@ +# inline2.py +# +# Inline future forumulation with exception handling added + +class Task: + def __init__(self, gen): + self._gen = gen + + def step(self, value=None, exc=None): + try: + if exc: + fut = self._gen.throw(exc) + else: + fut = self._gen.send(value) + fut.add_done_callback(self._wakeup) + except StopIteration as exc: + pass + + def _wakeup(self, fut): + try: + result = fut.result() + self.step(result, None) + except Exception as exc: + self.step(None, exc) + +# Example +if __name__ == '__main__': + from concurrent.futures import ThreadPoolExecutor + import time + + pool = ThreadPoolExecutor(max_workers=8) + + def func(x, y): + time.sleep(1) + return x + y + + def do_func(x, y): + try: + result = yield pool.submit(func, x, y) + print('Got:', result) + except Exception as e: + print('Failed:', repr(e)) + + t = Task(do_func(2,3)) + t.step() + + + diff --git a/16-extend/inline_final.py b/16-extend/inline_final.py new file mode 100644 index 0000000..5f80bd6 --- /dev/null +++ b/16-extend/inline_final.py @@ -0,0 +1,84 @@ +# inline_final.py +# +# Final implementation + +from concurrent.futures import Future +import inspect + +def patch_future(cls): + def __iter__(self): + if not self.done(): + yield self + return self.result() + cls.__iter__ = __iter__ + +patch_future(Future) + +class Task(Future): + def __init__(self, gen): + super().__init__() + self._gen = gen + + def step(self, value=None, exc=None): + try: + if exc: + fut = self._gen.throw(exc) + else: + fut = self._gen.send(value) + fut.add_done_callback(self._wakeup) + except StopIteration as exc: + # Set the result of the task (return value from generator) + self.set_result(exc.value) + + def _wakeup(self, fut): + try: + result = fut.result() + self.step(result, None) + except Exception as exc: + self.step(None, exc) + +def inlined_future(func): + assert inspect.isgeneratorfunction(func) + return func + +def start_inline_future(fut): + t = Task(fut) + t.step() + return t + +def run_inline_future(fut): + t = start_inline_future(fut) + return t.result() + +# ------- Example +if __name__ == '__main__': + import time + from concurrent.futures import ThreadPoolExecutor + pool = ThreadPoolExecutor(max_workers=8) + + def func(x, y): + time.sleep(1) + return x + y + + @inlined_future + def do_func(x, y): + try: + result = yield pool.submit(func, x, y) + return result + except Exception as e: + print('Failed:', repr(e)) + + @inlined_future + def after(delay, fut): + ''' + Run a future after a time delay. + ''' + yield from pool.submit(time.sleep, delay) + result = yield from fut + return result + + result = run_inline_future(do_func(2,3)) + print('Result:', result) + + result = run_inline_future(after(10, do_func(2,3))) + print('Result:', result) diff --git a/16-extend/inline_future.py b/16-extend/inline_future.py new file mode 100644 index 0000000..f8707ee --- /dev/null +++ b/16-extend/inline_future.py @@ -0,0 +1,84 @@ +# inline_future.py +# +# Final implementation + +from concurrent.futures import Future +import inspect + +def patch_future(cls): + def __iter__(self): + if not self.done(): + yield self + return self.result() + cls.__iter__ = __iter__ + +patch_future(Future) + +class Task(Future): + def __init__(self, gen): + super().__init__() + self._gen = gen + + def step(self, value=None, exc=None): + try: + if exc: + fut = self._gen.throw(exc) + else: + fut = self._gen.send(value) + fut.add_done_callback(self._wakeup) + except StopIteration as exc: + # Set the result of the task (return value from generator) + self.set_result(exc.value) + + def _wakeup(self, fut): + try: + result = fut.result() + self.step(result, None) + except Exception as exc: + self.step(None, exc) + +def inlined_future(func): + assert inspect.isgeneratorfunction(func) + return func + +def start_inline_future(fut): + t = Task(fut) + t.step() + return t + +def run_inline_future(fut): + t = start_inline_future(fut) + return t.result() + +# ------- Example +if __name__ == '__main__': + import time + from concurrent.futures import ThreadPoolExecutor + pool = ThreadPoolExecutor(max_workers=8) + + def func(x, y): + time.sleep(1) + return x + y + + @inlined_future + def do_func(x, y): + try: + result = yield pool.submit(func, x, y) + return result + except Exception as e: + print('Failed:', repr(e)) + + @inlined_future + def after(delay, fut): + ''' + Run a future after a time delay. + ''' + yield from pool.submit(time.sleep, delay) + result = yield from fut + return result + + result = run_inline_future(do_func(2,3)) + print('Result:', result) + + result = run_inline_future(after(10, do_func(2,3))) + print('Result:', result) diff --git a/16-extend/inline_iter.py b/16-extend/inline_iter.py new file mode 100644 index 0000000..3800b44 --- /dev/null +++ b/16-extend/inline_iter.py @@ -0,0 +1,67 @@ +# inline_iter.py +# +# Patched Future class to make it suitable for use with yield from by +# making it iterable + +from concurrent.futures import Future + +def patch_future(cls): + def __iter__(self): + if not self.done(): + yield self + return self.result() + cls.__iter__ = __iter__ + +patch_future(Future) + +class Task: + def __init__(self, gen): + self._gen = gen + + def step(self, value=None, exc=None): + try: + if exc: + fut = self._gen.throw(exc) + else: + fut = self._gen.send(value) + fut.add_done_callback(self._wakeup) + except StopIteration as exc: + pass + + def _wakeup(self, fut): + try: + result = fut.result() + self.step(result, None) + except Exception as exc: + self.step(None, exc) + +# ------- Example +if __name__ == '__main__': + import time + from concurrent.futures import ThreadPoolExecutor + pool = ThreadPoolExecutor(max_workers=8) + + def func(x, y): + time.sleep(1) + return x + y + + def do_func(x, y): + try: + result = yield pool.submit(func, x, y) + print('Got:', result) + except Exception as e: + print('Failed:', repr(e)) + + def example5(): + ''' + Now it works! + ''' + def after(delay, fut): + ''' + Run a future after a time delay. + ''' + yield from pool.submit(time.sleep, delay) + yield from fut + + Task(after(10, do_func(2, 3))).step() + diff --git a/16-extend/inline_puzzle.py b/16-extend/inline_puzzle.py new file mode 100644 index 0000000..a529a84 --- /dev/null +++ b/16-extend/inline_puzzle.py @@ -0,0 +1,113 @@ +# inline_puzzle.py +# +# Various attempts at making library functions work (puzzler) + +class Task: + def __init__(self, gen): + self._gen = gen + + def step(self, value=None, exc=None): + try: + if exc: + fut = self._gen.throw(exc) + else: + fut = self._gen.send(value) + fut.add_done_callback(self._wakeup) + except StopIteration as exc: + pass + + def _wakeup(self, fut): + try: + result = fut.result() + self.step(result, None) + except Exception as exc: + self.step(None, exc) + +# ------- Example +if __name__ == '__main__': + import time + from concurrent.futures import ThreadPoolExecutor + pool = ThreadPoolExecutor(max_workers=8) + + def func(x, y): + time.sleep(1) + return x + y + + def do_func(x, y): + try: + result = yield pool.submit(func, x, y) + print('Got:', result) + except Exception as e: + print('Failed:', repr(e)) + + def example1(): + ''' + Broken. The 'yield fut' statement doesn't produce a proper Future object + ''' + def after(delay, fut): + ''' + Run a future after a time delay. + ''' + yield pool.submit(time.sleep, delay) + yield fut + + Task(after(10, do_func(2, 3))).step() + + def example2(): + ''' + Broken. Runs, but result gets lost. + ''' + def after(delay, fut): + ''' + Run a future after a time delay. + ''' + yield pool.submit(time.sleep, delay) + for f in fut: + yield f + + Task(after(10, do_func(2, 3))).step() + + def example3(): + ''' + Works, but solution not obvious. + ''' + def after(delay, fut): + ''' + Run a future after a time delay. + ''' + yield pool.submit(time.sleep, delay) + try: + while True: + f = fut.send(result) + result = yield f + except StopIteration: + pass + + Task(after(10, do_func(2, 3))).step() + + def example4(): + ''' + Works, using yield from. But "yield" and "yield from" both used + ''' + def after(delay, fut): + ''' + Run a future after a time delay. + ''' + yield pool.submit(time.sleep, delay) + yield from fut + + Task(after(10, do_func(2, 3))).step() + + def example5(): + ''' + Does not work. Can't use "yield from" everywhere + ''' + def after(delay, fut): + ''' + Run a future after a time delay. + ''' + yield from pool.submit(time.sleep, delay) + yield from fut + + Task(after(10, do_func(2, 3))).step() + diff --git a/16-extend/inline_recursive.py b/16-extend/inline_recursive.py new file mode 100644 index 0000000..c625c49 --- /dev/null +++ b/16-extend/inline_recursive.py @@ -0,0 +1,38 @@ +# inline_recursive.py +# +# Bizarre inline recursive example + +class Task: + def __init__(self, gen): + self._gen = gen + + def step(self, value=None, exc=None): + try: + if exc: + fut = self._gen.throw(exc) + else: + fut = self._gen.send(value) + fut.add_done_callback(self._wakeup) + except StopIteration as exc: + pass + + def _wakeup(self, fut): + try: + result = fut.result() + self.step(result, None) + except Exception as exc: + self.step(None, exc) + +# Example +if __name__ == '__main__': + from concurrent.futures import ThreadPoolExecutor + import time + + pool = ThreadPoolExecutor(max_workers=8) + + def recursive(n): + yield pool.submit(time.sleep, 0.001) + print('Tick:', n) + Task(recursive(n+1)).step() + + Task(recursive(0)).step() diff --git a/16-extend/simplefuture.py b/16-extend/simplefuture.py new file mode 100644 index 0000000..ae2a675 --- /dev/null +++ b/16-extend/simplefuture.py @@ -0,0 +1,52 @@ +# simplefuture.py +# +# Illustration of a future + +from concurrent.futures import ThreadPoolExecutor + +def func(x, y): + 'Some function. Nothing too interesting.' + import time + time.sleep(5) + return x + y + +if __name__ == '__main__': + pool = ThreadPoolExecutor(max_workers=8) + + def example1(): + ''' + Blocking. Wait for result. + ''' + fut = pool.submit(func, 2, 3) + r = fut.result() + print('Got:', r) + + def example2(): + ''' + Blocking. With exception handling. + ''' + fut = pool.submit(func, 2, 'Hello') + try: + r = fut.result() + print('Got:', r) + except Exception as e: + print('Whoops:', e) + + def example3(): + ''' + With callback. + ''' + fut = pool.submit(func, 2, 3) + fut.add_done_callback(result_handler) + + def result_handler(fut): + try: + result = fut.result() + print('Got:', result) + except Exception as e: + print('Whoops:', e) + + + example1() + example2() + example3() diff --git a/16-extend/tactor.py b/16-extend/tactor.py new file mode 100644 index 0000000..f30ec24 --- /dev/null +++ b/16-extend/tactor.py @@ -0,0 +1,56 @@ +import time +import threading +from functools import wraps +from queue import Queue + +class Actor(threading.Thread): + _registry = { } + def __init__(self, name, gen): + super().__init__() + self.daemon = True + self.gen = gen + self.mailbox = Queue() + Actor._registry[name] = self + self.start() + + def send(self, msg): + self.mailbox.put(msg) + + def run(self): + while True: + msg = self.mailbox.get() + self.gen.send(msg) + +def send(name, msg): + Actor._registry[name].send(msg) + +def actor(func): + @wraps(func) + def wrapper(*args, id=func.__name__, **kwargs): + gen = func(*args, **kwargs) + next(gen) + return Actor(id, gen) + return wrapper + +@actor +def ping(): + while True: + n = yield + print('ping %d' % n) + send('pong', n + 1) + +@actor +def pong(): + while True: + n = yield + print('pong %d' % n) + send('ping', n + 1) + +if __name__ == '__main__': + ping() + pong() + send('ping', 0) + while True: + time.sleep(1) + +