function.runtime
Utilities to create a composition function runtime.
# Copyright 2023 The Crossplane Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities to create a composition function runtime."""

import asyncio
import os
import signal

import grpc
from grpc_reflection.v1alpha import reflection

import crossplane.function.proto.v1.run_function_pb2 as fnv1
import crossplane.function.proto.v1.run_function_pb2_grpc as grpcv1
import crossplane.function.proto.v1beta1.run_function_pb2 as fnv1beta1
import crossplane.function.proto.v1beta1.run_function_pb2_grpc as grpcv1beta1

# Fully-qualified names of every service advertised via gRPC reflection.
SERVICE_NAMES = (
    reflection.SERVICE_NAME,
    fnv1.DESCRIPTOR.services_by_name["FunctionRunnerService"].full_name,
    fnv1beta1.DESCRIPTOR.services_by_name["FunctionRunnerService"].full_name,
)

# Grace period passed to server.stop() when shutting down.
SHUTDOWN_GRACE_PERIOD_SECONDS = 5


def load_credentials(tls_certs_dir: str) -> grpc.ServerCredentials:
    """Load TLS credentials for a composition function gRPC server.

    Args:
        tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt.

    Returns:
        gRPC mTLS server credentials, or None if tls_certs_dir is None.

    tls.crt and tls.key must be the function's PEM-encoded certificate and
    private key. ca.crt must be a PEM-encoded CA certificate used to
    authenticate callers (i.e. Crossplane).
    """
    if tls_certs_dir is None:
        return None

    with open(os.path.join(tls_certs_dir, "tls.crt"), "rb") as f:
        crt = f.read()

    with open(os.path.join(tls_certs_dir, "tls.key"), "rb") as f:
        key = f.read()

    with open(os.path.join(tls_certs_dir, "ca.crt"), "rb") as f:
        ca = f.read()

    return grpc.ssl_server_credentials(
        private_key_certificate_chain_pairs=[(key, crt)],
        root_certificates=ca,
        require_client_auth=True,
    )


def serve(
    function: grpcv1.FunctionRunnerService,
    address: str,
    *,
    creds: grpc.ServerCredentials,
    insecure: bool,
) -> None:
    """Start a gRPC server and serve requests asynchronously.

    This call blocks until the server terminates, then stops the server and
    closes the event loop.

    Args:
        function: The function (class) to use to serve requests.
        address: The address at which to listen for requests.
        creds: The credentials used to authenticate requests.
        insecure: Serve insecurely, without credentials or encryption.

    Raises:
        ValueError if creds is None and insecure is False.

    If insecure is true requests will be served insecurely, even if credentials
    are supplied.
    """
    # Validate first so a bad configuration fails before a signal handler is
    # installed or a server is created.
    if creds is None and not insecure:
        msg = (
            "no credentials were provided - did you provide credentials or use "
            "the insecure flag?"
        )
        raise ValueError(msg)

    # Define the loop before the server so everything uses the same loop.
    loop = asyncio.get_event_loop()

    server = grpc.aio.server()

    # Begin a graceful shutdown when the process receives SIGTERM.
    loop.add_signal_handler(
        signal.SIGTERM,
        lambda: asyncio.ensure_future(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS)),
    )

    # Serve v1 directly, and v1beta1 through an adapter around the same function.
    grpcv1.add_FunctionRunnerServiceServicer_to_server(function, server)
    grpcv1beta1.add_FunctionRunnerServiceServicer_to_server(
        BetaFunctionRunner(wrapped=function), server
    )
    reflection.enable_server_reflection(SERVICE_NAMES, server)

    # Bind exactly one port. The insecure flag takes precedence over any
    # supplied credentials, as documented above.
    if insecure:
        server.add_insecure_port(address)
    else:
        server.add_secure_port(address, creds)

    async def start():
        await server.start()
        await server.wait_for_termination()

    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS))
        loop.close()


class BetaFunctionRunner(grpcv1beta1.FunctionRunnerService):
    """A BetaFunctionRunner handles beta gRPC RunFunctionRequests.

    It handles requests by passing them to a wrapped v1.FunctionRunnerService.
    Incoming v1beta1 requests are converted to v1 by round-tripping them through
    serialization. Outgoing responses are converted from v1 to v1beta1 the same
    way.
    """

    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
        """Create a new BetaFunctionRunner.

        Args:
            wrapped: The v1 function runner that requests are delegated to.
        """
        self.wrapped = wrapped

    async def RunFunction(  # noqa: N802  # gRPC requires this name.
        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
    ) -> fnv1beta1.RunFunctionResponse:
        """Run the underlying function.

        Args:
            req: The v1beta1 request to handle.
            context: The gRPC context, passed through to the wrapped runner.

        Returns:
            The wrapped runner's response, converted to v1beta1.
        """
        # Convert v1beta1 -> v1 by serializing and re-parsing the request.
        gareq = fnv1.RunFunctionRequest()
        gareq.ParseFromString(req.SerializeToString())

        garsp = await self.wrapped.RunFunction(gareq, context)

        # Convert the v1 response back to v1beta1 the same way.
        rsp = fnv1beta1.RunFunctionResponse()
        rsp.ParseFromString(garsp.SerializeToString())

        return rsp
def load_credentials(tls_certs_dir: str) -> grpc.ServerCredentials:
    """Build mTLS server credentials from a directory of PEM files.

    Args:
        tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt.

    Returns:
        gRPC server credentials that require client authentication, or None
        when tls_certs_dir is None.

    tls.crt and tls.key must be the function's PEM-encoded certificate and
    private key. ca.crt must be a PEM-encoded CA certificate used to
    authenticate callers (i.e. Crossplane).
    """
    if tls_certs_dir is None:
        return None

    def read_pem(filename):
        # Read one file from the certificate directory as raw bytes.
        with open(os.path.join(tls_certs_dir, filename), "rb") as pem:
            return pem.read()

    # Read in the same order as before: certificate, key, then CA.
    certificate = read_pem("tls.crt")
    private_key = read_pem("tls.key")
    ca_certificate = read_pem("ca.crt")

    return grpc.ssl_server_credentials(
        private_key_certificate_chain_pairs=[(private_key, certificate)],
        root_certificates=ca_certificate,
        require_client_auth=True,
    )
Load TLS credentials for a composition function gRPC server.
Arguments:
- tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt.
Returns:
gRPC mTLS server credentials.
tls.crt and tls.key must be the function's PEM-encoded certificate and private key. ca.crt must be a PEM-encoded CA certificate used to authenticate callers (i.e. Crossplane).
def serve(
    function: grpcv1.FunctionRunnerService,
    address: str,
    *,
    creds: grpc.ServerCredentials,
    insecure: bool,
) -> None:
    """Start a gRPC server and serve requests asynchronously.

    This call blocks until the server terminates, then stops the server and
    closes the event loop.

    Args:
        function: The function (class) to use to serve requests.
        address: The address at which to listen for requests.
        creds: The credentials used to authenticate requests.
        insecure: Serve insecurely, without credentials or encryption.

    Raises:
        ValueError if creds is None and insecure is False.

    If insecure is true requests will be served insecurely, even if credentials
    are supplied.
    """
    # Validate first so a bad configuration fails before a signal handler is
    # installed or a server is created.
    if creds is None and not insecure:
        msg = (
            "no credentials were provided - did you provide credentials or use "
            "the insecure flag?"
        )
        raise ValueError(msg)

    # Define the loop before the server so everything uses the same loop.
    loop = asyncio.get_event_loop()

    server = grpc.aio.server()

    # Begin a graceful shutdown when the process receives SIGTERM.
    loop.add_signal_handler(
        signal.SIGTERM,
        lambda: asyncio.ensure_future(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS)),
    )

    # Serve v1 directly, and v1beta1 through an adapter around the same function.
    grpcv1.add_FunctionRunnerServiceServicer_to_server(function, server)
    grpcv1beta1.add_FunctionRunnerServiceServicer_to_server(
        BetaFunctionRunner(wrapped=function), server
    )
    reflection.enable_server_reflection(SERVICE_NAMES, server)

    # Bind exactly one port. The insecure flag takes precedence over any
    # supplied credentials, as documented above.
    if insecure:
        server.add_insecure_port(address)
    else:
        server.add_secure_port(address, creds)

    async def start():
        await server.start()
        await server.wait_for_termination()

    try:
        loop.run_until_complete(start())
    finally:
        loop.run_until_complete(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS))
        loop.close()
Start a gRPC server and serve requests asynchronously.
Arguments:
- function: The function (class) to use to serve requests.
- address: The address at which to listen for requests.
- creds: The credentials used to authenticate requests.
- insecure: Serve insecurely, without credentials or encryption.
Raises:
- ValueError if creds is None and insecure is False.
If insecure is true requests will be served insecurely, even if credentials are supplied.
class BetaFunctionRunner(grpcv1beta1.FunctionRunnerService):
    """Adapter that serves v1beta1 RunFunctionRequests with a v1 runner.

    Each request is handed to a wrapped v1.FunctionRunnerService. Incoming
    v1beta1 requests are converted to v1 by round-tripping them through
    serialization. Outgoing responses are converted from v1 to v1beta1 the
    same way.
    """

    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
        """Create a new BetaFunctionRunner around a v1 function runner."""
        self.wrapped = wrapped

    async def RunFunction(  # noqa: N802  # gRPC requires this name.
        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
    ) -> fnv1beta1.RunFunctionResponse:
        """Run the underlying function."""
        # v1beta1 request -> bytes -> v1 request.
        request_bytes = req.SerializeToString()
        v1_request = fnv1.RunFunctionRequest()
        v1_request.ParseFromString(request_bytes)

        v1_response = await self.wrapped.RunFunction(v1_request, context)

        # v1 response -> bytes -> v1beta1 response.
        response_bytes = v1_response.SerializeToString()
        beta_response = fnv1beta1.RunFunctionResponse()
        beta_response.ParseFromString(response_bytes)
        return beta_response
A BetaFunctionRunner handles beta gRPC RunFunctionRequests.
It handles requests by passing them to a wrapped v1.FunctionRunnerService. Incoming v1beta1 requests are converted to v1 by round-tripping them through serialization. Outgoing responses are converted from v1 to v1beta1 the same way.
def __init__(self, wrapped: grpcv1.FunctionRunnerService):
    """Create a new BetaFunctionRunner.

    Args:
        wrapped: The v1 function runner stored on the instance as ``wrapped``.
    """
    # Keep the v1 runner on the instance under the name ``wrapped``.
    self.wrapped = wrapped
Create a new BetaFunctionRunner.
async def RunFunction(  # noqa: N802  # gRPC requires this name.
    self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
) -> fnv1beta1.RunFunctionResponse:
    """Run the underlying function.

    The v1beta1 request is re-parsed as a v1 request, passed to the wrapped
    runner, and the v1 response is re-parsed as a v1beta1 response.
    """
    # Parse the serialized v1beta1 request as a v1 request.
    v1_request = fnv1.RunFunctionRequest.FromString(req.SerializeToString())

    v1_response = await self.wrapped.RunFunction(v1_request, context)

    # Parse the serialized v1 response as a v1beta1 response.
    return fnv1beta1.RunFunctionResponse.FromString(v1_response.SerializeToString())
Run the underlying function.