function.runtime

Utilities to create a composition function runtime.

  1# Copyright 2023 The Crossplane Authors.
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#     http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""Utilities to create a composition function runtime."""
 16
 17import asyncio
 18import os
 19import signal
 20from collections.abc import Sequence
 21from typing import Any
 22
 23import grpc
 24from grpc_reflection.v1alpha import reflection
 25
 26import crossplane.function.proto.v1.run_function_pb2 as fnv1
 27import crossplane.function.proto.v1.run_function_pb2_grpc as grpcv1
 28import crossplane.function.proto.v1beta1.run_function_pb2 as fnv1beta1
 29import crossplane.function.proto.v1beta1.run_function_pb2_grpc as grpcv1beta1
 30
# Fully-qualified names of the gRPC services this module registers with server
# reflection: the reflection service itself plus the v1 and v1beta1
# FunctionRunnerService definitions.
SERVICE_NAMES = (
    reflection.SERVICE_NAME,
    fnv1.DESCRIPTOR.services_by_name["FunctionRunnerService"].full_name,
    fnv1beta1.DESCRIPTOR.services_by_name["FunctionRunnerService"].full_name,
)

# Grace period, in seconds, passed to server.stop() when shutting down.
SHUTDOWN_GRACE_PERIOD_SECONDS = 5
 38
 39
 40def load_credentials(tls_certs_dir: str) -> grpc.ServerCredentials:
 41    """Load TLS credentials for a composition function gRPC server.
 42
 43    Args:
 44        tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt.
 45
 46    Returns:
 47        gRPC mTLS server credentials.
 48
 49    tls.crt and tls.key must be the function's PEM-encoded certificate and
 50    private key. ca.cert must be a PEM-encoded CA certificate used to
 51    authenticate callers (i.e. Crossplane).
 52    """
 53    if tls_certs_dir is None:
 54        return None
 55
 56    with open(os.path.join(tls_certs_dir, "tls.crt"), "rb") as f:
 57        crt = f.read()
 58
 59    with open(os.path.join(tls_certs_dir, "tls.key"), "rb") as f:
 60        key = f.read()
 61
 62    with open(os.path.join(tls_certs_dir, "ca.crt"), "rb") as f:
 63        ca = f.read()
 64
 65    return grpc.ssl_server_credentials(
 66        private_key_certificate_chain_pairs=[(key, crt)],
 67        root_certificates=ca,
 68        require_client_auth=True,
 69    )
 70
 71
 72def serve(
 73    function: grpcv1.FunctionRunnerService,
 74    address: str,
 75    *,
 76    creds: grpc.ServerCredentials,
 77    insecure: bool,
 78    options: Sequence[tuple[str, Any]] | None = None,
 79) -> None:
 80    """Start a gRPC server and serve requests asychronously.
 81
 82    Args:
 83        function: The function (class) to use to serve requests.
 84        address: The address at which to listen for requests.
 85        creds: The credentials used to authenticate requests.
 86        insecure: Serve insecurely, without credentials or encryption.
 87        options: Additional gRPC server options. Eg. set max receive message
 88                 size to 5Mb (default is 4Mb):
 89                 [("grpc.max_receive_message_length", 1024 * 1024 * 5)]
 90
 91    Raises:
 92        ValueError if creds is None and insecure is False.
 93
 94    If insecure is true requests will be served insecurely, even if credentials
 95    are supplied.
 96    """
 97    # Define the loop before the server so everything uses the same loop.
 98    loop = asyncio.get_event_loop()
 99
100    server = grpc.aio.server(options=options)
101
102    loop.add_signal_handler(
103        signal.SIGTERM,
104        lambda: asyncio.ensure_future(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS)),
105    )
106
107    grpcv1.add_FunctionRunnerServiceServicer_to_server(function, server)
108    grpcv1beta1.add_FunctionRunnerServiceServicer_to_server(
109        BetaFunctionRunner(wrapped=function), server
110    )
111    reflection.enable_server_reflection(SERVICE_NAMES, server)
112
113    if creds is None and insecure is False:
114        msg = (
115            "no credentials were provided - did you provide credentials or use "
116            "the insecure flag?"
117        )
118        raise ValueError(msg)
119
120    if creds is not None:
121        server.add_secure_port(address, creds)
122
123    if insecure:
124        server.add_insecure_port(address)
125
126    async def start():
127        await server.start()
128        await server.wait_for_termination()
129
130    try:
131        loop.run_until_complete(start())
132    finally:
133        loop.run_until_complete(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS))
134        loop.close()
135
136
class BetaFunctionRunner(grpcv1beta1.FunctionRunnerService):
    """Serve v1beta1 RunFunctionRequests using a v1 function.

    Each request is handed to a wrapped v1.FunctionRunnerService. The incoming
    v1beta1 request is converted to v1 by serializing it and parsing the bytes
    as a v1 message. The v1 response is converted back to v1beta1 the same
    way.
    """

    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
        """Create a new BetaFunctionRunner that delegates to wrapped."""
        self.wrapped = wrapped

    async def RunFunction(  # noqa: N802  # gRPC requires this name.
        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
    ) -> fnv1beta1.RunFunctionResponse:
        """Run the wrapped v1 function and return its response as v1beta1."""
        # v1beta1 request -> bytes -> v1 request.
        request_bytes = req.SerializeToString()
        v1_request = fnv1.RunFunctionRequest()
        v1_request.ParseFromString(request_bytes)

        v1_response = await self.wrapped.RunFunction(v1_request, context)

        # v1 response -> bytes -> v1beta1 response.
        response_bytes = v1_response.SerializeToString()
        beta_response = fnv1beta1.RunFunctionResponse()
        beta_response.ParseFromString(response_bytes)

        return beta_response
SERVICE_NAMES = ('grpc.reflection.v1alpha.ServerReflection', 'apiextensions.fn.proto.v1.FunctionRunnerService', 'apiextensions.fn.proto.v1beta1.FunctionRunnerService')
SHUTDOWN_GRACE_PERIOD_SECONDS = 5
def load_credentials(tls_certs_dir: str) -> grpc.ServerCredentials:
def load_credentials(tls_certs_dir: str | None) -> grpc.ServerCredentials | None:
    """Load TLS credentials for a composition function gRPC server.

    Args:
        tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt,
            or None if no credentials should be loaded.

    Returns:
        gRPC mTLS server credentials, or None if tls_certs_dir is None.

    tls.crt and tls.key must be the function's PEM-encoded certificate and
    private key. ca.crt must be a PEM-encoded CA certificate used to
    authenticate callers (i.e. Crossplane).
    """
    if tls_certs_dir is None:
        return None

    def read_pem(filename: str) -> bytes:
        """Return the raw bytes of filename inside tls_certs_dir."""
        with open(os.path.join(tls_certs_dir, filename), "rb") as f:
            return f.read()

    # Read in the same order as before (certificate, key, CA) so that a
    # missing file surfaces the same error first.
    crt = read_pem("tls.crt")
    key = read_pem("tls.key")
    ca = read_pem("ca.crt")

    # require_client_auth=True makes this mutual TLS: callers must present a
    # certificate signed by the CA in ca.crt.
    return grpc.ssl_server_credentials(
        private_key_certificate_chain_pairs=[(key, crt)],
        root_certificates=ca,
        require_client_auth=True,
    )

Load TLS credentials for a composition function gRPC server.

Arguments:
  • tls_certs_dir: A directory containing tls.crt, tls.key, and ca.crt.
Returns:

gRPC mTLS server credentials.

tls.crt and tls.key must be the function's PEM-encoded certificate and private key. ca.crt must be a PEM-encoded CA certificate used to authenticate callers (i.e. Crossplane).

def serve( function: crossplane.function.proto.v1.run_function_pb2_grpc.FunctionRunnerService, address: str, *, creds: grpc.ServerCredentials, insecure: bool, options: Sequence[tuple[str, typing.Any]] | None = None) -> None:
 73def serve(
 74    function: grpcv1.FunctionRunnerService,
 75    address: str,
 76    *,
 77    creds: grpc.ServerCredentials,
 78    insecure: bool,
 79    options: Sequence[tuple[str, Any]] | None = None,
 80) -> None:
 81    """Start a gRPC server and serve requests asychronously.
 82
 83    Args:
 84        function: The function (class) to use to serve requests.
 85        address: The address at which to listen for requests.
 86        creds: The credentials used to authenticate requests.
 87        insecure: Serve insecurely, without credentials or encryption.
 88        options: Additional gRPC server options. Eg. set max receive message
 89                 size to 5Mb (default is 4Mb):
 90                 [("grpc.max_receive_message_length", 1024 * 1024 * 5)]
 91
 92    Raises:
 93        ValueError if creds is None and insecure is False.
 94
 95    If insecure is true requests will be served insecurely, even if credentials
 96    are supplied.
 97    """
 98    # Define the loop before the server so everything uses the same loop.
 99    loop = asyncio.get_event_loop()
100
101    server = grpc.aio.server(options=options)
102
103    loop.add_signal_handler(
104        signal.SIGTERM,
105        lambda: asyncio.ensure_future(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS)),
106    )
107
108    grpcv1.add_FunctionRunnerServiceServicer_to_server(function, server)
109    grpcv1beta1.add_FunctionRunnerServiceServicer_to_server(
110        BetaFunctionRunner(wrapped=function), server
111    )
112    reflection.enable_server_reflection(SERVICE_NAMES, server)
113
114    if creds is None and insecure is False:
115        msg = (
116            "no credentials were provided - did you provide credentials or use "
117            "the insecure flag?"
118        )
119        raise ValueError(msg)
120
121    if creds is not None:
122        server.add_secure_port(address, creds)
123
124    if insecure:
125        server.add_insecure_port(address)
126
127    async def start():
128        await server.start()
129        await server.wait_for_termination()
130
131    try:
132        loop.run_until_complete(start())
133    finally:
134        loop.run_until_complete(server.stop(grace=SHUTDOWN_GRACE_PERIOD_SECONDS))
135        loop.close()

Start a gRPC server and serve requests asynchronously.

Arguments:
  • function: The function (class) to use to serve requests.
  • address: The address at which to listen for requests.
  • creds: The credentials used to authenticate requests.
  • insecure: Serve insecurely, without credentials or encryption.
  • options: Additional gRPC server options. Eg. set max receive message size to 5Mb (default is 4Mb): [("grpc.max_receive_message_length", 1024 * 1024 * 5)]
Raises:
  • ValueError if creds is None and insecure is False.

If insecure is true requests will be served insecurely, even if credentials are supplied.

class BetaFunctionRunner(crossplane.function.proto.v1beta1.run_function_pb2_grpc.FunctionRunnerService):
class BetaFunctionRunner(grpcv1beta1.FunctionRunnerService):
    """Serve v1beta1 RunFunctionRequests using a v1 function.

    Each request is handed to a wrapped v1.FunctionRunnerService. The incoming
    v1beta1 request is converted to v1 by serializing it and parsing the bytes
    as a v1 message. The v1 response is converted back to v1beta1 the same
    way.
    """

    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
        """Create a new BetaFunctionRunner that delegates to wrapped."""
        self.wrapped = wrapped

    async def RunFunction(  # noqa: N802  # gRPC requires this name.
        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
    ) -> fnv1beta1.RunFunctionResponse:
        """Run the wrapped v1 function and return its response as v1beta1."""
        # v1beta1 request -> bytes -> v1 request.
        request_bytes = req.SerializeToString()
        v1_request = fnv1.RunFunctionRequest()
        v1_request.ParseFromString(request_bytes)

        v1_response = await self.wrapped.RunFunction(v1_request, context)

        # v1 response -> bytes -> v1beta1 response.
        response_bytes = v1_response.SerializeToString()
        beta_response = fnv1beta1.RunFunctionResponse()
        beta_response.ParseFromString(response_bytes)

        return beta_response

A BetaFunctionRunner handles beta gRPC RunFunctionRequests.

It handles requests by passing them to a wrapped v1.FunctionRunnerService. Incoming v1beta1 requests are converted to v1 by round-tripping them through serialization. Outgoing responses are converted from v1 to v1beta1 the same way.

BetaFunctionRunner( wrapped: crossplane.function.proto.v1.run_function_pb2_grpc.FunctionRunnerService)
147    def __init__(self, wrapped: grpcv1.FunctionRunnerService):
148        """Create a new BetaFunctionRunner."""
149        self.wrapped = wrapped

Create a new BetaFunctionRunner.

wrapped
async def RunFunction( self, req: crossplane.function.proto.v1beta1.run_function_pb2.RunFunctionRequest, context: grpc.aio._base_server.ServicerContext) -> crossplane.function.proto.v1beta1.run_function_pb2.RunFunctionResponse:
151    async def RunFunction(  # noqa: N802  # gRPC requires this name.
152        self, req: fnv1beta1.RunFunctionRequest, context: grpc.aio.ServicerContext
153    ) -> fnv1beta1.RunFunctionResponse:
154        """Run the underlying function."""
155        gareq = fnv1.RunFunctionRequest()
156        gareq.ParseFromString(req.SerializeToString())
157
158        garsp = await self.wrapped.RunFunction(gareq, context)
159        rsp = fnv1beta1.RunFunctionResponse()
160        rsp.ParseFromString(garsp.SerializeToString())
161
162        return rsp

Run the underlying function.