Перейти к основному содержимому

Создание синхронной функции из асинхронной

Иногда может возникнуть необходимость вызвать асинхронную функцию внутри синхронной. Сделать это достаточно просто с помощью asyncio.run.

import asyncio


def syncify(func):
def wrapper(*args, **kwargs):
res = func(*args, **kwargs)
if asyncio.iscoroutine(res):
return asyncio.run(res)
return res

return wrapper

Пример использования:

import asyncio


async def async_sleep(seconds):
print('Sleeping for', seconds, 'sec')
await asyncio.sleep(seconds)
print('Awake after', seconds, 'sec')
return seconds


sleep = syncify(async_sleep)

if __name__ == '__main__':
print(sleep(1))
print(sleep(2))
print(sleep(3))
Sleeping for 1 sec
Awake after 1 sec
1
Sleeping for 2 sec
Awake after 2 sec
2
Sleeping for 3 sec
Awake after 3 sec
3

Связанные заметки