Skip to content

Instantly share code, notes, and snippets.

@forkloop
Created May 18, 2019 15:46
Show Gist options
  • Select an option

  • Save forkloop/2fc1578ea603cb8b23e23807f1b1a7f2 to your computer and use it in GitHub Desktop.

Select an option

Save forkloop/2fc1578ea603cb8b23e23807f1b1a7f2 to your computer and use it in GitHub Desktop.
grpc python server interceptor
# https://github.com/mehrdada/grpc/blob/aa477becd1a7c44f8150ad24539cf6d40af24b37/examples/python/interceptors/service-latency-interceptor/service_latency_interceptor.py
import logging
import threading
import grpc
def _wrap_rpc_behavior(handler, fn):
if handler is None:
return None
if handler.request_streaming and handler.response_streaming:
behavior_fn = handler.stream_stream
handler_factory = grpc.stream_stream_rpc_method_handler
elif handler.request_streaming and not handler.response_streaming:
behavior_fn = handler.stream_unary
handler_factory = grpc.stream_unary_rpc_method_handler
elif not handler.request_streaming and handler.response_streaming:
behavior_fn = handler.unary_stream
handler_factory = grpc.unary_stream_rpc_method_handler
else:
behavior_fn = handler.unary_unary
handler_factory = grpc.unary_unary_rpc_method_handler
return handler_factory(fn(behavior_fn,
handler.request_streaming,
handler.response_streaming),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer)
class UtilizationInterceptor(grpc.ServerInterceptor):
def __init__(self, max_active_requests):
self.active_requests = 0
self.lock = threading.Lock()
self.max_active_requests = max_active_requests
def intercept_service(self, continuation, handler_call_details):
def wrapper(behavior, request_streaming, response_streaming):
def new_behavior(request_or_iterator, servicer_context):
try:
with self.lock:
self.active_requests += 1
logging.warning('utilization %s', float(self.active_requests) / self.max_active_requests)
if response_streaming:
for res in behavior(request_or_iterator, servicer_context):
yield res
else:
return behavior(request_or_iterator, servicer_context)
finally:
with self.lock:
self.active_requests -= 1
logging.warning('utilization %s', float(self.active_requests) / self.max_active_requests)
return new_behavior
return _wrap_rpc_behavior(continuation(handler_call_details), wrapper)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment