############################################################################## # # Copyright (c) 2006 Zope Foundation and Contributors. # All Rights Reserved. # # This software is subject to the provisions of the Zope Public License, # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS # FOR A PARTICULAR PURPOSE. # ############################################################################## from zope.interface.adapter import AdapterLookup from zope.interface.adapter import AdapterRegistry from zope.interface.registry import Components from zope.interface import providedBy from zope.interface.interface import adapter_hooks import asyncio _BLANK = u"" class AsyncAdapterLookup(AdapterLookup): async def subscribers(self, objects, provided): subscriptions = self.subscriptions(map(providedBy, objects), provided) results = [] for subscription in sorted( subscriptions, key=lambda sub: getattr(sub, "priority", 100) ): if asyncio.iscoroutinefunction(subscription): results.append(await subscription(*objects)) else: results.append(subscription(*objects)) return results class AsyncAdapterRegistry(AdapterRegistry): """ Customized adapter registry for async Taken from https://github.com/plone/guillotina """ _delegated = AdapterRegistry._delegated + ("subscribers",) # type: ignore LookupClass = AsyncAdapterLookup def __init__(self, parent, name): self.__parent__ = parent self.__name__ = name super().__init__() class GlobalComponents(Components): def _init_registries(self): self.adapters = AsyncAdapterRegistry(self, "adapters") self.utilities = AsyncAdapterRegistry(self, "utilities") def __reduce__(self): # Global site managers are pickled as global objects return self.__name__ # this is adapted from pyramid registry async def notify(self, *event): await self.adapters.subscribers(event, None) # we only have one global registry, and we instantiate it base = GlobalComponents("base") # this part makes Interfaces work as lookups, # IAdapter(obj) def adapter_hook(interface, object, name='', default=None, args=[], kwargs={}): global base registry = base factory = registry.adapters.lookup((providedBy(object),), interface, name) if factory is None: return default return factory(object, *args, **kwargs) adapter_hooks.append(adapter_hook) async def notify(event): await base.notify(event) def provideUtility(component, provides=None, name=_BLANK): base.registerUtility(component, provides, name, event=False) def provideAdapter(factory, adapts=None, provides=None, name=_BLANK): base.registerAdapter(factory, adapts, provides, name, event=False) def provideSubscriptionAdapter(factory, adapts=None, provides=None): base.registerSubscriptionAdapter(factory, adapts, provides, event=False) def provideHandler(factory, adapts=None): base.registerHandler(factory, adapts, event=False) # proxy methods to our registry instance getUtility = base.getUtility getAdapter = base.getAdapter getAdapters = base.getAdapters getAllUtilitiesRegisteredFor = base.getAllUtilitiesRegisteredFor getUtilitiesFor = base.getUtilitiesFor getMultiAdapter = base.getMultiAdapter registerAdapter = base.registerAdapter handle = base.handle queryAdapter = base.queryAdapter queryMultiAdapter = base.queryMultiAdapter queryUtility = base.queryUtility registerAdapter = base.registerAdapter registerHandler = base.registerHandler registerUtility = base.registerUtility registerSubscriptionAdapter = base.registerSubscriptionAdapter registeredAdapters = base. registeredAdapters registeredUtilities = base.registeredUtilities subscribers = base.subscribers unregisterAdapter = base.unregisterAdapter unregisterHandler = base.unregisterHandler unregisterSubscriptionAdapter = base.unregisterSubscriptionAdapter unregisterUtility = base.unregisterUtility