function.runtime
Utilities to create a composition function runtime.
# Copyright 2023 The Crossplane Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities to create a composition function runtime."""

import asyncio
import os

import grpc
from grpc_reflection.v1alpha import reflection

import crossplane.function.proto.v1.run_function_pb2 as fnv1
import crossplane.function.proto.v1.run_function_pb2_grpc as grpcv1
import crossplane.function.proto.v1beta1.run_function_pb2 as fnv1beta1
import crossplane.function.proto.v1beta1.run_function_pb2_grpc as grpcv1beta1

# Fully-qualified names of every service advertised via gRPC reflection.
SERVICE_NAMES = (
    reflection.SERVICE_NAME,
    fnv1.DESCRIPTOR.services_by_name["FunctionRunnerService"].full_name,
    fnv1beta1.DESCRIPTOR.services_by_name["FunctionRunnerService"].full_name,
)


def load_credentials(tls_certs_dir: "str | None") -> "grpc.ServerCredentials | None":
    """Load TLS credentials for a composition function gRPC server.

    Args:
        tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt, or
            None if no credentials should be loaded.

    Returns:
        gRPC mTLS server credentials, or None if tls_certs_dir is None.

    Raises:
        OSError: If any of the three files cannot be opened or read.

    tls.crt and tls.key must be the function's PEM-encoded certificate and
    private key. ca.crt must be a PEM-encoded CA certificate used to
    authenticate callers (i.e. Crossplane).
    """
    if tls_certs_dir is None:
        return None

    def read(name):
        with open(os.path.join(tls_certs_dir, name), "rb") as f:
            return f.read()

    crt = read("tls.crt")
    key = read("tls.key")
    ca = read("ca.crt")

    return grpc.ssl_server_credentials(
        private_key_certificate_chain_pairs=[(key, crt)],
        root_certificates=ca,
        require_client_auth=True,
    )


def serve(
    function: grpcv1.FunctionRunnerService,
    address: str,
    *,
    creds: "grpc.ServerCredentials | None",
    insecure: bool,
) -> None:
    """Start a gRPC server and serve requests asynchronously.

    Args:
        function: The function (class) to use to serve requests.
        address: The address at which to listen for requests.
        creds: The credentials used to authenticate requests.
        insecure: Serve insecurely, without credentials or encryption.

    Raises:
        ValueError: If creds is None and insecure is not set.

    If insecure is true requests will be served insecurely, even if credentials
    are supplied.
    """
    # Fail fast, before any event loop or server state has been created.
    if creds is None and not insecure:
        msg = (
            "no credentials were provided - did you provide credentials or use "
            "the insecure flag?"
        )
        raise ValueError(msg)

    # Define the loop before the server so everything uses the same loop.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # There is no current loop to reuse, so install a fresh one.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    server = grpc.aio.server()

    grpcv1.add_FunctionRunnerServiceServicer_to_server(function, server)
    grpcv1beta1.add_FunctionRunnerServiceServicer_to_server(
        BetaFunctionRunner(wrapped=function), server
    )
    reflection.enable_server_reflection(SERVICE_NAMES, server)

    # Bind the address exactly once. The insecure flag takes precedence over
    # any supplied credentials, as documented above.
    if insecure:
        server.add_insecure_port(address)
    else:
        server.add_secure_port(address, creds)

    async def start():
        await server.start()
        await server.wait_for_termination()

    try:
        loop.run_until_complete(start())
    finally:
        # Give in-flight requests up to five seconds before shutting down.
        loop.run_until_complete(server.stop(grace=5))
        loop.close()


class BetaFunctionRunner(grpcv1beta1.FunctionRunnerService):
    """A BetaFunctionRunner handles beta gRPC RunFunctionRequests.

    It handles requests by passing them to a wrapped v1.FunctionRunnerService.
    Incoming v1beta1 requests are converted to v1 by round-tripping them through
    serialization. Outgoing responses are converted from v1 to v1beta1 the same
    way.
    """

    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
        """Create a new BetaFunctionRunner."""
        self.wrapped = wrapped

    async def RunFunction(  # noqa: N802  # gRPC requires this name.
        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
    ) -> fnv1beta1.RunFunctionResponse:
        """Run the underlying function."""
        # Convert the beta request to v1 by serializing and re-parsing it.
        gareq = fnv1.RunFunctionRequest()
        gareq.ParseFromString(req.SerializeToString())

        garsp = await self.wrapped.RunFunction(gareq, context)

        # Convert the v1 response back to beta the same way.
        rsp = fnv1beta1.RunFunctionResponse()
        rsp.ParseFromString(garsp.SerializeToString())

        return rsp
def load_credentials(tls_certs_dir: "str | None") -> "grpc.ServerCredentials | None":
    """Load TLS credentials for a composition function gRPC server.

    Args:
        tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt, or
            None if no credentials should be loaded.

    Returns:
        gRPC mTLS server credentials, or None if tls_certs_dir is None.

    Raises:
        OSError: If any of the three files cannot be opened or read.

    tls.crt and tls.key must be the function's PEM-encoded certificate and
    private key. ca.crt must be a PEM-encoded CA certificate used to
    authenticate callers (i.e. Crossplane).
    """
    if tls_certs_dir is None:
        return None

    def read(name):
        # Read one PEM file from the certificate directory as raw bytes.
        with open(os.path.join(tls_certs_dir, name), "rb") as f:
            return f.read()

    crt = read("tls.crt")
    key = read("tls.key")
    ca = read("ca.crt")

    # Requiring client auth makes this mutual TLS: callers must present a
    # certificate that chains to the CA loaded above.
    return grpc.ssl_server_credentials(
        private_key_certificate_chain_pairs=[(key, crt)],
        root_certificates=ca,
        require_client_auth=True,
    )
Load TLS credentials for a composition function gRPC server.
Arguments:
- tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt.
Returns:
gRPC mTLS server credentials.
tls.crt and tls.key must be the function's PEM-encoded certificate and private key. ca.crt must be a PEM-encoded CA certificate used to authenticate callers (i.e. Crossplane).
def serve(
    function: grpcv1.FunctionRunnerService,
    address: str,
    *,
    creds: "grpc.ServerCredentials | None",
    insecure: bool,
) -> None:
    """Start a gRPC server and serve requests asynchronously.

    This call blocks until the server terminates. The event loop it ran on is
    closed before returning.

    Args:
        function: The function (class) to use to serve requests.
        address: The address at which to listen for requests.
        creds: The credentials used to authenticate requests.
        insecure: Serve insecurely, without credentials or encryption.

    Raises:
        ValueError: If creds is None and insecure is not set.

    If insecure is true requests will be served insecurely, even if credentials
    are supplied.
    """
    # Fail fast, before any event loop or server state has been created.
    if creds is None and not insecure:
        msg = (
            "no credentials were provided - did you provide credentials or use "
            "the insecure flag?"
        )
        raise ValueError(msg)

    # Define the loop before the server so everything uses the same loop.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # There is no current loop to reuse, so install a fresh one.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    server = grpc.aio.server()

    # Serve the same function over both the v1 and v1beta1 APIs.
    grpcv1.add_FunctionRunnerServiceServicer_to_server(function, server)
    grpcv1beta1.add_FunctionRunnerServiceServicer_to_server(
        BetaFunctionRunner(wrapped=function), server
    )
    reflection.enable_server_reflection(SERVICE_NAMES, server)

    # Bind the address exactly once. The insecure flag takes precedence over
    # any supplied credentials, as documented above.
    if insecure:
        server.add_insecure_port(address)
    else:
        server.add_secure_port(address, creds)

    async def start():
        await server.start()
        await server.wait_for_termination()

    try:
        loop.run_until_complete(start())
    finally:
        # Give in-flight requests up to five seconds before shutting down.
        loop.run_until_complete(server.stop(grace=5))
        loop.close()
Start a gRPC server and serve requests asynchronously.
Arguments:
- function: The function (class) to use to serve requests.
- address: The address at which to listen for requests.
- creds: The credentials used to authenticate requests.
- insecure: Serve insecurely, without credentials or encryption.
Raises:
- ValueError if creds is None and insecure is False.
If insecure is true requests will be served insecurely, even if credentials are supplied.
class BetaFunctionRunner(grpcv1beta1.FunctionRunnerService):
    """Serve v1beta1 RunFunctionRequests by delegating to a v1 function.

    Each incoming v1beta1 request is serialized and re-parsed as a v1 request,
    then handed to the wrapped v1.FunctionRunnerService. The v1 response it
    returns is converted back to v1beta1 the same way.
    """

    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
        """Create a new BetaFunctionRunner that delegates to wrapped."""
        self.wrapped = wrapped

    @staticmethod
    def _recode(message, target_type):
        """Return message re-parsed from its serialized bytes as target_type."""
        converted = target_type()
        converted.ParseFromString(message.SerializeToString())
        return converted

    async def RunFunction(  # noqa: N802  # gRPC requires this name.
        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
    ) -> fnv1beta1.RunFunctionResponse:
        """Run the wrapped function and return its response as v1beta1."""
        v1_request = self._recode(req, fnv1.RunFunctionRequest)
        v1_response = await self.wrapped.RunFunction(v1_request, context)
        return self._recode(v1_response, fnv1beta1.RunFunctionResponse)
A BetaFunctionRunner handles beta gRPC RunFunctionRequests.
It handles requests by passing them to a wrapped v1.FunctionRunnerService. Incoming v1beta1 requests are converted to v1 by round-tripping them through serialization. Outgoing responses are converted from v1 to v1beta1 the same way.
def __init__(self, wrapped: grpcv1.FunctionRunnerService):
    """Create a new BetaFunctionRunner.

    Args:
        wrapped: The v1 function runner stored on this instance as
            ``self.wrapped``.
    """
    self.wrapped = wrapped
Create a new BetaFunctionRunner.
async def RunFunction(  # noqa: N802  # gRPC requires this name.
    self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
) -> fnv1beta1.RunFunctionResponse:
    """Run the wrapped function and return its response as v1beta1.

    Args:
        req: The incoming v1beta1 request.
        context: The gRPC servicer context, passed through unchanged.

    Returns:
        The wrapped function's response, re-parsed as a v1beta1 message.
    """
    # Convert the beta request to v1 by serializing and re-parsing it.
    v1_request = fnv1.RunFunctionRequest()
    v1_request.ParseFromString(req.SerializeToString())

    v1_response = await self.wrapped.RunFunction(v1_request, context)

    # Convert the v1 response back to beta the same way.
    beta_response = fnv1beta1.RunFunctionResponse()
    beta_response.ParseFromString(v1_response.SerializeToString())
    return beta_response
Run the underlying function.