Skip to content

Instantly share code, notes, and snippets.

@haizaar
Last active September 16, 2022 21:09
Show Gist options
  • Select an option

  • Save haizaar/63b941ec747f71d076494847fef49317 to your computer and use it in GitHub Desktop.

Select an option

Save haizaar/63b941ec747f71d076494847fef49317 to your computer and use it in GitHub Desktop.

Revisions

  1. haizaar revised this gist Nov 24, 2020. 1 changed file with 4 additions and 1 deletion.
    5 changes: 4 additions & 1 deletion google-api-utils.py
    Original file line number Diff line number Diff line change
    @@ -78,6 +78,9 @@ def _provision_http(self) -> AuthorizedHttp:
    logger.info("Transport pool exhausted. Creating new transport")
    return self.factory()

    def __del__(self) -> None:
    def close(self) -> None:
    for ahttp in self.pool:
    ahttp.http.close()

    def __del__(self) -> None:
    self.close()
  2. haizaar created this gist Nov 24, 2020.
    83 changes: 83 additions & 0 deletions google-api-utils.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,83 @@
    from __future__ import annotations

    from dataclasses import dataclass, field
    from typing import Any, Callable, Hashable, List, Optional

    import google.auth
    import httplib2
    import structlog
    from google_auth_httplib2 import AuthorizedHttp
    from googleapiclient.http import HttpRequest

    logger = structlog.get_logger(__name__)


    @dataclass
    class MemCache:
    data: dict[Hashable, Any] = field(default_factory=dict)

    def get(self, key: Hashable) -> Any:
    if hit := self.data.get(key, None):
    logger.debug("Cache hit", key=key)
    return hit

    def set(self, key: Hashable, data: Any) -> None:
    self.data[key] = data

    def delete(self, key):
    try:
    del self.data[1]
    except KeyError:
    pass


    @dataclass
    class APIConnector:
    """
    This class is a thread-safe wrapper around HttpRequest.execute() method.
    It uses a pool of AuthorizedHttp objects and makes sure there is
    only one in-flight request for each.
    """

    factory: Callable[[], AuthorizedHttp]
    pool: List[AuthorizedHttp] = field(default_factory=[])

    @classmethod
    def new(
    cls,
    credentials: google.auth.Credentials,
    initial_size: int = 5,
    timeout_seconds: int = 3,
    cache: Optional[MemCache] = None,
    ) -> APIConnector:

    factory = lambda: AuthorizedHttp(
    credentials, http=httplib2.Http(timeout=timeout_seconds, cache=cache)
    )

    pool: List[AuthorizedHttp] = []
    for i in range(initial_size):
    pool.append(factory())

    return cls(factory, pool=pool)

    def execute(self, request: HttpRequest) -> Any:
    http: Optional[AuthorizedHttp] = None
    try:
    http = self._provision_http()
    return request.execute(http=http)
    finally:
    if http:
    self.pool.append(http)

    def _provision_http(self) -> AuthorizedHttp:
    # This function can run in parallel in multiple threads.
    try:
    return self.pool.pop()
    except IndexError:
    logger.info("Transport pool exhausted. Creating new transport")
    return self.factory()

    def __del__(self) -> None:
    for ahttp in self.pool:
    ahttp.http.close()