Skip to content

Commit

Permalink
Merge pull request #8 from ZenWave360/develop
Browse files Browse the repository at this point in the history
apply spring-java-format
  • Loading branch information
ivangsa authored Dec 28, 2024
2 parents 6d85a76 + 0910e3a commit be51d5d
Show file tree
Hide file tree
Showing 19 changed files with 1,837 additions and 1,658 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package io.zenwave360.modulith.events.scs;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;
import org.springframework.modulith.events.core.EventSerializer;

import java.util.Map;
Expand All @@ -30,4 +26,5 @@ protected Map<String, Object> serializeToMap(Object payload) {
objectNode.remove("specificData"); // TODO: remove this recursively
return avroMapper.convertValue(objectNode, Map.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.Map;

public class MessageEventSerializer implements EventSerializer {

private final ObjectMapper jacksonMapper;

public MessageEventSerializer(ObjectMapper jacksonMapper) {
Expand Down Expand Up @@ -45,12 +46,14 @@ public Object serialize(Object event) {
public <T> T deserialize(Object serialized, Class<T> type) {
try {
return unsafeDeserialize(serialized, type);
} catch (JsonProcessingException | ClassNotFoundException e) {
}
catch (JsonProcessingException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
}

private <T> T unsafeDeserialize(Object serialized, Class<T> type) throws JsonProcessingException, ClassNotFoundException {
private <T> T unsafeDeserialize(Object serialized, Class<T> type)
throws JsonProcessingException, ClassNotFoundException {
if (Message.class.isAssignableFrom(type)) {
JsonNode node = jacksonMapper.readTree(serialized.toString());
JsonNode headersNode = node.get("headers");
Expand All @@ -63,7 +66,8 @@ private <T> T unsafeDeserialize(Object serialized, Class<T> type) throws JsonPro
objectNode.remove("_class");
}
payload = deserializePayload(payloadNode, payloadType);
} else {
}
else {
payload = deserializePayload(payloadNode, Object.class);
}
return (T) MessageBuilder.createMessage(payload, new MessageHeaders(headers));
Expand All @@ -79,7 +83,8 @@ protected Object jacksonSerialize(Object event) {
try {
var map = serializeToMap(event);
return jacksonMapper.writeValueAsString(map);
} catch (JsonProcessingException e) {
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
Expand All @@ -88,8 +93,10 @@ protected <T> T jacksonDeserialize(Object serialized, Class<T> type) {
try {
JsonNode node = jacksonMapper.readTree(serialized.toString());
return (T) jacksonMapper.readerFor(type).readValue(node);
} catch (IOException e) {
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@ public class SpringCloudStreamEventExternalizer implements BiFunction<RoutingTar
public static final String SPRING_CLOUD_STREAM_SENDTO_DESTINATION_HEADER = "spring.cloud.stream.sendto.destination";

private final EventExternalizationConfiguration configuration;

private final StreamBridge streamBridge;

private final BindingServiceProperties bindingServiceProperties;

private final BinderFactory binderFactory;

private final EvaluationContext context;

public SpringCloudStreamEventExternalizer(EventExternalizationConfiguration configuration, EvaluationContext context, StreamBridge streamBridge, BindingServiceProperties bindingServiceProperties, BinderFactory binderFactory) {
public SpringCloudStreamEventExternalizer(EventExternalizationConfiguration configuration,
EvaluationContext context, StreamBridge streamBridge, BindingServiceProperties bindingServiceProperties,
BinderFactory binderFactory) {
this.configuration = configuration;
this.context = context;
this.streamBridge = streamBridge;
Expand All @@ -46,17 +52,16 @@ public CompletableFuture<?> apply(RoutingTarget routingTarget, Object event) {
var keyHeaderValue = routing.getKey(event);
var keyHeaderName = getKeyHeaderName(target, bindingServiceProperties, binderFactory);

var headersMap = event instanceof Message ? new LinkedHashMap<>(((Message<?>) event).getHeaders()) : new LinkedHashMap<String, Object>();
var headersMap = event instanceof Message ? new LinkedHashMap<>(((Message<?>) event).getHeaders())
: new LinkedHashMap<String, Object>();
if (keyHeaderValue != null && keyHeaderName != null) {
if(!headersMap.containsKey(keyHeaderName)) {
if (!headersMap.containsKey(keyHeaderName)) {
log.debug("Adding key header to message: {} = {}", keyHeaderName, keyHeaderValue);
headersMap.put(keyHeaderName, keyHeaderValue);
}
}
var payload = event instanceof Message<?>? ((Message<?>) event).getPayload() : event;
var message = MessageBuilder.withPayload(payload)
.copyHeaders(headersMap)
.build();
var payload = event instanceof Message<?> ? ((Message<?>) event).getPayload() : event;
var message = MessageBuilder.withPayload(payload).copyHeaders(headersMap).build();

log.debug("Sending event to Spring Cloud Stream target: {}", target);
var result = streamBridge.send(target, message);
Expand All @@ -70,23 +75,22 @@ public CompletableFuture<?> apply(RoutingTarget routingTarget, Object event) {
"org.springframework.cloud.stream.binder.pubsub.PubSubMessageChannelBinder", "pubsub_orderingKey",
"org.springframework.cloud.stream.binder.eventhubs.EventHubsMessageChannelBinder", "partitionKey",
"org.springframework.cloud.stream.binder.solace.SolaceMessageChannelBinder", "solace_messageKey",
"org.springframework.cloud.stream.binder.pulsar.PulsarMessageChannelBinder", "pulsar_key"
);
"org.springframework.cloud.stream.binder.pulsar.PulsarMessageChannelBinder", "pulsar_key");

protected String getKeyHeaderName(String channelName, BindingServiceProperties bindingServiceProperties,
BinderFactory binderFactory) {
String binderConfigurationName = bindingServiceProperties.getBinder(channelName);
var binder = binderFactory.getBinder(binderConfigurationName, MessageChannel.class);
if(binder == null) {
if (binder == null) {
return null;
}
return messageKeyHeaders.get(binder.getClass().getName());
}

protected String getTarget(Object event, RoutingTarget routingTarget) {
if(event instanceof Message<?> message) {
if (event instanceof Message<?> message) {
var target = message.getHeaders().get(SPRING_CLOUD_STREAM_SENDTO_DESTINATION_HEADER, String.class);
return target != null? target : routingTarget.getTarget();
return target != null ? target : routingTarget.getTarget();
}
return routingTarget.getTarget();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
import java.lang.annotation.Target;

@Configuration
@Import({ SpringCloudStreamEventExternalizerConfiguration.class, EventSerializerConfiguration.class})
@Import({ SpringCloudStreamEventExternalizerConfiguration.class, EventSerializerConfiguration.class })
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface EnableSpringCloudStreamEventExternalization {

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

@AutoConfiguration
@AutoConfigureAfter(EventExternalizationAutoConfiguration.class)
@ConditionalOnProperty(name = "spring.modulith.events.externalization.enabled",
havingValue = "true",
@ConditionalOnProperty(name = "spring.modulith.events.externalization.enabled", havingValue = "true",
matchIfMissing = true)
public class EventSerializerConfiguration {

Expand All @@ -34,4 +33,5 @@ public EventSerializer avroEventSerializer(ObjectMapper mapper) {
public EventSerializer messageEventSerializer(ObjectMapper mapper) {
return new MessageEventSerializer(mapper);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
import org.springframework.modulith.events.support.DelegatingEventExternalizer;

/**
* Auto-configuration to set up a {@link DelegatingEventExternalizer} to externalize events to Spring Cloud Stream.
* Auto-configuration to set up a {@link DelegatingEventExternalizer} to externalize
* events to Spring Cloud Stream.
*
* @author ivangsa
* @since 1.3
Expand All @@ -45,39 +46,39 @@
@AutoConfiguration
@AutoConfigureAfter(EventExternalizationAutoConfiguration.class)
@ConditionalOnClass(StreamBridge.class)
@ConditionalOnProperty(name = "spring.modulith.events.externalization.enabled",
havingValue = "true",
matchIfMissing = true)
@ConditionalOnProperty(name = "spring.modulith.events.externalization.enabled", havingValue = "true",
matchIfMissing = true)
public class SpringCloudStreamEventExternalizerConfiguration {

private static final Logger log = LoggerFactory.getLogger(SpringCloudStreamEventExternalizerConfiguration.class);
private static final Logger log = LoggerFactory.getLogger(SpringCloudStreamEventExternalizerConfiguration.class);

@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
return EventExternalizationConfiguration.externalizing()
.select(event -> event instanceof Message<?> && getTarget(event) != null)
.routeAll(event -> RoutingTarget.forTarget(getTarget(event)).withoutKey())
.build();
}
@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
return EventExternalizationConfiguration.externalizing()
.select(event -> event instanceof Message<?> && getTarget(event) != null)
.routeAll(event -> RoutingTarget.forTarget(getTarget(event)).withoutKey())
.build();
}

private String getTarget(Object event) {
if(event instanceof Message<?> message) {
return message.getHeaders().get(SpringCloudStreamEventExternalizer.SPRING_CLOUD_STREAM_SENDTO_DESTINATION_HEADER, String.class);
}
return null;
}
private String getTarget(Object event) {
if (event instanceof Message<?> message) {
return message.getHeaders()
.get(SpringCloudStreamEventExternalizer.SPRING_CLOUD_STREAM_SENDTO_DESTINATION_HEADER, String.class);
}
return null;
}

@Bean
DelegatingEventExternalizer springCloudStreamMessageExternalizer(
EventExternalizationConfiguration configuration,
StreamBridge streamBridge, BeanFactory factory,
BindingServiceProperties bindingServiceProperties,
BinderFactory binderFactory) {
log.debug("Registering domain event externalization to Spring Cloud Stream…");
@Bean
DelegatingEventExternalizer springCloudStreamMessageExternalizer(EventExternalizationConfiguration configuration,
StreamBridge streamBridge, BeanFactory factory, BindingServiceProperties bindingServiceProperties,
BinderFactory binderFactory) {
log.debug("Registering domain event externalization to Spring Cloud Stream…");

var context = new StandardEvaluationContext();
context.setBeanResolver(new BeanFactoryResolver(factory));
var context = new StandardEvaluationContext();
context.setBeanResolver(new BeanFactoryResolver(factory));

return new DelegatingEventExternalizer(configuration, new SpringCloudStreamEventExternalizer(configuration,
context, streamBridge, bindingServiceProperties, binderFactory));
}

return new DelegatingEventExternalizer(configuration, new SpringCloudStreamEventExternalizer(configuration, context, streamBridge, bindingServiceProperties, binderFactory));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

public class AvroEventSerializerTest {


private EventSerializer eventSerializer;

@BeforeEach
Expand All @@ -24,16 +23,14 @@ public void setUp() {
public void testSerializeMessage() {
var customerEvent = new CustomerEvent();
customerEvent.setName("John Doe");
Message<?> message = MessageBuilder
.withPayload(customerEvent)
.setHeader("headerKey", "headerValue")
.build();
Message<?> message = MessageBuilder.withPayload(customerEvent).setHeader("headerKey", "headerValue").build();

Object serialized = eventSerializer.serialize(message);

Assertions.assertTrue(serialized instanceof String);
Assertions.assertTrue(serialized.toString().contains("\"name\":\"John Doe\","));
Assertions.assertTrue(serialized.toString().contains("\"_class\":\"io.zenwave360.modulith.events.scs.dtos.avro.CustomerEvent\""));
Assertions.assertTrue(serialized.toString()
.contains("\"_class\":\"io.zenwave360.modulith.events.scs.dtos.avro.CustomerEvent\""));
}

@Test
Expand Down Expand Up @@ -74,10 +71,7 @@ public void testDeserializeMessage() throws JsonProcessingException, ClassNotFou
public void testSerializeDeserializeMessage() throws JsonProcessingException, ClassNotFoundException {
var customerEvent = new CustomerEvent();
customerEvent.setName("John Doe");
Message<?> message = MessageBuilder
.withPayload(customerEvent)
.setHeader("headerKey", "headerValue")
.build();
Message<?> message = MessageBuilder.withPayload(customerEvent).setHeader("headerKey", "headerValue").build();

Object serialized = eventSerializer.serialize(message);
Message<?> deserialized = eventSerializer.deserialize(serialized, Message.class);
Expand All @@ -96,4 +90,5 @@ public void testSerializeDeserializeObject() throws JsonProcessingException, Cla
Assertions.assertEquals(CustomerEvent.class, deserialized.getClass());
Assertions.assertEquals(customerEvent.getName(), ((CustomerEvent) deserialized).getName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
public class MessageEventSerializerTest {

private ObjectMapper objectMapper;

private EventSerializer eventSerializer;

@BeforeEach
Expand All @@ -24,16 +25,14 @@ public void setUp() {
@Test
public void testSerializeMessage() {
var customerEvent = new CustomerEvent().withName("John Doe");
Message<?> message = MessageBuilder
.withPayload(customerEvent)
.setHeader("headerKey", "headerValue")
.build();
Message<?> message = MessageBuilder.withPayload(customerEvent).setHeader("headerKey", "headerValue").build();

Object serialized = eventSerializer.serialize(message);

Assertions.assertTrue(serialized instanceof String);
Assertions.assertTrue(serialized.toString().contains("\"payload\":{\"name\":\"John Doe\","));
Assertions.assertTrue(serialized.toString().contains("\"_class\":\"io.zenwave360.modulith.events.scs.dtos.json.CustomerEvent\""));
Assertions.assertTrue(serialized.toString()
.contains("\"_class\":\"io.zenwave360.modulith.events.scs.dtos.json.CustomerEvent\""));
}

@Test
Expand Down Expand Up @@ -73,10 +72,7 @@ public void testDeserializeMessage() throws JsonProcessingException, ClassNotFou
@Test
public void testSerializeDeserializeMessage() throws JsonProcessingException, ClassNotFoundException {
var customerEvent = new CustomerEvent().withName("John Doe");
Message<?> message = MessageBuilder
.withPayload(customerEvent)
.setHeader("headerKey", "headerValue")
.build();
Message<?> message = MessageBuilder.withPayload(customerEvent).setHeader("headerKey", "headerValue").build();

Object serialized = eventSerializer.serialize(message);
Message<?> deserialized = eventSerializer.deserialize(serialized, Message.class);
Expand All @@ -93,4 +89,5 @@ public void testSerializeDeserializeObject() throws JsonProcessingException, Cla

Assertions.assertEquals(CustomerEvent.class, deserialized.getClass());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ void testExternalizeAvroEvent() throws InterruptedException {
Thread.sleep(5000);
// TODO: Assert that the event was externalized
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ public class SCSJsonEventExternalizerTest {

@Test
void testExternalizeJsonEvent() throws InterruptedException {
var event = new io.zenwave360.modulith.events.scs.dtos.json.CustomerEvent()
.withName("John Doe");
var event = new io.zenwave360.modulith.events.scs.dtos.json.CustomerEvent().withName("John Doe");
customerEventsProducer.onCustomerEventJson(event);
// Wait for the event to be externalized
Thread.sleep(5000);
// TODO: Assert that the event was externalized
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,24 @@ public CustomerEventsProducer(ApplicationEventPublisher applicationEventPublishe

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void onCustomerEventJson(io.zenwave360.modulith.events.scs.dtos.json.CustomerEvent event) {
Message<io.zenwave360.modulith.events.scs.dtos.json.CustomerEvent> message = MessageBuilder.withPayload(event)
.setHeader(
SpringCloudStreamEventExternalizer.SPRING_CLOUD_STREAM_SENDTO_DESTINATION_HEADER,
"customers-json-out-0") // <- target binding name
.build();
Message<io.zenwave360.modulith.events.scs.dtos.json.CustomerEvent> message = MessageBuilder
.withPayload(event)
.setHeader(SpringCloudStreamEventExternalizer.SPRING_CLOUD_STREAM_SENDTO_DESTINATION_HEADER,
"customers-json-out-0") // <- target binding name
.build();
applicationEventPublisher.publishEvent(message);
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void onCustomerEventAvro(io.zenwave360.modulith.events.scs.dtos.avro.CustomerEvent event) {
Message<io.zenwave360.modulith.events.scs.dtos.avro.CustomerEvent> message = MessageBuilder.withPayload(event)
.setHeader(
SpringCloudStreamEventExternalizer.SPRING_CLOUD_STREAM_SENDTO_DESTINATION_HEADER,
"customers-avro-out-0") // <- target binding name
.build();
Message<io.zenwave360.modulith.events.scs.dtos.avro.CustomerEvent> message = MessageBuilder
.withPayload(event)
.setHeader(SpringCloudStreamEventExternalizer.SPRING_CLOUD_STREAM_SENDTO_DESTINATION_HEADER,
"customers-avro-out-0") // <- target binding name
.build();
applicationEventPublisher.publishEvent(message);
}

}

}
Loading

0 comments on commit be51d5d

Please sign in to comment.