function.response

Utilities for working with RunFunctionResponses.

  1# Copyright 2023 The Crossplane Authors.
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#     http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15"""Utilities for working with RunFunctionResponses."""
 16
 17import datetime
 18
 19from google.protobuf import duration_pb2 as durationpb
 20from google.protobuf import struct_pb2 as structpb
 21
 22import crossplane.function.proto.v1.run_function_pb2 as fnv1
 23from crossplane.function import resource
 24
 25"""The default TTL for which a RunFunctionResponse may be cached."""
 26DEFAULT_TTL = datetime.timedelta(minutes=1)
 27
 28
 29def to(
 30    req: fnv1.RunFunctionRequest,
 31    ttl: datetime.timedelta = DEFAULT_TTL,
 32) -> fnv1.RunFunctionResponse:
 33    """Create a response to the supplied request.
 34
 35    Args:
 36        req: The request to respond to.
 37        ttl: How long Crossplane may optionally cache the response.
 38
 39    Returns:
 40        A response to the supplied request.
 41
 42    The request's tag, desired resources, and context is automatically copied to
 43    the response. Using response.to is a good pattern to ensure
 44    """
 45    dttl = durationpb.Duration()
 46    dttl.FromTimedelta(ttl)
 47    return fnv1.RunFunctionResponse(
 48        meta=fnv1.ResponseMeta(tag=req.meta.tag, ttl=dttl),
 49        desired=req.desired,
 50        context=req.context,
 51    )
 52
 53
 54def normal(rsp: fnv1.RunFunctionResponse, message: str) -> None:
 55    """Add a normal result to the response."""
 56    rsp.results.append(
 57        fnv1.Result(
 58            severity=fnv1.SEVERITY_NORMAL,
 59            message=message,
 60        )
 61    )
 62
 63
 64def warning(rsp: fnv1.RunFunctionResponse, message: str) -> None:
 65    """Add a warning result to the response."""
 66    rsp.results.append(
 67        fnv1.Result(
 68            severity=fnv1.SEVERITY_WARNING,
 69            message=message,
 70        )
 71    )
 72
 73
 74def fatal(rsp: fnv1.RunFunctionResponse, message: str) -> None:
 75    """Add a fatal result to the response."""
 76    rsp.results.append(
 77        fnv1.Result(
 78            severity=fnv1.SEVERITY_FATAL,
 79            message=message,
 80        )
 81    )
 82
 83
 84def set_output(rsp: fnv1.RunFunctionResponse, output: dict | structpb.Struct) -> None:
 85    """Set the output field in a RunFunctionResponse for operation functions.
 86
 87    Args:
 88        rsp: The RunFunctionResponse to update.
 89        output: The output data as a dictionary or protobuf Struct.
 90
 91    Operation functions can return arbitrary output data that will be written
 92    to the Operation's status.pipeline field. This function sets that output
 93    on the response.
 94    """
 95    match output:
 96        case dict():
 97            rsp.output.CopyFrom(resource.dict_to_struct(output))
 98        case structpb.Struct():
 99            rsp.output.CopyFrom(output)
100        case _:
101            t = type(output)
102            msg = f"Unsupported output type: {t}"
103            raise TypeError(msg)
104
105
106def require_resources(  # noqa: PLR0913
107    rsp: fnv1.RunFunctionResponse,
108    name: str,
109    api_version: str,
110    kind: str,
111    *,
112    match_name: str | None = None,
113    match_labels: dict[str, str] | None = None,
114    namespace: str | None = None,
115) -> None:
116    """Add a resource requirement to the response.
117
118    Args:
119        rsp: The RunFunctionResponse to update.
120        name: The name to use for this requirement.
121        api_version: The API version of resources to require.
122        kind: The kind of resources to require.
123        match_name: Match a resource by name (mutually exclusive with match_labels).
124        match_labels: Match resources by labels (mutually exclusive with match_name).
125        namespace: The namespace to search in (optional).
126
127    Raises:
128        ValueError: If both match_name and match_labels are provided, or neither.
129
130    This tells Crossplane to fetch the specified resources and include them
131    in the next call to the function in req.required_resources[name].
132    """
133    if (match_name is None) == (match_labels is None):
134        msg = "Exactly one of match_name or match_labels must be provided"
135        raise ValueError(msg)
136
137    selector = fnv1.ResourceSelector(
138        api_version=api_version,
139        kind=kind,
140    )
141
142    if match_name is not None:
143        selector.match_name = match_name
144
145    if match_labels is not None:
146        selector.match_labels.labels.update(match_labels)
147
148    if namespace is not None:
149        selector.namespace = namespace
150
151    rsp.requirements.resources[name].CopyFrom(selector)
DEFAULT_TTL = datetime.timedelta(seconds=60)
def to( req: crossplane.function.proto.v1.run_function_pb2.RunFunctionRequest, ttl: datetime.timedelta = datetime.timedelta(seconds=60)) -> crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse:
30def to(
31    req: fnv1.RunFunctionRequest,
32    ttl: datetime.timedelta = DEFAULT_TTL,
33) -> fnv1.RunFunctionResponse:
34    """Create a response to the supplied request.
35
36    Args:
37        req: The request to respond to.
38        ttl: How long Crossplane may optionally cache the response.
39
40    Returns:
41        A response to the supplied request.
42
43    The request's tag, desired resources, and context is automatically copied to
44    the response. Using response.to is a good pattern to ensure
45    """
46    dttl = durationpb.Duration()
47    dttl.FromTimedelta(ttl)
48    return fnv1.RunFunctionResponse(
49        meta=fnv1.ResponseMeta(tag=req.meta.tag, ttl=dttl),
50        desired=req.desired,
51        context=req.context,
52    )

Create a response to the supplied request.

Arguments:
  • req: The request to respond to.
  • ttl: How long Crossplane may optionally cache the response.
Returns:

A response to the supplied request.

The request's tag, desired resources, and context is automatically copied to the response. Using response.to is a good pattern to ensure

def normal( rsp: crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse, message: str) -> None:
55def normal(rsp: fnv1.RunFunctionResponse, message: str) -> None:
56    """Add a normal result to the response."""
57    rsp.results.append(
58        fnv1.Result(
59            severity=fnv1.SEVERITY_NORMAL,
60            message=message,
61        )
62    )

Add a normal result to the response.

def warning( rsp: crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse, message: str) -> None:
65def warning(rsp: fnv1.RunFunctionResponse, message: str) -> None:
66    """Add a warning result to the response."""
67    rsp.results.append(
68        fnv1.Result(
69            severity=fnv1.SEVERITY_WARNING,
70            message=message,
71        )
72    )

Add a warning result to the response.

def fatal( rsp: crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse, message: str) -> None:
75def fatal(rsp: fnv1.RunFunctionResponse, message: str) -> None:
76    """Add a fatal result to the response."""
77    rsp.results.append(
78        fnv1.Result(
79            severity=fnv1.SEVERITY_FATAL,
80            message=message,
81        )
82    )

Add a fatal result to the response.

def set_output( rsp: crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse, output: dict | google.protobuf.struct_pb2.Struct) -> None:
 85def set_output(rsp: fnv1.RunFunctionResponse, output: dict | structpb.Struct) -> None:
 86    """Set the output field in a RunFunctionResponse for operation functions.
 87
 88    Args:
 89        rsp: The RunFunctionResponse to update.
 90        output: The output data as a dictionary or protobuf Struct.
 91
 92    Operation functions can return arbitrary output data that will be written
 93    to the Operation's status.pipeline field. This function sets that output
 94    on the response.
 95    """
 96    match output:
 97        case dict():
 98            rsp.output.CopyFrom(resource.dict_to_struct(output))
 99        case structpb.Struct():
100            rsp.output.CopyFrom(output)
101        case _:
102            t = type(output)
103            msg = f"Unsupported output type: {t}"
104            raise TypeError(msg)

Set the output field in a RunFunctionResponse for operation functions.

Arguments:
  • rsp: The RunFunctionResponse to update.
  • output: The output data as a dictionary or protobuf Struct.

Operation functions can return arbitrary output data that will be written to the Operation's status.pipeline field. This function sets that output on the response.

def require_resources( rsp: crossplane.function.proto.v1.run_function_pb2.RunFunctionResponse, name: str, api_version: str, kind: str, *, match_name: str | None = None, match_labels: dict[str, str] | None = None, namespace: str | None = None) -> None:
107def require_resources(  # noqa: PLR0913
108    rsp: fnv1.RunFunctionResponse,
109    name: str,
110    api_version: str,
111    kind: str,
112    *,
113    match_name: str | None = None,
114    match_labels: dict[str, str] | None = None,
115    namespace: str | None = None,
116) -> None:
117    """Add a resource requirement to the response.
118
119    Args:
120        rsp: The RunFunctionResponse to update.
121        name: The name to use for this requirement.
122        api_version: The API version of resources to require.
123        kind: The kind of resources to require.
124        match_name: Match a resource by name (mutually exclusive with match_labels).
125        match_labels: Match resources by labels (mutually exclusive with match_name).
126        namespace: The namespace to search in (optional).
127
128    Raises:
129        ValueError: If both match_name and match_labels are provided, or neither.
130
131    This tells Crossplane to fetch the specified resources and include them
132    in the next call to the function in req.required_resources[name].
133    """
134    if (match_name is None) == (match_labels is None):
135        msg = "Exactly one of match_name or match_labels must be provided"
136        raise ValueError(msg)
137
138    selector = fnv1.ResourceSelector(
139        api_version=api_version,
140        kind=kind,
141    )
142
143    if match_name is not None:
144        selector.match_name = match_name
145
146    if match_labels is not None:
147        selector.match_labels.labels.update(match_labels)
148
149    if namespace is not None:
150        selector.namespace = namespace
151
152    rsp.requirements.resources[name].CopyFrom(selector)

Add a resource requirement to the response.

Arguments:
  • rsp: The RunFunctionResponse to update.
  • name: The name to use for this requirement.
  • api_version: The API version of resources to require.
  • kind: The kind of resources to require.
  • match_name: Match a resource by name (mutually exclusive with match_labels).
  • match_labels: Match resources by labels (mutually exclusive with match_name).
  • namespace: The namespace to search in (optional).
Raises:
  • ValueError: If both match_name and match_labels are provided, or neither.

This tells Crossplane to fetch the specified resources and include them in the next call to the function in req.required_resources[name].