Как перебирать большой список без блокировки цикла событий

У меня есть скрипт python с запущенным циклом событий asyncio, я хочу знать, как перебирать большой список, не блокируя цикл событий. Таким образом, поддерживая цикл в рабочем состоянии.

Я пробовал создать собственный класс с __aiter__ и __anext__, который не работал, я также пробовал создать async function, который дает результат, но по-прежнему блокирует.

В настоящее время:

for index, item in enumerate(list_with_thousands_of_items):
    # do something

Пользовательский класс, который я пробовал:

class Aiter:
    def __init__(self, iterable):
        self.iter_ = iter(iterable)

    async def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            object = next(self.iter_)
        except StopIteration:
            raise StopAsyncIteration
        return object

Но это всегда приводит к

TypeError: 'async for' received an object from __aiter__ that does not implement __anext__: coroutine

Я сделал async function, который работает, но все еще блокирует цикл событий:

async def async_enumerate(iterable, start:int=0):
    for idx, i in enumerate(iterable, start):
        yield idx, i

person Dylee    schedule 23.04.2019    source источник
comment
Что вы делаете с каждым элементом в списке?   -  person rdas    schedule 23.04.2019
comment
Вы рассматривали что-то вроде ProcessPoolExecutor? Возможно, это подойдет для вашей проблемы.   -  person Rafaël Dera    schedule 23.04.2019
comment
@rdas я добавляю каждый элемент в строку   -  person Dylee    schedule 23.04.2019
comment
@ RafaëlDera, к сожалению, я никогда не слышал о ProcessPoolExecutor   -  person Dylee    schedule 23.04.2019
comment
Ваш __aiter__ должен быть def, а не async def.   -  person user4815162342    schedule 23.04.2019
comment
После исправления @ user4815162342 вы можете попробовать добавить await asyncio.sleep (0) в свой anext; каждое взаимодействие или каждые 10 итераций или что-то еще.   -  person Max    schedule 23.04.2019


Ответы (2)


Как указал @deceze, вы можете использовать await asyncio.sleep(0) для явной передачи управления циклу событий. Однако у этого подхода есть проблемы.

Предположительно список довольно большой, поэтому вам потребовались специальные меры, чтобы разблокировать цикл событий. Но если список такой большой, принуждение каждой итерации цикла к переходу в цикл событий значительно замедлит его. Конечно, вы можете облегчить это, добавив счетчик и ожидая только когда i%10 == 0 или когда i%100 == 0 и т. Д. Но тогда вы должны принимать произвольные решения (догадываться) относительно того, как часто отказываться от управления. Если вы слишком часто уступаете, вы замедляете свою работу. Если вы слишком редко уступаете, значит, цикл событий не отвечает.

Этого можно избежать, используя run_in_executor, как предложил РафаэльДера. run_in_executor принимает функцию блокировки и передает ее выполнение пулу потоков. Он немедленно возвращает будущее, которое может быть awaited в asyncio, и результат которого, когда он станет доступным, будет возвращаемым значением функции блокировки. (Если возникает функция блокировки, вместо этого будет распространено исключение.) Такой await приостановит сопрограмму до тех пор, пока функция не вернется или не вызовет в своем потоке, позволяя циклу событий оставаться полностью функциональным в это время. Поскольку функция блокировки и цикл обработки событий выполняются в отдельных потоках, функции не нужно ничего делать, чтобы разрешить запуск события - они работают независимо. Здесь даже GIL не является проблемой, потому что GIL обеспечивает передачу управления между потоками.

С run_in_executor ваш код может выглядеть так:

def process_the_list():
    for index, item in enumerate(list_with_thousands_of_items):
        # do something

loop = asyncio.get_event_loop()
await loop.run_in_executor(None, process_the_list)
person user4815162342    schedule 23.04.2019

asyncio - это совместная многозадачность. Кооперативная часть проистекает из того факта, что ваша функция должна отдавать выполнение обратно в цикл событий, чтобы разрешить выполнение других вещей. Если вы что-то не await (или не завершите свою функцию), вы затягиваете цикл событий.

Вы можете просто await какое-нибудь событие noop, вероятно, наиболее подходящим является await asyncio.sleep(0). Это гарантирует, что ваша задача будет возобновлена ​​как можно скорее, но позволит запланировать и другие задачи.

person deceze♦    schedule 23.04.2019