from prefect.engine.executors import LocalExecutor

# ... your flow construction

# Run the flow with an explicit executor instance.
state = flow.run(executor=LocalExecutor())  # <-- the executor needs to be initialized
from prefect import Flow, task
from prefect.utilities.debug import raise_on_exception
# Task that returns the reciprocal of `x`.  The example flow calls it with 0,
# so it raises ZeroDivisionError (see the traceback that follows).
@task
def div(x):
    return 1 / x
# Build a flow whose only task call divides by zero.
with Flow("My Flow") as f:
    val = div(0)

# Run inside `raise_on_exception` so the task's exception propagates out of
# `f.run()` (as the traceback below shows) rather than only being reported
# through the returned state.
with raise_on_exception():
    f.run()
---------------------------------------------------------------------------
ZeroDivisionError Traceback (most recent call last)
<ipython-input-1-82c40dd24406> in <module>()
11
12 with raise_on_exception():
---> 13 f.run()
... # the full traceback is long
<ipython-input-1-82c40dd24406> in div(x)
5 @task
6 def div(x):
----> 7 return 1 / x
8
9 with Flow("My Flow") as f:
ZeroDivisionError: division by zero
from prefect import Flow, task
# Demonstrates a Python "gotcha": `tup[1] += ['c']` first extends the list in
# place and only then raises TypeError, when the result is assigned back into
# the tuple.  By the time the handler runs the list already holds two items,
# so the assertion fails and the task ends with an AssertionError.
@task
def gotcha():
    tup = ('a', ['b'])
    try:
        tup[1] += ['c']
    except TypeError:
        assert len(tup[1]) == 1
# Wrap the single task in a flow and run it; here the failure is reported via
# the returned state instead of being raised.
flow = Flow(name="tuples", tasks=[gotcha])
state = flow.run()
state.result # {<Task: gotcha>: Failed("Unexpected error: AssertionError()")}
# Look up the state recorded for the failing task ...
failed_state = state.result[gotcha]
# ... and re-raise the exception it carries to obtain the full traceback
# (shown below), which can then be inspected with a post-mortem debugger.
raise failed_state.result
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-50-8efcdf8dacda> in gotcha()
7 try:
----> 8 tup[1] += ['c']
9 except TypeError:
TypeError: 'tuple' object does not support item assignment
During handling of the above exception, another exception occurred:
AssertionError Traceback (most recent call last)
<ipython-input-1-f0f986d2f159> in <module>
22
23 failed_state = state.result[gotcha]
---> 24 raise failed_state.result
~/Developer/prefect/src/prefect/engine/runner.py in inner(self, state, *args, **kwargs)
58
59 try:
---> 60 new_state = method(self, state, *args, **kwargs)
61 except ENDRUN as exc:
62 raise_end_run = True
~/Developer/prefect/src/prefect/engine/task_runner.py in get_task_run_state(self, state, inputs, timeout_handler)
697 self.logger.info("Running task...")
698 timeout_handler = timeout_handler or main_thread_timeout
--> 699 result = timeout_handler(self.task.run, timeout=self.task.timeout, **inputs)
700
701 # inform user of timeout
~/Developer/prefect/src/prefect/utilities/executors.py in multiprocessing_timeout(fn, timeout, *args, **kwargs)
68
69 if timeout is None:
---> 70 return fn(*args, **kwargs)
71 else:
72 timeout_length = timeout.total_seconds()
<ipython-input-1-f0f986d2f159> in gotcha()
9 tup[1] += ['c']
10 except TypeError:
---> 11 assert len(tup[1]) == 1
12
13
AssertionError:
%debug # using the IPython magic method to start a pdb session
from prefect import task, Flow
from prefect.environments.storage import Docker
# import a non-prefect package used for scraping reddit
import praw
# Task that needs the third-party `praw` package at run time.
# NOTE(review): the credentials below appear to be example values; real
# credentials should not be hard-coded in flow source.
@task
def whoami():
    reddit = praw.Reddit(client_id='SI8pN3DSbt0zor',
                         client_secret='xaxkj7HNh8kwg8e5t4m6KvSrbTI',
                         password='1guiwevlfo00esyy',
                         user_agent='testscript by /u/fakebot3',
                         username='fakebot3')
    return reddit.user.me()
# Attach Docker storage built on the plain python:3.6 base image.
# NOTE(review): nothing here adds `praw` to the image, which appears to be the
# cause of the ModuleNotFoundError shown at the end of this example.
storage = Docker(base_image="python:3.6", registry_url="http://my.personal.registry")
flow = Flow("reddit-flow", storage=storage, tasks=[whoami])
# note that this will require either a valid registry_url, or no registry_url
# push=False is important here; otherwise your local image will be deleted
built_storage = flow.storage.build(push=False)
Successfully built 0f3b0851148b # your ID will be different
# connect to an interactive python session running in the container
docker run -it 0f3b0851148b python
import cloudpickle
# Deserialize the flow stored inside the image.  Unpickling re-imports the
# modules the flow references, so a package missing from the image shows up
# here as a ModuleNotFoundError (see the traceback below).
with open("/root/.prefect/reddit-flow.prefect", "rb") as f:
    flow = cloudpickle.load(f)
Traceback (most recent call last):
flow = cloudpickle.loads(decrypted_pickle)
File "/usr/local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 944, in subimport
__import__(name)
ModuleNotFoundError: No module named 'praw'