function.runtime
Utilities to create a composition function runtime.
# Copyright 2023 The Crossplane Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities to create a composition function runtime."""

import asyncio
import os
import signal
from collections.abc import Sequence
from typing import Any

import grpc
from grpc_reflection.v1alpha import reflection

import crossplane.function.proto.v1.run_function_pb2 as fnv1
import crossplane.function.proto.v1.run_function_pb2_grpc as grpcv1
import crossplane.function.proto.v1beta1.run_function_pb2 as fnv1beta1
import crossplane.function.proto.v1beta1.run_function_pb2_grpc as grpcv1beta1

# Services advertised through gRPC server reflection.
SERVICE_NAMES = (
    reflection.SERVICE_NAME,
    fnv1.DESCRIPTOR.services_by_name["FunctionRunnerService"].full_name,
    fnv1beta1.DESCRIPTOR.services_by_name["FunctionRunnerService"].full_name,
)

# Grace period passed to server.stop() when shutting down.
SHUTDOWN_GRACE_PERIOD_SECONDS = 5


def load_credentials(tls_certs_dir: str | None) -> grpc.ServerCredentials | None:
    """Load TLS credentials for a composition function gRPC server.

    Args:
        tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt,
            or None to load no credentials.

    Returns:
        gRPC mTLS server credentials, or None if tls_certs_dir is None.

    tls.crt and tls.key must be the function's PEM-encoded certificate and
    private key. ca.crt must be a PEM-encoded CA certificate used to
    authenticate callers (i.e. Crossplane).
    """
    if tls_certs_dir is None:
        return None

    with open(os.path.join(tls_certs_dir, "tls.crt"), "rb") as f:
        crt = f.read()

    with open(os.path.join(tls_certs_dir, "tls.key"), "rb") as f:
        key = f.read()

    with open(os.path.join(tls_certs_dir, "ca.crt"), "rb") as f:
        ca = f.read()

    return grpc.ssl_server_credentials(
        private_key_certificate_chain_pairs=[(key, crt)],
        root_certificates=ca,
        require_client_auth=True,
    )


def serve(
    function: grpcv1.FunctionRunnerService,
    address: str,
    *,
    creds: grpc.ServerCredentials | None,
    insecure: bool,
    options: Sequence[tuple[str, Any]] | None = None,
) -> None:
    """Start a gRPC server and serve requests asynchronously.

    Args:
        function: The function (class) to use to serve requests.
        address: The address at which to listen for requests.
        creds: The credentials used to authenticate requests.
        insecure: Serve insecurely, without credentials or encryption.
        options: Additional gRPC server options. E.g. set max receive message
            size to 5 MB (default is 4 MB):
            [("grpc.max_receive_message_length", 1024 * 1024 * 5)]

    Raises:
        ValueError if creds is None and insecure is False.

    If insecure is true requests will be served insecurely, even if credentials
    are supplied.
    """
    # Validate first so a misconfiguration fails before a server is created or
    # a signal handler is registered on the event loop.
    if creds is None and not insecure:
        msg = (
            "no credentials were provided - did you provide credentials or use "
            "the insecure flag?"
        )
        raise ValueError(msg)

    # Define the loop before the server so everything uses the same loop.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # Newer Python versions raise instead of implicitly creating a loop
        # when there is no current event loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    server = grpc.aio.server(options=options)

    # Stop gracefully on SIGTERM; stop() is a coroutine so it is scheduled on
    # the loop rather than awaited from the signal handler.
    loop.add_signal_handler(
        signal.SIGTERM,
        lambda: asyncio.ensure_future(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS)),
    )

    # Serve both the v1 API and, through an adapter, the v1beta1 API.
    grpcv1.add_FunctionRunnerServiceServicer_to_server(function, server)
    grpcv1beta1.add_FunctionRunnerServiceServicer_to_server(
        BetaFunctionRunner(wrapped=function), server
    )
    reflection.enable_server_reflection(SERVICE_NAMES, server)

    # The insecure flag takes precedence over credentials, as documented. Only
    # one port is bound to the address.
    if insecure:
        server.add_insecure_port(address)
    else:
        server.add_secure_port(address, creds)

    async def start():
        await server.start()
        await server.wait_for_termination()

    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS))
        loop.close()


class BetaFunctionRunner(grpcv1beta1.FunctionRunnerService):
    """A BetaFunctionRunner handles beta gRPC RunFunctionRequests.

    It handles requests by passing them to a wrapped v1.FunctionRunnerService.
    Incoming v1beta1 requests are converted to v1 by round-tripping them through
    serialization. Outgoing responses are converted from v1 to v1beta1 the same
    way.
    """

    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
        """Create a new BetaFunctionRunner.

        Args:
            wrapped: The v1 function runner that requests are delegated to.
        """
        self.wrapped = wrapped

    async def RunFunction(  # noqa: N802  # gRPC requires this name.
        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
    ) -> fnv1beta1.RunFunctionResponse:
        """Run the underlying function.

        Args:
            req: The v1beta1 request to handle.
            context: The gRPC context, passed through to the wrapped function.

        Returns:
            The wrapped function's response, re-encoded as v1beta1.
        """
        # Convert v1beta1 -> v1 by serializing and re-parsing the message.
        gareq = fnv1.RunFunctionRequest()
        gareq.ParseFromString(req.SerializeToString())

        garsp = await self.wrapped.RunFunction(gareq, context)

        # Convert the v1 response back to v1beta1 the same way.
        rsp = fnv1beta1.RunFunctionResponse()
        rsp.ParseFromString(garsp.SerializeToString())

        return rsp
def load_credentials(tls_certs_dir: str | None) -> grpc.ServerCredentials | None:
    """Load TLS credentials for a composition function gRPC server.

    Args:
        tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt,
            or None to load no credentials.

    Returns:
        gRPC mTLS server credentials, or None if tls_certs_dir is None.

    tls.crt and tls.key must be the function's PEM-encoded certificate and
    private key. ca.crt must be a PEM-encoded CA certificate used to
    authenticate callers (i.e. Crossplane).
    """
    if tls_certs_dir is None:
        return None

    # Read each PEM file as raw bytes, which is what gRPC expects.
    pem = {}
    for filename in ("tls.crt", "tls.key", "ca.crt"):
        with open(os.path.join(tls_certs_dir, filename), "rb") as f:
            pem[filename] = f.read()

    return grpc.ssl_server_credentials(
        private_key_certificate_chain_pairs=[(pem["tls.key"], pem["tls.crt"])],
        root_certificates=pem["ca.crt"],
        # Callers must present a certificate signed by the CA (mutual TLS).
        require_client_auth=True,
    )
Load TLS credentials for a composition function gRPC server.
Arguments:
- tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt.
Returns:
gRPC mTLS server credentials.
tls.crt and tls.key must be the function's PEM-encoded certificate and private key. ca.crt must be a PEM-encoded CA certificate used to authenticate callers (i.e. Crossplane).
def serve(
    function: grpcv1.FunctionRunnerService,
    address: str,
    *,
    creds: grpc.ServerCredentials | None,
    insecure: bool,
    options: Sequence[tuple[str, Any]] | None = None,
) -> None:
    """Start a gRPC server and serve requests asynchronously.

    Args:
        function: The function (class) to use to serve requests.
        address: The address at which to listen for requests.
        creds: The credentials used to authenticate requests.
        insecure: Serve insecurely, without credentials or encryption.
        options: Additional gRPC server options. E.g. set max receive message
            size to 5 MB (default is 4 MB):
            [("grpc.max_receive_message_length", 1024 * 1024 * 5)]

    Raises:
        ValueError if creds is None and insecure is False.

    If insecure is true requests will be served insecurely, even if credentials
    are supplied.
    """
    # Validate first so a misconfiguration fails before a server is created or
    # a signal handler is registered on the event loop.
    if creds is None and not insecure:
        msg = (
            "no credentials were provided - did you provide credentials or use "
            "the insecure flag?"
        )
        raise ValueError(msg)

    # Define the loop before the server so everything uses the same loop.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # Newer Python versions raise instead of implicitly creating a loop
        # when there is no current event loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    server = grpc.aio.server(options=options)

    # Stop gracefully on SIGTERM; stop() is a coroutine so it is scheduled on
    # the loop rather than awaited from the signal handler.
    loop.add_signal_handler(
        signal.SIGTERM,
        lambda: asyncio.ensure_future(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS)),
    )

    # Serve both the v1 API and, through an adapter, the v1beta1 API.
    grpcv1.add_FunctionRunnerServiceServicer_to_server(function, server)
    grpcv1beta1.add_FunctionRunnerServiceServicer_to_server(
        BetaFunctionRunner(wrapped=function), server
    )
    reflection.enable_server_reflection(SERVICE_NAMES, server)

    # The insecure flag takes precedence over credentials, as documented. Only
    # one port is bound to the address.
    if insecure:
        server.add_insecure_port(address)
    else:
        server.add_secure_port(address, creds)

    async def start():
        await server.start()
        await server.wait_for_termination()

    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS))
        loop.close()
Start a gRPC server and serve requests asynchronously.
Arguments:
- function: The function (class) to use to serve requests.
- address: The address at which to listen for requests.
- creds: The credentials used to authenticate requests.
- insecure: Serve insecurely, without credentials or encryption.
- options: Additional gRPC server options. E.g. set the max receive message size to 5 MB (default is 4 MB): [("grpc.max_receive_message_length", 1024 * 1024 * 5)]
Raises:
- ValueError if creds is None and insecure is False.
If insecure is true requests will be served insecurely, even if credentials are supplied.
class BetaFunctionRunner(grpcv1beta1.FunctionRunnerService):
    """Serve v1beta1 RunFunctionRequests using a v1 function.

    Every request is delegated to the wrapped v1.FunctionRunnerService. The
    incoming v1beta1 request is serialized and re-parsed as a v1 request, and
    the v1 response is serialized and re-parsed as a v1beta1 response.
    """

    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
        """Create a new BetaFunctionRunner.

        Args:
            wrapped: The v1 function runner that requests are delegated to.
        """
        self.wrapped = wrapped

    async def RunFunction(  # noqa: N802  # gRPC requires this name.
        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
    ) -> fnv1beta1.RunFunctionResponse:
        """Run the wrapped v1 function for a v1beta1 request.

        Args:
            req: The v1beta1 request to handle.
            context: The gRPC context, passed through to the wrapped function.

        Returns:
            The wrapped function's response, re-encoded as v1beta1.
        """
        # v1beta1 -> v1: serialize the request and parse the bytes as v1.
        request_bytes = req.SerializeToString()
        v1_request = fnv1.RunFunctionRequest()
        v1_request.ParseFromString(request_bytes)

        v1_response = await self.wrapped.RunFunction(v1_request, context)

        # v1 -> v1beta1: the same round trip in the other direction.
        response_bytes = v1_response.SerializeToString()
        beta_response = fnv1beta1.RunFunctionResponse()
        beta_response.ParseFromString(response_bytes)
        return beta_response
A BetaFunctionRunner handles beta gRPC RunFunctionRequests.
It handles requests by passing them to a wrapped v1.FunctionRunnerService. Incoming v1beta1 requests are converted to v1 by round-tripping them through serialization. Outgoing responses are converted from v1 to v1beta1 the same way.
def __init__(self, wrapped: grpcv1.FunctionRunnerService):
    """Create a new BetaFunctionRunner.

    Args:
        wrapped: The v1 function runner that requests are delegated to.
    """
    # Keep a reference to the v1 runner; RunFunction calls through to it.
    self.wrapped = wrapped
Create a new BetaFunctionRunner.
async def RunFunction(  # noqa: N802  # gRPC requires this name.
    self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
) -> fnv1beta1.RunFunctionResponse:
    """Run the wrapped v1 function for a v1beta1 request.

    Args:
        req: The v1beta1 request to handle.
        context: The gRPC context, passed through to the wrapped function.

    Returns:
        The wrapped function's response, re-encoded as v1beta1.
    """
    # v1beta1 -> v1: serialize the request and parse the bytes as v1.
    request_bytes = req.SerializeToString()
    v1_request = fnv1.RunFunctionRequest()
    v1_request.ParseFromString(request_bytes)

    v1_response = await self.wrapped.RunFunction(v1_request, context)

    # v1 -> v1beta1: the same round trip in the other direction.
    response_bytes = v1_response.SerializeToString()
    beta_response = fnv1beta1.RunFunctionResponse()
    beta_response.ParseFromString(response_bytes)
    return beta_response
Run the underlying function.