function.runtime

Utilities to create a composition function runtime.

  1# Copyright 2023 The Crossplane Authors.
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#     http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""Utilities to create a composition function runtime."""
 16
 17import asyncio
 18import os
 19import signal
 20
 21import grpc
 22from grpc_reflection.v1alpha import reflection
 23
 24import crossplane.function.proto.v1.run_function_pb2 as fnv1
 25import crossplane.function.proto.v1.run_function_pb2_grpc as grpcv1
 26import crossplane.function.proto.v1beta1.run_function_pb2 as fnv1beta1
 27import crossplane.function.proto.v1beta1.run_function_pb2_grpc as grpcv1beta1
 28
 29SERVICE_NAMES = (
 30    reflection.SERVICE_NAME,
 31    fnv1.DESCRIPTOR.services_by_name["FunctionRunnerService"].full_name,
 32    fnv1beta1.DESCRIPTOR.services_by_name["FunctionRunnerService"].full_name,
 33)
 34
 35SHUTDOWN_GRACE_PERIOD_SECONDS = 5
 36
 37
 38def load_credentials(tls_certs_dir: str) -> grpc.ServerCredentials:
 39    """Load TLS credentials for a composition function gRPC server.
 40
 41    Args:
 42        tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt.
 43
 44    Returns:
 45        gRPC mTLS server credentials.
 46
 47    tls.crt and tls.key must be the function's PEM-encoded certificate and
 48    private key. ca.cert must be a PEM-encoded CA certificate used to
 49    authenticate callers (i.e. Crossplane).
 50    """
 51    if tls_certs_dir is None:
 52        return None
 53
 54    with open(os.path.join(tls_certs_dir, "tls.crt"), "rb") as f:
 55        crt = f.read()
 56
 57    with open(os.path.join(tls_certs_dir, "tls.key"), "rb") as f:
 58        key = f.read()
 59
 60    with open(os.path.join(tls_certs_dir, "ca.crt"), "rb") as f:
 61        ca = f.read()
 62
 63    return grpc.ssl_server_credentials(
 64        private_key_certificate_chain_pairs=[(key, crt)],
 65        root_certificates=ca,
 66        require_client_auth=True,
 67    )
 68
 69
 70def serve(
 71    function: grpcv1.FunctionRunnerService,
 72    address: str,
 73    *,
 74    creds: grpc.ServerCredentials,
 75    insecure: bool,
 76) -> None:
 77    """Start a gRPC server and serve requests asychronously.
 78
 79    Args:
 80        function: The function (class) to use to serve requests.
 81        address: The address at which to listen for requests.
 82        creds: The credentials used to authenticate requests.
 83        insecure: Serve insecurely, without credentials or encryption.
 84
 85    Raises:
 86        ValueError if creds is None and insecure is False.
 87
 88    If insecure is true requests will be served insecurely, even if credentials
 89    are supplied.
 90    """
 91    # Define the loop before the server so everything uses the same loop.
 92    loop = asyncio.get_event_loop()
 93
 94    server = grpc.aio.server()
 95
 96    loop.add_signal_handler(
 97        signal.SIGTERM,
 98        lambda: asyncio.ensure_future(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS)),
 99    )
100
101    grpcv1.add_FunctionRunnerServiceServicer_to_server(function, server)
102    grpcv1beta1.add_FunctionRunnerServiceServicer_to_server(
103        BetaFunctionRunner(wrapped=function), server
104    )
105    reflection.enable_server_reflection(SERVICE_NAMES, server)
106
107    if creds is None and insecure is False:
108        msg = (
109            "no credentials were provided - did you provide credentials or use "
110            "the insecure flag?"
111        )
112        raise ValueError(msg)
113
114    if creds is not None:
115        server.add_secure_port(address, creds)
116
117    if insecure:
118        server.add_insecure_port(address)
119
120    async def start():
121        await server.start()
122        await server.wait_for_termination()
123
124    try:
125        loop.run_until_complete(start())
126    finally:
127        loop.run_until_complete(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS))
128        loop.close()
129
130
131class BetaFunctionRunner(grpcv1beta1.FunctionRunnerService):
132    """A BetaFunctionRunner handles beta gRPC RunFunctionRequests.
133
134    It handles requests by passing them to a wrapped v1.FunctionRunnerService.
135    Incoming v1beta1 requests are converted to v1 by round-tripping them through
136    serialization. Outgoing requests are converted from v1 to v1beta1 the same
137    way.
138    """
139
140    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
141        """Create a new BetaFunctionRunner."""
142        self.wrapped = wrapped
143
144    async def RunFunction(  # noqa: N802  # gRPC requires this name.
145        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
146    ) -> fnv1beta1.RunFunctionResponse:
147        """Run the underlying function."""
148        gareq = fnv1.RunFunctionRequest()
149        gareq.ParseFromString(req.SerializeToString())
150
151        garsp = await self.wrapped.RunFunction(gareq, context)
152        rsp = fnv1beta1.RunFunctionResponse()
153        rsp.ParseFromString(garsp.SerializeToString())
154
155        return rsp
SERVICE_NAMES = ('grpc.reflection.v1alpha.ServerReflection', 'apiextensions.fn.proto.v1.FunctionRunnerService', 'apiextensions.fn.proto.v1beta1.FunctionRunnerService')
SHUTDOWN_GRACE_PERIOD_SECONDS = 5
def load_credentials(tls_certs_dir: str) -> grpc.ServerCredentials:
39def load_credentials(tls_certs_dir: str) -> grpc.ServerCredentials:
40    """Load TLS credentials for a composition function gRPC server.
41
42    Args:
43        tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt.
44
45    Returns:
46        gRPC mTLS server credentials.
47
48    tls.crt and tls.key must be the function's PEM-encoded certificate and
49    private key. ca.cert must be a PEM-encoded CA certificate used to
50    authenticate callers (i.e. Crossplane).
51    """
52    if tls_certs_dir is None:
53        return None
54
55    with open(os.path.join(tls_certs_dir, "tls.crt"), "rb") as f:
56        crt = f.read()
57
58    with open(os.path.join(tls_certs_dir, "tls.key"), "rb") as f:
59        key = f.read()
60
61    with open(os.path.join(tls_certs_dir, "ca.crt"), "rb") as f:
62        ca = f.read()
63
64    return grpc.ssl_server_credentials(
65        private_key_certificate_chain_pairs=[(key, crt)],
66        root_certificates=ca,
67        require_client_auth=True,
68    )

Load TLS credentials for a composition function gRPC server.

Arguments:
  • tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt.
Returns:

gRPC mTLS server credentials.

tls.crt and tls.key must be the function's PEM-encoded certificate and private key. ca.cert must be a PEM-encoded CA certificate used to authenticate callers (i.e. Crossplane).

def serve( function: crossplane.function.proto.v1.run_function_pb2_grpc.FunctionRunnerService, address: str, *, creds: grpc.ServerCredentials, insecure: bool) -> None:
 71def serve(
 72    function: grpcv1.FunctionRunnerService,
 73    address: str,
 74    *,
 75    creds: grpc.ServerCredentials,
 76    insecure: bool,
 77) -> None:
 78    """Start a gRPC server and serve requests asychronously.
 79
 80    Args:
 81        function: The function (class) to use to serve requests.
 82        address: The address at which to listen for requests.
 83        creds: The credentials used to authenticate requests.
 84        insecure: Serve insecurely, without credentials or encryption.
 85
 86    Raises:
 87        ValueError if creds is None and insecure is False.
 88
 89    If insecure is true requests will be served insecurely, even if credentials
 90    are supplied.
 91    """
 92    # Define the loop before the server so everything uses the same loop.
 93    loop = asyncio.get_event_loop()
 94
 95    server = grpc.aio.server()
 96
 97    loop.add_signal_handler(
 98        signal.SIGTERM,
 99        lambda: asyncio.ensure_future(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS)),
100    )
101
102    grpcv1.add_FunctionRunnerServiceServicer_to_server(function, server)
103    grpcv1beta1.add_FunctionRunnerServiceServicer_to_server(
104        BetaFunctionRunner(wrapped=function), server
105    )
106    reflection.enable_server_reflection(SERVICE_NAMES, server)
107
108    if creds is None and insecure is False:
109        msg = (
110            "no credentials were provided - did you provide credentials or use "
111            "the insecure flag?"
112        )
113        raise ValueError(msg)
114
115    if creds is not None:
116        server.add_secure_port(address, creds)
117
118    if insecure:
119        server.add_insecure_port(address)
120
121    async def start():
122        await server.start()
123        await server.wait_for_termination()
124
125    try:
126        loop.run_until_complete(start())
127    finally:
128        loop.run_until_complete(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS))
129        loop.close()

Start a gRPC server and serve requests asychronously.

Arguments:
  • function: The function (class) to use to serve requests.
  • address: The address at which to listen for requests.
  • creds: The credentials used to authenticate requests.
  • insecure: Serve insecurely, without credentials or encryption.
Raises:
  • ValueError if creds is None and insecure is False.

If insecure is true requests will be served insecurely, even if credentials are supplied.

class BetaFunctionRunner(crossplane.function.proto.v1beta1.run_function_pb2_grpc.FunctionRunnerService):
132class BetaFunctionRunner(grpcv1beta1.FunctionRunnerService):
133    """A BetaFunctionRunner handles beta gRPC RunFunctionRequests.
134
135    It handles requests by passing them to a wrapped v1.FunctionRunnerService.
136    Incoming v1beta1 requests are converted to v1 by round-tripping them through
137    serialization. Outgoing requests are converted from v1 to v1beta1 the same
138    way.
139    """
140
141    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
142        """Create a new BetaFunctionRunner."""
143        self.wrapped = wrapped
144
145    async def RunFunction(  # noqa: N802  # gRPC requires this name.
146        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
147    ) -> fnv1beta1.RunFunctionResponse:
148        """Run the underlying function."""
149        gareq = fnv1.RunFunctionRequest()
150        gareq.ParseFromString(req.SerializeToString())
151
152        garsp = await self.wrapped.RunFunction(gareq, context)
153        rsp = fnv1beta1.RunFunctionResponse()
154        rsp.ParseFromString(garsp.SerializeToString())
155
156        return rsp

A BetaFunctionRunner handles beta gRPC RunFunctionRequests.

It handles requests by passing them to a wrapped v1.FunctionRunnerService. Incoming v1beta1 requests are converted to v1 by round-tripping them through serialization. Outgoing requests are converted from v1 to v1beta1 the same way.

BetaFunctionRunner( wrapped: crossplane.function.proto.v1.run_function_pb2_grpc.FunctionRunnerService)
141    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
142        """Create a new BetaFunctionRunner."""
143        self.wrapped = wrapped

Create a new BetaFunctionRunner.

wrapped
async def RunFunction( self, req: crossplane.function.proto.v1beta1.run_function_pb2.RunFunctionRequest, context: grpc.aio._base_server.ServicerContext) -> crossplane.function.proto.v1beta1.run_function_pb2.RunFunctionResponse:
145    async def RunFunction(  # noqa: N802  # gRPC requires this name.
146        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
147    ) -> fnv1beta1.RunFunctionResponse:
148        """Run the underlying function."""
149        gareq = fnv1.RunFunctionRequest()
150        gareq.ParseFromString(req.SerializeToString())
151
152        garsp = await self.wrapped.RunFunction(gareq, context)
153        rsp = fnv1beta1.RunFunctionResponse()
154        rsp.ParseFromString(garsp.SerializeToString())
155
156        return rsp

Run the underlying function.