import requests
from bs4 import BeautifulSoup
from prefect import task, Flow, Parameter


@task(tags=["web"])
def retrieve_url(url):
    """
    Given a URL (string), retrieves html and returns the html as a string.

    Parameters
    ----------
    url : str
        The page to fetch.

    Returns
    -------
    str
        The raw html body of the page.

    Raises
    ------
    ValueError
        If the page could not be retrieved (non-2xx/3xx status).
    """
    # A timeout is essential: without one, a stalled server would hang
    # this task (and the whole flow run) indefinitely.
    html = requests.get(url, timeout=30)
    if html.ok:
        return html.text
    else:
        # Keep raising ValueError so existing callers' except clauses still match.
        raise ValueError(f"{url} could not be retrieved.")
@task
def scrape_dialogue(episode_html):
    """
    Given a string of html representing an episode page,
    returns a tuple of (title, [(character, text)]) of the dialogue
    from that episode.
    """
    soup = BeautifulSoup(episode_html, 'html.parser')
    # Strip trailing junk and escape single quotes for later SQL insertion.
    title = soup.title.text.rstrip(' *').replace("'", "''")

    # Some transcripts mark speakers with <b>, others with <span class="char">;
    # fall back to the span form only when no <b> tags are present.
    speakers = soup.find_all('b') or soup.find_all('span', {'class': 'char'})

    dialogue = [
        (
            tag.text.rstrip(': ').rstrip(' *').replace("'", "''"),
            str(tag.next_sibling).rstrip(' *').replace("'", "''"),
        )
        for tag in speakers
    ]
    return (title, dialogue)
episode_url = "http://www.insidethex.co.uk/transcrp/scrp320.htm"
outer_space = flow.run(parameters={"url": episode_url})

# the `State` object for the dialogue task
state = outer_space.result[dialogue]

# state.result is a tuple (episode_name, [dialogue])
first_five_spoken_lines = state.result[1][:5]
print(''.join(f'{speaker}: {words}' for speaker, words in first_five_spoken_lines))
ROKY CRIKENSON: Yeah, this is Roky. I checked all the connections. I don''t know why all the power''s down out here. I''m going to have to come in and get some more equipment. Yeah, yeah... yeah, I''ll need several of those. All right...
HAROLD LAMB: Um... I don''t want to scare you, but... I think I''m madly in love with you.
CHRISSY GIORGIO: Harold, I like you a lot too, but this is our first date. I mean, I think that we need more time to get to know one another.
HAROLD LAMB: Oh my God... oh my God...
CHRISSY GIORGIO: Harold, what are those things?
Now that we're reasonably confident in our scraping logic, we want to reproduce the above example for every episode while maintaining backwards compatibility for a single page. To do so, we need to compile a list of the URLs for every episode, and then proceed to scrape each one.
@task
def create_episode_list(base_url, main_html, bypass):
    """
    Given the main page html, creates a list of episode URLs.

    When `bypass` is truthy, skips parsing entirely and treats
    `base_url` itself as the single episode to scrape.
    """
    if bypass:
        return [base_url]

    soup = BeautifulSoup(main_html, 'html.parser')
    # Transcript pages are the anchors whose href contains 'transcrp/scrp';
    # `or ''` guards against anchors with no href attribute.
    hrefs = (anchor.get('href') for anchor in soup.find_all('a'))
    return [base_url + href for href in hrefs if 'transcrp/scrp' in (href or '')]
@task
def create_episode_list(base_url, main_html, bypass):
    """
    Given the main page html, creates a list of episode URLs.

    If `bypass` is truthy, no parsing happens: `base_url` is returned
    as the only entry, so a single page can be scraped directly.
    """
    if bypass:
        return [base_url]

    parsed = BeautifulSoup(main_html, 'html.parser')
    episodes = []
    for anchor in parsed.find_all('a'):
        target = anchor.get('href')
        if target is None:
            # Anchor without an href — nothing to follow.
            continue
        if 'transcrp/scrp' not in target:
            # Not a transcript link; skip navigation/ad links.
            continue
        episodes.append(base_url + target)
    return episodes
In our current situation, instead of a loop, we utilize the map() method of tasks. At a high level, at runtime task.map(iterable_task) is roughly equivalent to:
%%timescraped_state = flow.run(parameters={"url": "http://www.insidethex.co.uk/"})# CPU times: user 7.48 s, sys: 241 ms, total: 7.73 s# Wall time: 4min 46sdialogue_state = scraped_state.result[dialogue]# list of State objectsprint('\n'.join([f'{s.result[0]}: {s}'for s in dialogue_state.map_states[:5]]))
BABYLON-1AYW04:Success("Task run succeeded.")Pilot-1X79:Success("Task run succeeded.")DeepThroat-1X01:Success("Task run succeeded.")Squeeze-1X02:Success("Task run succeeded.")Conduit-1X03:Success("Task run succeeded.")
from prefect.engine.executors import DaskExecutorexecutor =DaskExecutor(local_processes=True)%%timescraped_state = flow.run(parameters={"url": "http://www.insidethex.co.uk/"}, executor=executor)# CPU times: user 9.7 s, sys: 1.67 s, total: 11.4 s# Wall time: 1min 34sdialogue_state = scraped_state.result[dialogue]# list of State objectsprint('\n'.join([f'{s.result[0]}: {s}'for s in dialogue_state.map_states[:5]]))
BABYLON-1AYW04:Success("Task run succeeded.")Pilot-1X79:Success("Task run succeeded.")DeepThroat-1X01:Success("Task run succeeded.")Squeeze-1X02:Success("Task run succeeded.")Conduit-1X03:Success("Task run succeeded.")
from prefect import unmapped

# Extend the existing flow with database tasks.
with flow:
    db = create_db()
    # One INSERT script per scraped episode.
    ep_script = create_episode_script.map(episode=dialogue)
    # `unmapped` makes every mapped insert depend on the single db-creation
    # task rather than trying to map over it.
    final = insert_episode.map(ep_script, upstream_tasks=[unmapped(db)])
# Re-run for a single transcript page, using the bypass switch so the
# episode-list task returns just this URL.
final = flow.run(
    parameters={
        "url": "http://www.insidethex.co.uk/transcrp/tlg105.htm",
        "bypass": True,
    }
)
Then we return to the sqlite3 shell to look up what was updated:
sqlite> .open xfiles_db.sqlite
sqlite> SELECT * FROM XFILES WHERE TEXT LIKE '%programming%';
Kill Switch-5X11|BYERS|This CD has some kind of enhanced background data. Lots of code. Maybe a programming design.
First Person Shooter-7X13|BYERS|Langly did some programming for them. He created all of the bad guys.
The Springfield Files-X3601|HOMER|Now son, they do a lot of quality programming too. Haa haa haa! I kill me.
Planet of the Frohikes-1AEB05|FROHIKE|It's a pretty neat bit of programming.