-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add Cloud Events support to Spring Integration #10448
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,7 @@ ext { | |
avroVersion = '1.12.0' | ||
awaitilityVersion = '4.3.0' | ||
camelVersion = '4.14.1' | ||
cloudEventsVersion = '4.0.1' | ||
commonsDbcp2Version = '2.13.0' | ||
commonsIoVersion = '2.20.0' | ||
commonsNetVersion = '3.12.0' | ||
|
@@ -474,6 +475,23 @@ project('spring-integration-cassandra') { | |
} | ||
} | ||
|
||
project('spring-integration-cloudevents') { | ||
description = 'Spring Integration CloudEvents Support' | ||
|
||
dependencies { | ||
api "io.cloudevents:cloudevents-core:$cloudEventsVersion" | ||
optionalApi "io.cloudevents:cloudevents-spring:$cloudEventsVersion" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought we have discussed that no this dependency on our side to avoid cycle conflict. |
||
optionalApi "io.cloudevents:cloudevents-json-jackson:$cloudEventsVersion" | ||
|
||
optionalApi("io.cloudevents:cloudevents-avro-compact:$cloudEventsVersion") { | ||
exclude group: 'org.apache.avro', module: 'avro' | ||
} | ||
optionalApi "org.apache.avro:avro:$avroVersion" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we ca rely on whatever their module give us. |
||
optionalApi "io.cloudevents:cloudevents-xml:$cloudEventsVersion" | ||
testImplementation 'tools.jackson.core:jackson-databind' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed. Artifact from a previous test. |
||
} | ||
} | ||
|
||
project('spring-integration-core') { | ||
description = 'Spring Integration Core' | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
|
||
/* | ||
* Copyright 2025-present the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
@org.jspecify.annotations.NullMarked | ||
package org.springframework.integration.cloudevents; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,288 @@ | ||
/* | ||
* Copyright 2025-present the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.integration.cloudevents.transformer; | ||
|
||
import java.net.URI; | ||
import java.time.OffsetDateTime; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.function.Function; | ||
|
||
import io.cloudevents.CloudEvent; | ||
import io.cloudevents.CloudEventExtension; | ||
import io.cloudevents.CloudEventExtensions; | ||
import io.cloudevents.core.builder.CloudEventBuilder; | ||
import io.cloudevents.core.format.EventFormat; | ||
import io.cloudevents.core.provider.EventFormatProvider; | ||
import org.jspecify.annotations.Nullable; | ||
|
||
import org.springframework.context.ApplicationContext; | ||
import org.springframework.expression.EvaluationContext; | ||
import org.springframework.expression.Expression; | ||
import org.springframework.expression.common.LiteralExpression; | ||
import org.springframework.integration.expression.ExpressionUtils; | ||
import org.springframework.integration.expression.FunctionExpression; | ||
import org.springframework.integration.transformer.AbstractTransformer; | ||
import org.springframework.integration.transformer.MessageTransformationException; | ||
import org.springframework.messaging.Message; | ||
import org.springframework.messaging.MessageHeaders; | ||
import org.springframework.messaging.support.MessageBuilder; | ||
import org.springframework.util.StringUtils; | ||
|
||
/** | ||
* A Spring Integration transformer that converts messages to CloudEvent format. | ||
* Attribute and extension mapping is performed based on {@link Expression}s. | ||
* | ||
* @author Glenn Renfro | ||
* | ||
* @since 7.0 | ||
*/ | ||
public class ToCloudEventTransformer extends AbstractTransformer { | ||
|
||
private Expression idExpression = new FunctionExpression<Message<?>>( | ||
msg -> Objects.requireNonNull(msg.getHeaders().getId()).toString()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a critical path, so no extra checks, please, |
||
|
||
@SuppressWarnings("NullAway.Init") | ||
private Expression sourceExpression; | ||
|
||
private Expression typeExpression = new LiteralExpression("spring.message"); | ||
|
||
@SuppressWarnings("NullAway.Init") | ||
private Expression dataSchemaExpression; | ||
|
||
private Expression subjectExpression = new FunctionExpression<>((Function<Message<?>, @Nullable String>) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this cannot be |
||
message -> null); | ||
|
||
private final Expression @Nullable [] cloudEventExtensionExpressions; | ||
|
||
@SuppressWarnings("NullAway.Init") | ||
private EvaluationContext evaluationContext; | ||
|
||
private final EventFormatProvider eventFormatProvider = EventFormatProvider.getInstance(); | ||
|
||
/** | ||
* Construct a ToCloudEventTransformer. | ||
* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No blank lines in the method Javadocs. |
||
* @param cloudEventExtensionExpressions an array of {@link Expression}s for establishing CloudEvent extensions | ||
*/ | ||
public ToCloudEventTransformer(Expression @Nullable ... cloudEventExtensionExpressions) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is what we have discussed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought it was going to be expressions so that extensions could be pulled from the headers or the payload. But I could be wrong. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. I see your point. Patterns just against the headers is not enough to cover all the possible use-cases. |
||
this.cloudEventExtensionExpressions = cloudEventExtensionExpressions; | ||
} | ||
|
||
/** | ||
* Construct a ToCloudEventTransformer with no {@link Expression}s for extensions. | ||
* | ||
*/ | ||
public ToCloudEventTransformer() { | ||
this((Expression[]) null); | ||
} | ||
|
||
/** | ||
* Set the {@link Expression} for creating CloudEvent ids. | ||
* Default expression extracts the id from the {@link MessageHeaders} of the message. | ||
* | ||
* @param idExpression the expression used to create the id for each CloudEvent | ||
*/ | ||
public void setIdExpression(Expression idExpression) { | ||
this.idExpression = idExpression; | ||
} | ||
|
||
/** | ||
* Set the {@link Expression} for creating CloudEvent source. | ||
* Default expression is {@code "/spring/" + appName + "." + getBeanName())}. | ||
* | ||
* @param sourceExpression the expression used to create the source for each CloudEvent | ||
*/ | ||
public void setSourceExpression(Expression sourceExpression) { | ||
this.sourceExpression = sourceExpression; | ||
} | ||
|
||
/** | ||
* Set the {@link Expression} for extracting the type for the CloudEvent. | ||
* Default expression sets the default to "spring.message". | ||
* | ||
* @param typeExpression the expression used to create the type for each CloudEvent | ||
*/ | ||
public void setTypeExpression(Expression typeExpression) { | ||
this.typeExpression = typeExpression; | ||
} | ||
|
||
/** | ||
* Set the {@link Expression} for creating the dataSchema for the CloudEvent. | ||
* Default {@link Expression} evaluates to a null. | ||
* | ||
* @param dataSchemaExpression the expression used to create the dataSchema for each CloudEvent | ||
*/ | ||
public void setDataSchemaExpression(Expression dataSchemaExpression) { | ||
this.dataSchemaExpression = dataSchemaExpression; | ||
} | ||
|
||
/** | ||
* Set the {@link Expression} for creating the subject for the CloudEvent. | ||
* Default {@link Expression} evaluates to a null. | ||
* | ||
* @param subjectExpression the expression used to create the subject for each CloudEvent | ||
*/ | ||
public void setSubjectExpression(Expression subjectExpression) { | ||
this.subjectExpression = subjectExpression; | ||
} | ||
|
||
@Override | ||
protected void onInit() { | ||
super.onInit(); | ||
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); | ||
ApplicationContext applicationContext = getApplicationContext(); | ||
if (this.sourceExpression == null) { // in the case the user sets the value prior to onInit. | ||
this.sourceExpression = new FunctionExpression<>((Function<Message<?>, URI>) message -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It needs to be a URI. I could use SpEL There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. URI. Then see |
||
String appName = applicationContext.getEnvironment().getProperty("spring.application.name"); | ||
appName = appName == null ? "unknown" : appName; | ||
return URI.create("/spring/" + appName + "." + getBeanName()); | ||
}); | ||
} | ||
if (this.dataSchemaExpression == null) { // in the case the user sets the value prior to onInit. | ||
this.dataSchemaExpression = new FunctionExpression<>((Function<Message<?>, @Nullable URI>) | ||
message -> null); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why cannot we deal with just null for this |
||
} | ||
} | ||
|
||
/** | ||
* Transform the input message into a CloudEvent message. | ||
* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something is off with your IDE formatting: no blank lines in method Javadocs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IDE setting has been updated. Correcting docs. |
||
* @param message the input Spring Integration message to transform | ||
* @return CloudEvent message in the specified format | ||
* @throws RuntimeException if serialization fails | ||
*/ | ||
@SuppressWarnings("unchecked") | ||
@Override | ||
protected Object doTransform(Message<?> message) { | ||
|
||
String id = this.idExpression.getValue(this.evaluationContext, message, String.class); | ||
if (!StringUtils.hasText(id)) { | ||
throw new MessageTransformationException(message, "No id was found with the specified expression"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, just plain There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will let the builder do the work for testing for null and throwing an exception. No need to do it twice, since performance is critical |
||
} | ||
|
||
URI source = this.sourceExpression.getValue(this.evaluationContext, message, URI.class); | ||
if (source == null) { | ||
throw new MessageTransformationException(message, "No source was found with the specified expression"); | ||
} | ||
|
||
String type = this.typeExpression.getValue(this.evaluationContext, message, String.class); | ||
if (type == null) { | ||
throw new MessageTransformationException(message, "No type was found with the specified expression"); | ||
} | ||
|
||
String contentType = message.getHeaders().get(MessageHeaders.CONTENT_TYPE, String.class); | ||
if (contentType == null) { | ||
throw new MessageTransformationException(message, "Missing 'Content-Type' header"); | ||
} | ||
|
||
EventFormat eventFormat = this.eventFormatProvider.resolveFormat(contentType); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we still need to have a way to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not quite clear on this request. Let's discuss. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me remind our discussion about |
||
if (eventFormat == null) { | ||
throw new MessageTransformationException("No EventFormat found for '" + contentType + "'"); | ||
} | ||
|
||
ToCloudEventTransformerExtensions extensions = | ||
new ToCloudEventTransformerExtensions(this.evaluationContext, (Message<byte[]>) message, | ||
this.cloudEventExtensionExpressions); | ||
|
||
CloudEvent cloudEvent = CloudEventBuilder.v1() | ||
.withId(id) | ||
.withSource(source) | ||
.withType(type) | ||
.withTime(OffsetDateTime.now()) | ||
.withDataContentType(contentType) | ||
.withDataSchema(this.dataSchemaExpression.getValue(this.evaluationContext, message, URI.class)) | ||
.withSubject(this.subjectExpression.getValue(this.evaluationContext, message, String.class)) | ||
.withData(getPayload(message)) | ||
.withExtension(extensions) | ||
.build(); | ||
|
||
return MessageBuilder.withPayload(eventFormat.serialize(cloudEvent)) | ||
.copyHeaders(message.getHeaders()) | ||
.build(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missed a |
||
} | ||
|
||
@Override | ||
public String getComponentType() { | ||
return "ce:to-cloudevents-transformer"; | ||
} | ||
|
||
private byte[] getPayload(Message<?> message) { | ||
if (message.getPayload() instanceof byte[] messagePayload) { | ||
return messagePayload; | ||
} | ||
throw new MessageTransformationException("Message payload is not a byte array"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Simple |
||
} | ||
|
||
private static class ToCloudEventTransformerExtensions implements CloudEventExtension { | ||
|
||
/** | ||
* Map storing the CloudEvent extensions extracted from message headers. | ||
*/ | ||
private final Map<String, Object> cloudEventExtensions; | ||
|
||
/** | ||
* Construct CloudEvent extensions by processing a message using expressions. | ||
* | ||
* @param message the Spring Integration message | ||
* @param expressions an array of {@link Expression}s where each accepts a message and returns a | ||
* {@code Map<String, Object>} of extensions | ||
*/ | ||
@SuppressWarnings("unchecked") | ||
ToCloudEventTransformerExtensions(EvaluationContext evaluationContext, Message<byte[]> message, | ||
Expression @Nullable ... expressions) { | ||
this.cloudEventExtensions = new HashMap<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I prefer to have a blank line in the method body if header is multi-line. |
||
if (expressions == null) { | ||
return; | ||
} | ||
for (Expression expression : expressions) { | ||
Map<String, Object> result = (Map<String, Object>) expression.getValue(evaluationContext, message, | ||
Map.class); | ||
if (result == null) { | ||
continue; | ||
} | ||
for (String key : result.keySet()) { | ||
this.cloudEventExtensions.put(key, result.get(key)); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void readFrom(CloudEventExtensions extensions) { | ||
extensions.getExtensionNames() | ||
.forEach(key -> { | ||
Object value = extensions.getExtension(key); | ||
if (value != null) { | ||
this.cloudEventExtensions.put(key, value); | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public @Nullable Object getValue(String key) throws IllegalArgumentException { | ||
return this.cloudEventExtensions.get(key); | ||
} | ||
|
||
@Override | ||
public Set<String> getKeys() { | ||
return this.cloudEventExtensions.keySet(); | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
|
||
@org.jspecify.annotations.NullMarked | ||
package org.springframework.integration.cloudevents.transformer; |
Uh oh!
There was an error while loading. Please reload this page.