Created
May 18, 2019 15:46
-
-
Save forkloop/2fc1578ea603cb8b23e23807f1b1a7f2 to your computer and use it in GitHub Desktop.
gRPC Python server interceptor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # https://github.com/mehrdada/grpc/blob/aa477becd1a7c44f8150ad24539cf6d40af24b37/examples/python/interceptors/service-latency-interceptor/service_latency_interceptor.py | |
| import logging | |
| import threading | |
| import grpc | |
def _wrap_rpc_behavior(handler, fn):
    """Return a copy of ``handler`` whose behavior has been passed through ``fn``.

    The handler's two streaming flags select which of its four behavior
    attributes (``unary_unary``, ``unary_stream``, ``stream_unary``,
    ``stream_stream``) is in use and which ``grpc.*_rpc_method_handler``
    factory builds the replacement.

    Args:
        handler: Object exposing ``request_streaming``, ``response_streaming``,
            the matching behavior attribute, ``request_deserializer`` and
            ``response_serializer``; or ``None``.
        fn: Callable invoked as
            ``fn(behavior, request_streaming, response_streaming)``; its
            return value becomes the behavior of the new handler.

    Returns:
        ``None`` when ``handler`` is ``None``; otherwise whatever the selected
        ``grpc`` factory returns for the wrapped behavior, built with the
        original handler's deserializer and serializer.
    """
    if handler is None:
        return None

    # (request is a stream, response is a stream) -> RPC kind name.  The name
    # is both the behavior attribute on the handler and the prefix of the grpc
    # factory function.
    kind_by_flags = {
        (True, True): 'stream_stream',
        (True, False): 'stream_unary',
        (False, True): 'unary_stream',
        (False, False): 'unary_unary',
    }
    flags = (bool(handler.request_streaming), bool(handler.response_streaming))
    kind = kind_by_flags[flags]

    original_behavior = getattr(handler, kind)
    make_handler = getattr(grpc, kind + '_rpc_method_handler')

    wrapped_behavior = fn(
        original_behavior,
        handler.request_streaming,
        handler.response_streaming,
    )
    return make_handler(
        wrapped_behavior,
        request_deserializer=handler.request_deserializer,
        response_serializer=handler.response_serializer,
    )
class UtilizationInterceptor(grpc.ServerInterceptor):
    """Server interceptor that logs the fraction of in-flight requests.

    Each intercepted RPC bumps ``active_requests`` while its behavior runs and
    logs ``active_requests / max_active_requests`` at WARNING level on entry
    and on exit.
    """

    def __init__(self, max_active_requests):
        """Initialise the counter.

        Args:
            max_active_requests: Denominator for the logged utilization ratio.
                It must be non-zero, otherwise logging the ratio raises
                ``ZeroDivisionError``.
        """
        # Number of behaviors currently executing; guarded by ``self.lock``.
        self.active_requests = 0
        self.lock = threading.Lock()
        self.max_active_requests = max_active_requests

    def _adjust_active_requests(self, delta):
        """Add ``delta`` to the counter under the lock and return the new count."""
        with self.lock:
            self.active_requests += delta
            return self.active_requests

    def _log_utilization(self, active):
        """Log ``active`` as a fraction of ``max_active_requests``."""
        # ``active`` is a snapshot taken under the lock, so the logged value
        # corresponds to this request's own increment/decrement.
        logging.warning('utilization %s', float(active) / self.max_active_requests)

    def intercept_service(self, continuation, handler_call_details):
        """Wrap the next handler so its behavior is counted while it runs.

        Args:
            continuation: Callable that takes ``handler_call_details`` and
                returns the next handler in the chain (or ``None``).
            handler_call_details: Passed through to ``continuation`` unchanged.

        Returns:
            The result of ``_wrap_rpc_behavior`` applied to the next handler.
        """
        def wrapper(behavior, request_streaming, response_streaming):
            # A function containing ``yield`` is always a generator function,
            # so the streaming and unary cases need separate definitions:
            # a unary-response behavior must return the response object
            # itself, not a generator.
            if response_streaming:
                def new_behavior(request_or_iterator, servicer_context):
                    active = self._adjust_active_requests(1)
                    try:
                        self._log_utilization(active)
                        for res in behavior(request_or_iterator, servicer_context):
                            yield res
                    finally:
                        # Runs when the stream is exhausted, fails, or the
                        # generator is closed early.
                        self._log_utilization(self._adjust_active_requests(-1))
            else:
                def new_behavior(request_or_iterator, servicer_context):
                    active = self._adjust_active_requests(1)
                    try:
                        self._log_utilization(active)
                        return behavior(request_or_iterator, servicer_context)
                    finally:
                        self._log_utilization(self._adjust_active_requests(-1))
            return new_behavior

        return _wrap_rpc_behavior(continuation(handler_call_details), wrapper)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment