Skip to content Skip to sidebar Skip to footer

Run Code After Asyncio Run_until_complete() Statement Has Finished

I am fairly new to asyncio and I managed to do some requests with it. I made a function fetch_all() that takes in a list of the queries (URLs) and the loop previously created with

Solution 1:

I would do this in different way.

I would run loop inside fetch() with try/except to catch exception and repeate it.

Because some problems can never give result so while-loop may run forever - so I would rather use for _ in range(3) to try it only three times.

I would also return url from fetch so it would be easer to get urls which don't give result.

import aiohttp
import asyncio
import ssl

asyncdeffetch(session, url):
    exception = Nonefor number inrange(3):  # try only 3 timestry:
            asyncwith session.get(url, ssl=ssl.SSLContext()) as response:
                data = await response.json()
                #print('data:', data)return url, data
        except Exception as ex:
            print('[ERROR] {} | {} | {}'.format(url, number+1, ex))
            exception = ex
            
    return url, exception

asyncdeffetch_all(urls, loop):
    asyncwith aiohttp.ClientSession(loop=loop) as session:
        returnawait asyncio.gather(*[fetch(session, url) for url in urls], return_exceptions=True)


queries = [
    'https://httpbin.org/get',
    'https://toscrape.com',
    'https://fake.domain/'
]

if __name__ == '__main__':
    
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(fetch_all(queries, loop))

    #print(results)print('--- results ---')
    
    for url, result in results:
        print('url:', url)
        print('result:', result)
        print('is dict:', isinstance(result, dict))
        print('type:', type(result))
        print('---')

Result:

[ERROR] https://fake.domain/|1|Cannotconnecttohostfake.domain:443ssl:<ssl.SSLContextobjectat0x7f3902afc2c0> [Nameorservicenotknown]
[ERROR] https://fake.domain/|2|Cannotconnecttohostfake.domain:443ssl:<ssl.SSLContextobjectat0x7f3902afc440> [Nameorservicenotknown]
[ERROR] https://fake.domain/|3|Cannotconnecttohostfake.domain:443ssl:<ssl.SSLContextobjectat0x7f3902afc9c0> [Nameorservicenotknown]
[ERROR] https://toscrape.com|1|0,message='Attemptto decode JSON with unexpected mimetype:text/html',url=URL('https://toscrape.com')
[ERROR] https://toscrape.com|2|0,message='Attemptto decode JSON with unexpected mimetype:text/html',url=URL('https://toscrape.com')
[ERROR] https://toscrape.com|3|0,message='Attemptto decode JSON with unexpected mimetype:text/html',url=URL('https://toscrape.com')---results---url:https://httpbin.org/getresult: {'args': {}, 'headers': {'Accept':'*/*', 'Accept-Encoding':'gzip, deflate', 'Host':'httpbin.org', 'User-Agent':'Python/3.8 aiohttp/3.7.4.post0', 'X-Amzn-Trace-Id':'Root=1-60e5c00e-45aae85e78277e5122b262c9'}, 'origin':'83.11.175.159', 'url':'https://httpbin.org/get'}
is dict:Truetype:<class'dict'>---url:https://toscrape.comresult:0,message='Attemptto decode JSON with unexpected mimetype:text/html',url=URL('https://toscrape.com')is dict:Falsetype:<class'aiohttp.client_exceptions.ContentTypeError'>---url:https://fake.domain/result:Cannotconnecttohostfake.domain:443ssl:<ssl.SSLContextobjectat0x7f3902afc9c0> [Nameorservicenotknown]
is dict:Falsetype:<class'aiohttp.client_exceptions.ClientConnectorError'>---

EDIT:

Version which uses your method with looping run_until_complete but I would do all in one for-loop.

And I would use for _ in range(3) to repeate it only three times.

This works but previous version seems much simpler.

import aiohttp
import asyncio
import ssl

asyncdeffetch(session, url):
    asyncwith session.get(url, ssl=ssl.SSLContext()) as response:
        returnawait response.json()

asyncdeffetch_all(urls, loop):
    asyncwith aiohttp.ClientSession(loop=loop) as session:
        returnawait asyncio.gather(*[fetch(session, url) for url in urls], return_exceptions=True)

queries = [
    'https://httpbin.org/get',
    'https://httpbin.org/json',
    'https://toscrape.com',
    'https://fake.domain/'
]

if __name__ == '__main__':

    # you can get it once
    loop = asyncio.get_event_loop()

    # original all queries
    all_queries = queries
    # places for all results  
    all_results = [None] * len(all_queries)
    
    # selected indexes at start
    indexes = list(range(len(all_queries)))
        
    for number inrange(3):
        # selected queries
        queries = [all_queries[idx] for idx in indexes]
        
        # selected results
        results = loop.run_until_complete(fetch_all(queries, loop))
        
        print('\n--- try:', number+1, '--- results:', len(results), '---\n')
        
        new_indexes = []
        
        for idx, url, result inzip(indexes, queries, results):
            all_results[idx] = result
            ifnotisinstance(result, dict):
                new_indexes.append(idx)

            print('url:', url)
            print('result:', result)    
            print('is dict:', isinstance(result, dict))
            print('type:', type(result))
            print('---')
                
        # selected indexes after fitering correct results
        indexes = new_indexes             

Post a Comment for "Run Code After Asyncio Run_until_complete() Statement Has Finished"