function.runtime

Utilities to create a composition function runtime.

  1# Copyright 2023 The Crossplane Authors.
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#     http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""Utilities to create a composition function runtime."""
 16
 17import asyncio
 18import os
 19
 20import grpc
 21from grpc_reflection.v1alpha import reflection
 22
 23import crossplane.function.proto.v1.run_function_pb2 as fnv1
 24import crossplane.function.proto.v1.run_function_pb2_grpc as grpcv1
 25import crossplane.function.proto.v1beta1.run_function_pb2 as fnv1beta1
 26import crossplane.function.proto.v1beta1.run_function_pb2_grpc as grpcv1beta1
 27
 28SERVICE_NAMES = (
 29    reflection.SERVICE_NAME,
 30    fnv1.DESCRIPTOR.services_by_name["FunctionRunnerService"].full_name,
 31    fnv1beta1.DESCRIPTOR.services_by_name["FunctionRunnerService"].full_name,
 32)
 33
 34
 35def load_credentials(tls_certs_dir: str) -> grpc.ServerCredentials:
 36    """Load TLS credentials for a composition function gRPC server.
 37
 38    Args:
 39        tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt.
 40
 41    Returns:
 42        gRPC mTLS server credentials.
 43
 44    tls.crt and tls.key must be the function's PEM-encoded certificate and
 45    private key. ca.cert must be a PEM-encoded CA certificate used to
 46    authenticate callers (i.e. Crossplane).
 47    """
 48    if tls_certs_dir is None:
 49        return None
 50
 51    with open(os.path.join(tls_certs_dir, "tls.crt"), "rb") as f:
 52        crt = f.read()
 53
 54    with open(os.path.join(tls_certs_dir, "tls.key"), "rb") as f:
 55        key = f.read()
 56
 57    with open(os.path.join(tls_certs_dir, "ca.crt"), "rb") as f:
 58        ca = f.read()
 59
 60    return grpc.ssl_server_credentials(
 61        private_key_certificate_chain_pairs=[(key, crt)],
 62        root_certificates=ca,
 63        require_client_auth=True,
 64    )
 65
 66
 67def serve(
 68    function: grpcv1.FunctionRunnerService,
 69    address: str,
 70    *,
 71    creds: grpc.ServerCredentials,
 72    insecure: bool,
 73) -> None:
 74    """Start a gRPC server and serve requests asychronously.
 75
 76    Args:
 77        function: The function (class) to use to serve requests.
 78        address: The address at which to listen for requests.
 79        creds: The credentials used to authenticate requests.
 80        insecure: Serve insecurely, without credentials or encryption.
 81
 82    Raises:
 83        ValueError if creds is None and insecure is False.
 84
 85    If insecure is true requests will be served insecurely, even if credentials
 86    are supplied.
 87    """
 88    # Define the loop before the server so everything uses the same loop.
 89    loop = asyncio.get_event_loop()
 90
 91    server = grpc.aio.server()
 92
 93    grpcv1.add_FunctionRunnerServiceServicer_to_server(function, server)
 94    grpcv1beta1.add_FunctionRunnerServiceServicer_to_server(
 95        BetaFunctionRunner(wrapped=function), server
 96    )
 97    reflection.enable_server_reflection(SERVICE_NAMES, server)
 98
 99    if creds is None and insecure is False:
100        msg = (
101            "no credentials were provided - did you provide credentials or use "
102            "the insecure flag?"
103        )
104        raise ValueError(msg)
105
106    if creds is not None:
107        server.add_secure_port(address, creds)
108
109    if insecure:
110        server.add_insecure_port(address)
111
112    async def start():
113        await server.start()
114        await server.wait_for_termination()
115
116    try:
117        loop.run_until_complete(start())
118    finally:
119        loop.run_until_complete(server.stop(grace=5))
120        loop.close()
121
122
class BetaFunctionRunner(grpcv1beta1.FunctionRunnerService):
    """Serve v1beta1 RunFunctionRequests using a v1 function.

    Each request is handed to a wrapped v1.FunctionRunnerService. Incoming
    v1beta1 requests are converted to v1 by serializing them and parsing the
    bytes as the v1 message type. Outgoing responses are converted from v1 to
    v1beta1 the same way. This assumes the two API versions are
    wire-compatible.
    """

    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
        """Create a new BetaFunctionRunner around a v1 function runner."""
        self.wrapped = wrapped

    async def RunFunction(  # noqa: N802  # gRPC requires this name.
        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
    ) -> fnv1beta1.RunFunctionResponse:
        """Run the wrapped v1 function and return its response as v1beta1."""
        # v1beta1 -> v1: re-parse the serialized request as the v1 type.
        v1_request = fnv1.RunFunctionRequest.FromString(req.SerializeToString())

        v1_response = await self.wrapped.RunFunction(v1_request, context)

        # v1 -> v1beta1: re-parse the serialized response as the beta type.
        wire_response = v1_response.SerializeToString()
        return fnv1beta1.RunFunctionResponse.FromString(wire_response)
SERVICE_NAMES = ('grpc.reflection.v1alpha.ServerReflection', 'apiextensions.fn.proto.v1.FunctionRunnerService', 'apiextensions.fn.proto.v1beta1.FunctionRunnerService')
def load_credentials(tls_certs_dir: str) -> grpc.ServerCredentials:
def load_credentials(tls_certs_dir: str) -> grpc.ServerCredentials:
    """Build mTLS server credentials from a directory of PEM files.

    Args:
        tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt.

    Returns:
        gRPC mTLS server credentials, or None if tls_certs_dir is None.

    tls.crt and tls.key must be the function's PEM-encoded certificate and
    private key. ca.crt must be a PEM-encoded CA certificate used to
    authenticate callers (i.e. Crossplane). Client authentication is always
    required by the returned credentials.
    """
    if tls_certs_dir is None:
        return None

    # Read each file as raw bytes, keyed by its file name.
    pem = {}
    for filename in ("tls.crt", "tls.key", "ca.crt"):
        with open(os.path.join(tls_certs_dir, filename), "rb") as handle:
            pem[filename] = handle.read()

    return grpc.ssl_server_credentials(
        private_key_certificate_chain_pairs=[(pem["tls.key"], pem["tls.crt"])],
        root_certificates=pem["ca.crt"],
        require_client_auth=True,
    )

Load TLS credentials for a composition function gRPC server.

Arguments:
  • tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt.
Returns:

gRPC mTLS server credentials.

tls.crt and tls.key must be the function's PEM-encoded certificate and private key. ca.crt must be a PEM-encoded CA certificate used to authenticate callers (i.e. Crossplane).

def serve( function: crossplane.function.proto.v1.run_function_pb2_grpc.FunctionRunnerService, address: str, *, creds: grpc.ServerCredentials, insecure: bool) -> None:
 68def serve(
 69    function: grpcv1.FunctionRunnerService,
 70    address: str,
 71    *,
 72    creds: grpc.ServerCredentials,
 73    insecure: bool,
 74) -> None:
 75    """Start a gRPC server and serve requests asychronously.
 76
 77    Args:
 78        function: The function (class) to use to serve requests.
 79        address: The address at which to listen for requests.
 80        creds: The credentials used to authenticate requests.
 81        insecure: Serve insecurely, without credentials or encryption.
 82
 83    Raises:
 84        ValueError if creds is None and insecure is False.
 85
 86    If insecure is true requests will be served insecurely, even if credentials
 87    are supplied.
 88    """
 89    # Define the loop before the server so everything uses the same loop.
 90    loop = asyncio.get_event_loop()
 91
 92    server = grpc.aio.server()
 93
 94    grpcv1.add_FunctionRunnerServiceServicer_to_server(function, server)
 95    grpcv1beta1.add_FunctionRunnerServiceServicer_to_server(
 96        BetaFunctionRunner(wrapped=function), server
 97    )
 98    reflection.enable_server_reflection(SERVICE_NAMES, server)
 99
100    if creds is None and insecure is False:
101        msg = (
102            "no credentials were provided - did you provide credentials or use "
103            "the insecure flag?"
104        )
105        raise ValueError(msg)
106
107    if creds is not None:
108        server.add_secure_port(address, creds)
109
110    if insecure:
111        server.add_insecure_port(address)
112
113    async def start():
114        await server.start()
115        await server.wait_for_termination()
116
117    try:
118        loop.run_until_complete(start())
119    finally:
120        loop.run_until_complete(server.stop(grace=5))
121        loop.close()

Start a gRPC server and serve requests asynchronously.

Arguments:
  • function: The function (class) to use to serve requests.
  • address: The address at which to listen for requests.
  • creds: The credentials used to authenticate requests.
  • insecure: Serve insecurely, without credentials or encryption.
Raises:
  • ValueError if creds is None and insecure is False.

If insecure is true requests will be served insecurely, even if credentials are supplied.

class BetaFunctionRunner(crossplane.function.proto.v1beta1.run_function_pb2_grpc.FunctionRunnerService):
class BetaFunctionRunner(grpcv1beta1.FunctionRunnerService):
    """Serve v1beta1 RunFunctionRequests using a v1 function.

    Each request is handed to a wrapped v1.FunctionRunnerService. Incoming
    v1beta1 requests are converted to v1 by serializing them and parsing the
    bytes as the v1 message type. Outgoing responses are converted from v1 to
    v1beta1 the same way. This assumes the two API versions are
    wire-compatible.
    """

    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
        """Create a new BetaFunctionRunner around a v1 function runner."""
        self.wrapped = wrapped

    async def RunFunction(  # noqa: N802  # gRPC requires this name.
        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
    ) -> fnv1beta1.RunFunctionResponse:
        """Run the wrapped v1 function and return its response as v1beta1."""
        # v1beta1 -> v1: re-parse the serialized request as the v1 type.
        v1_request = fnv1.RunFunctionRequest.FromString(req.SerializeToString())

        v1_response = await self.wrapped.RunFunction(v1_request, context)

        # v1 -> v1beta1: re-parse the serialized response as the beta type.
        wire_response = v1_response.SerializeToString()
        return fnv1beta1.RunFunctionResponse.FromString(wire_response)

A BetaFunctionRunner handles beta gRPC RunFunctionRequests.

It handles requests by passing them to a wrapped v1.FunctionRunnerService. Incoming v1beta1 requests are converted to v1 by round-tripping them through serialization. Outgoing responses are converted from v1 to v1beta1 the same way.

BetaFunctionRunner( wrapped: crossplane.function.proto.v1.run_function_pb2_grpc.FunctionRunnerService)
    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
        """Create a new BetaFunctionRunner.

        Args:
            wrapped: The v1 function runner that RunFunction delegates to.
        """
        self.wrapped = wrapped

Create a new BetaFunctionRunner.

wrapped
async def RunFunction( self, req: crossplane.function.proto.v1beta1.run_function_pb2.RunFunctionRequest, context: grpc.aio._base_server.ServicerContext) -> crossplane.function.proto.v1beta1.run_function_pb2.RunFunctionResponse:
137    async def RunFunction(  # noqa: N802  # gRPC requires this name.
138        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
139    ) -> fnv1beta1.RunFunctionResponse:
140        """Run the underlying function."""
141        gareq = fnv1.RunFunctionRequest()
142        gareq.ParseFromString(req.SerializeToString())
143
144        garsp = await self.wrapped.RunFunction(gareq, context)
145        rsp = fnv1beta1.RunFunctionResponse()
146        rsp.ParseFromString(garsp.SerializeToString())
147
148        return rsp

Run the underlying function.