-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add Cloud Events support to Spring Integration #10448
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
03d1f51
to
02a1329
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some at a glance review.
Thank you!
...src/main/java/org/springframework/integration/cloudevents/v1/CloudEventMessageConverter.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/springframework/integration/cloudevents/v1/CloudEventMessageConverter.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/springframework/integration/cloudevents/v1/CloudEventMessageConverter.java
Outdated
Show resolved
Hide resolved
...g/springframework/integration/cloudevents/v1/transformer/CloudEventMessageConverterTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for a lengthy review and some doubts I've expressed.
I addition, what are your thoughts about content of this package in Spring Cloud Function: https://github.com/spring-cloud/spring-cloud-function/tree/main/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/cloudevent ?
I mean transformer does the trick indeed, but only from an integration flow context.
How about the way to be able to construct CloudEvent
programmatically?
Or just existing SDK API is enough to deal with?
return CloudEventUtils.toReader((CloudEvent) payload).read(new MessageBuilderMessageWriter(headers, this.cePrefix)); | ||
} | ||
|
||
private MessageReader createMessageReader(Message<?> message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like all of methods starting with this has to be static
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has not been addressed yet.
...ation/cloudevents/transformer/strategies/cloudeventconverter/CloudEventMessageConverter.java
Outdated
Show resolved
Hide resolved
...ation/cloudevents/transformer/strategies/cloudeventconverter/MessageBinaryMessageReader.java
Outdated
Show resolved
Hide resolved
...pringframework/integration/cloudevents/v1/transformer/ToCloudEventTransformerExtensions.java
Outdated
Show resolved
Hide resolved
...a/org/springframework/integration/cloudevents/v1/transformer/utils/HeaderPatternMatcher.java
Outdated
Show resolved
Hide resolved
...java/org/springframework/integration/cloudevents/v1/transformer/ToCloudEventTransformer.java
Outdated
Show resolved
Hide resolved
...java/org/springframework/integration/cloudevents/v1/transformer/ToCloudEventTransformer.java
Outdated
Show resolved
Hide resolved
.../main/java/org/springframework/integration/cloudevents/transformer/CloudEventProperties.java
Outdated
Show resolved
Hide resolved
...loudevents/src/main/java/org/springframework/integration/cloudevents/CloudEventsHeaders.java
Outdated
Show resolved
Hide resolved
src/reference/antora/modules/ROOT/pages/cloudevents/cloudevents-transform.adoc
Outdated
Show resolved
Hide resolved
cabc5e5
to
d14c5f8
Compare
return CloudEventUtils.toReader((CloudEvent) payload).read(new MessageBuilderMessageWriter(headers, this.cePrefix)); | ||
} | ||
|
||
private MessageReader createMessageReader(Message<?> message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has not been addressed yet.
...loudevents/src/main/java/org/springframework/integration/cloudevents/CloudEventsHeaders.java
Outdated
Show resolved
Hide resolved
...ts/src/main/java/org/springframework/integration/cloudevents/MessageBinaryMessageReader.java
Outdated
Show resolved
Hide resolved
|
||
@Override | ||
protected boolean isContentTypeHeader(String key) { | ||
return org.springframework.messaging.MessageHeaders.CONTENT_TYPE.equalsIgnoreCase(key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need FQCN?
...s/src/main/java/org/springframework/integration/cloudevents/MessageBuilderMessageWriter.java
Outdated
Show resolved
Hide resolved
...s/src/main/java/org/springframework/integration/cloudevents/MessageBuilderMessageWriter.java
Outdated
Show resolved
Hide resolved
...g/springframework/integration/cloudevents/transformer/ToCloudEventTransformerExtensions.java
Outdated
Show resolved
Hide resolved
...g/springframework/integration/cloudevents/transformer/ToCloudEventTransformerExtensions.java
Outdated
Show resolved
Hide resolved
.../java/org/springframework/integration/cloudevents/transformer/strategies/FormatStrategy.java
Outdated
Show resolved
Hide resolved
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
class CloudEventPropertiesTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I said before: I'm not sure in the CloudEventProperties
class at all.
And our preferences is *Tests
for test class names.
Please, revise all of your new test classes.
For now hold off on a review. I'm writing up some questions for some guidance on some of the issues you raised in the last 2 reviews. |
.../java/org/springframework/integration/cloudevents/transformer/strategies/FormatStrategy.java
Outdated
Show resolved
Hide resolved
...s/src/main/java/org/springframework/integration/cloudevents/MessageBuilderMessageWriter.java
Outdated
Show resolved
Hide resolved
...loudevents/src/main/java/org/springframework/integration/cloudevents/CloudEventsHeaders.java
Outdated
Show resolved
Hide resolved
d14c5f8
to
d8989f9
Compare
Introduces Cloud Events v1.0 specification support including message converters, transformers, and utilities. Key components added: - CloudEventMessageConverter for message format conversion - ToCloudEventTransformer for transforming messages to Cloud Events - MessageBinaryMessageReader/Writer for binary format handling - CloudEventProperties for configuration management - Header pattern matching utilities for flexible event mapping - Add reference docs and what's-new paragraph
Remove v1 subpackage and flatten the CloudEvents package hierarchy. Introduce strategy pattern for format conversion to replace enum-based approach, improving extensibility and reduce dependencies. Key changes: - Move all classes from cloudevents.v1 to cloudevents base package - Remove optional format dependencies (JSON, XML, Avro) from build - Replace `ConversionType` enum with `FormatStrategy` interface - Add `CloudEventMessageFormatStrategy` as default implementation - Inline `HeaderPatternMatcher` logic into `ToCloudEventTransformerExtensions` - Add `@NullMarked` package annotations and `@Nullable` throughout - Document `targetClass` parameter behavior in `CloudEventMessageConverter` - Split transformer tests for better organization and coverage - Update component type identifier to "ce:to-cloudevents-transformer" - Remove unnecessary docs from package-info
- Simplify the CloudEvent transformer by consolidating configuration directly into ToCloudEventTransformer class rather than using separate configuration objects - Remove CloudEventProperties and ToCloudEventTransformerExtensions classes to reduce abstraction layers and improve maintainability - Make MessageBinaryMessageReader package-private and convert CloudEventMessageConverter methods to static where possible - Move extension filtering logic into a private inner class within the transformer - Remove CloudEventsHeaders class and CE_PREFIX constant as the prefix is no longer used as a configurable value
Replace custom CloudEvent converter infrastructure with direct CloudEvents SDK format implementations. Key changes: - Replace `FormatStrategy` pattern-based approach with direct `EventFormatProvider` integration from CloudEvents SDK - Remove custom converter classes (`CloudEventMessageConverter`, `MessageBinaryMessageReader`, `MessageBuilderMessageWriter`) - Simplify transformer to use Expression-based configuration for all CloudEvent attributes (id, source, type, dataSchema, subject) - Add validation for required CloudEvent attributes with clear error messages when expressions evaluate to null or empty values - Update documentation to reflect Expression-based API and byte[] payload requirement - Consolidate tests by removing coverage for deleted converter infrastructure
d8989f9
to
be0fa17
Compare
|
||
dependencies { | ||
api "io.cloudevents:cloudevents-core:$cloudEventsVersion" | ||
optionalApi "io.cloudevents:cloudevents-spring:$cloudEventsVersion" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought we have discussed that no this dependency on our side to avoid cycle conflict.
optionalApi("io.cloudevents:cloudevents-avro-compact:$cloudEventsVersion") { | ||
exclude group: 'org.apache.avro', module: 'avro' | ||
} | ||
optionalApi "org.apache.avro:avro:$avroVersion" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we ca rely on whatever their module give us.
More over, I don't see a reason to have formats as compile dependencies at all.
Just testImplementation
should be enough for our use-case.
We really must not have any usage of their format implementations in our prod code.
} | ||
optionalApi "org.apache.avro:avro:$avroVersion" | ||
optionalApi "io.cloudevents:cloudevents-xml:$cloudEventsVersion" | ||
testImplementation 'tools.jackson.core:jackson-databind' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this?
I don't believe that cloudevents-json-jackson
can work with Jackson 3.
So, just rely on whatever is their transitive dependency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed. Artifact from a previous test.
public class ToCloudEventTransformer extends AbstractTransformer { | ||
|
||
private Expression idExpression = new FunctionExpression<Message<?>>( | ||
msg -> Objects.requireNonNull(msg.getHeaders().getId()).toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a critical path, so no extra checks, please,
Has to be NullAway
.
See similar structure in the DefaultFileNameGenerator
.
And, please, rename this to the eventIdExpression
to be more specific.
Just id
does not give us a proper clue what is going on.
@SuppressWarnings("NullAway.Init") | ||
private Expression dataSchemaExpression; | ||
|
||
private Expression subjectExpression = new FunctionExpression<>((Function<Message<?>, @Nullable String>) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this cannot be null
?
Why do we need a Function
overhead just to return null
?
@SuppressWarnings("unchecked") | ||
ToCloudEventTransformerExtensions(EvaluationContext evaluationContext, Message<byte[]> message, | ||
Expression @Nullable ... expressions) { | ||
this.cloudEventExtensions = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to have a blank line in the method body if header is multi-line.
Much easier to read the code.
import static org.assertj.core.api.Assertions.assertThatThrownBy; | ||
|
||
@SpringJUnitConfig | ||
class ToCloudEventTransformerTests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to have a @DirtiesContext` on every single Spring-based test.
This way the memory will be freed from not used application contexts.
** xref:amqp/amqp-1.0.adoc[] | ||
* xref:camel.adoc[] | ||
* xref:cassandra.adoc[] | ||
* xref:cloudevents-transform.adoc[] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a fact that it always be only for transformers.
Please, rename the file to cloudevents
.
@@ -0,0 +1,133 @@ | |||
[[cloudevents-transformer]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DITTO.
And, please, add a general section about CloudEvents and respective dependency management.
See cassandra.adoc
for example.
Plus, it would be great to mention those format dependencies to be includes.
[[x7.0-cloudevents]] | ||
=== CloudEvents | ||
|
||
The CloudEvent transformer converts Spring Integration messages into CloudEvent compliant messages. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we say as a general sentence that CloudEvents are now supported?
And mention module name as well.
And probably no need to go into details in this what's new about the transformer.
That would be obvious in the target chapter.
The point is, that we also want to have a FromCloudEventTransformer
, for those cases when we receive them into our Spring Integration application.
[[cloudevent-transformer-overview]] | ||
=== Overview | ||
|
||
The CloudEvent transformer (`ToCloudEventTransformer`) extends Spring Integration's `AbstractTransformer` to convert messages to CloudEvent format. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to say that AbstractTransformer
is a Spring Integration class.
We are already here.
=== Overview | ||
|
||
The CloudEvent transformer (`ToCloudEventTransformer`) extends Spring Integration's `AbstractTransformer` to convert messages to CloudEvent format. | ||
The CloudEvent transformer identifies the `EventFormat` classes in the classpath and utilizes registers these as the available serializers for CloudEvents. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not quit true. Formats are detected by CloudEvents Provider
, we just use its API to resolve against a content-type
.
The `ToCloudEventTransformer` allows the user to use SpEL `Expression`s to populate the attributes as well as the extensions. | ||
|
||
==== Attribute Expressions | ||
As discussed above users are allowed to set the `id`, `source`, `type`, `dataSchema`, `subject` through SpEL `Expression`s. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not quit true, since this is the first time in this doc we encounter specific attributes.
Plus, blank line after the section title.
---- | ||
ExpressionParser parser = new SpelExpressionParser(); | ||
ToCloudEventTransformer transformer = new ToCloudEventTransformer(null); | ||
transformer.setTypeExpression(parser.parseExpression("sampleType")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not very working example.
With a parsed SpEL it would try to do something like Message.getSampleType()
which is an error.
Looks like you are talking really about a new LiteralExpression("sampleType")
---- | ||
|
||
==== Extension Expressions | ||
The expressions constructor parameter is an array of `Expression`s. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again: blank line after the title.
And probably this section would be changed if we come back to patterns for headers to map into extensions.
Unless you can convince me why we have to go with expressions for them.
Each `Expression` in the array must return the type Map<String, Object>. | ||
Where the key is a string and the value is of type object. | ||
In the example below the extensions are hard coded to return 3 `Map<String, Object>` objects each containing one extension. | ||
[source,java] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Blank line before code block: much easier to read the doc.
| the id of the message. | ||
|
||
| `source` | ||
| Prefix of "/spring/" followed by the appName a `.` then the name of the transformer's bean. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something is off with the part about a .
// Input message with headers | ||
Message<String> inputMessage = MessageBuilder | ||
.withPayload("Hello CloudEvents") | ||
.build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to your current code in the transformer this example is not correct since content-type
is required.
I guess we can go with the application/octet-stream
by default.
return byteArrayOutputStream.toByteArray(); | ||
} | ||
|
||
private MessageBuilder<byte[]> createBaseMessage(byte[] payload, String contentType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See if many of these supporting method could be static
.
Much easier to navigate the code.
|
||
@Bean | ||
public ToCloudEventTransformer transformerWithInvalidIDExpression() { | ||
ExpressionParser parser = new SpelExpressionParser(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The parser could go into a static
property. It is fully thread-safe.
Introduces Cloud Events v1.0 specification support including message converters, transformers, and utilities.
Key components added: