Skip to content

Commit

Permalink
EventMesh function connector runtime (#4858)
Browse files Browse the repository at this point in the history
* [ISSUE #4812] Set up Admin Endpoints v2 (#4813)

* Remove redundant overloaded methods

* Simplify write() result param

* Add writeJson(); Add PUT; Add JavaDoc

* Rename EventHttpHandler to EventMeshHttpHandler

* Correct server thread name

* Clean up messy & non-hierarchical overloading

* No need to set headers manually any more

* Set up v1&v2 endpoints

* Set up v1&v2 response dto

* Introduce fastjson2

* Fix fastjson2 "level too large : 2048" error caused by IPAddress

* Correct @ConfigField naming

* Return properties format json key

* Add format option to query string

* Introduce Result

* Reduce duplicate builder code

* Fix all checkstyle warnings in eventmesh-runtime

* Add known dependency

* [ISSUE #4814] Migrate from fastjson 1.2.83 to fastjson2 (#4819)

* [Enhancement] Migrate from fastjson 1.2.83 to fastjson2 #4814

* fix_dependencies_problem

* fix_check

* [ISSUE #4551] modify the logic of time-consumption statistics (#4822)

* init connector runtime v2

* [ISSUE #4804] Fix SubStreamHandler exception loop by closeOnError (#4807)

* Handle exception loop by closeOnError

* Lombok optimization

* some format optimization

* Avoid closing multiple times

* Remove redundant set null

* Revert "Avoid closing multiple times"

This reverts commit 774397f.

* Use synchronized latch to keep senderOnComplete called once

* Use boolean to prevent latch called by somebody else

* Remove the unique callee/caller close() of onCompleted()

* [ISSUE #4838] Deprecate unused `eventMesh.connector.plugin.type` etc. properties (#4839)

* Remove all references of `eventMesh.connector.plugin.type`

* Deprecate `eventMesh.connector.plugin.type` and sort properties

* Remove misconfigured & not-used `registerIntervalInMills`, `fetchRegistryAddrIntervalInMills`

* Remove 'defibus' related un-used usages

* Supplement #4809 for `null != object`

* [ISSUE #4832] Downgrade stale bot to v8 to resolve state cache reserving error (#4833)

* Revert stale bot to v8 to resolve state cache reserving error

* Reduce operations-per-run to default value to ease pressure

* Unify yaml to yml

* [ISSUE #4820] Bug fix EventHandler not return json (#4821)

* bug fix

* bug fix

* bug fix

* update runtime v2

* update connector runtime

* update connector runtime

* update connector runtime

* update connector runtime

* update connector runtime

---------

Co-authored-by: Pil0tXia <xiatian@apache.org>
Co-authored-by: Zaki <91261012+cnzakii@users.noreply.github.com>
Co-authored-by: Karson <karsontao@hotmail.com>
  • Loading branch information
4 people authored Apr 22, 2024
1 parent 02f6d44 commit 859ad8d
Show file tree
Hide file tree
Showing 147 changed files with 2,559 additions and 926 deletions.
File renamed without changes.
3 changes: 2 additions & 1 deletion .github/workflows/stale.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
stale:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v9
- uses: actions/stale@v8
with:
days-before-issue-stale: 90
days-before-pr-stale: 60
Expand All @@ -47,3 +47,4 @@ jobs:
exempt-issue-labels: 'pinned,discussion,help wanted,WIP,weopen-star,GLCC,summer of code'
exempt-pr-labels: 'help wanted,dependencies'
exempt-all-milestones: true # Exempt all issues/PRs with milestones from stale
operations-per-run: 30
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ subprojects {
dependency "org.projectlombok:lombok:1.18.22"
dependency "com.github.seancfoley:ipaddress:5.3.3"
dependency "javax.annotation:javax.annotation-api:1.3.2"
dependency "com.alibaba:fastjson:1.2.83"
dependency "com.alibaba.fastjson2:fastjson2:2.0.48"

dependency "software.amazon.awssdk:s3:2.20.29"
dependency "com.github.rholder:guava-retrying:2.0.0"
Expand Down
2 changes: 2 additions & 0 deletions eventmesh-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ dependencies {
api "io.cloudevents:cloudevents-core"
api "io.cloudevents:cloudevents-json-jackson"

api "com.alibaba.fastjson2:fastjson2"

implementation "org.apache.logging.log4j:log4j-api"
implementation "org.apache.logging.log4j:log4j-core"
implementation "org.apache.logging.log4j:log4j-slf4j2-impl"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ public EventMeshThreadFactory(final String threadNamePrefix) {
public Thread newThread(@Nonnull final Runnable runnable) {

StringBuilder threadName = new StringBuilder(threadNamePrefix);
if (null != threadIndex) {
if (threadIndex != null) {
threadName.append("-").append(threadIndex.incrementAndGet());
}
Thread thread = new Thread(runnable, threadName.toString());
thread.setDaemon(daemon);
if (null != priority) {
if (priority != null) {
thread.setPriority(priority);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,94 +34,85 @@
@Config(prefix = "eventMesh")
public class CommonConfiguration {

@ConfigFiled(field = "sysid", beNumber = true, notEmpty = true)
@ConfigField(field = "sysid", beNumber = true, notEmpty = true)
private String sysID = "5477";

@ConfigFiled(field = "server.env", notEmpty = true)
@ConfigField(field = "server.env", notEmpty = true)
private String eventMeshEnv = "P";

@ConfigFiled(field = "server.idc", notEmpty = true)
@ConfigField(field = "server.idc", notEmpty = true)
private String eventMeshIDC = "FT";

@ConfigFiled(field = "server.name", notEmpty = true)
@ConfigField(field = "server.name", notEmpty = true)
private String eventMeshName = "";

@ConfigFiled(field = "server.cluster", notEmpty = true)
@ConfigField(field = "server.cluster", notEmpty = true)
private String eventMeshCluster = "LS";

@ConfigFiled(field = "server.hostIp", reload = true)
@ConfigField(field = "server.hostIp", reload = true)
private String eventMeshServerIp = null;

@ConfigFiled(field = "metaStorage.plugin.server-addr", notEmpty = true)
@ConfigField(field = "metaStorage.plugin.server-addr", notEmpty = true)
private String metaStorageAddr = "";

@ConfigFiled(field = "metaStorage.plugin.type", notEmpty = true)
@ConfigField(field = "metaStorage.plugin.type", notEmpty = true)
private String eventMeshMetaStoragePluginType = "nacos";

@ConfigFiled(field = "metaStorage.plugin.username")
@ConfigField(field = "metaStorage.plugin.username")
private String eventMeshMetaStoragePluginUsername = "";

@ConfigFiled(field = "metaStorage.plugin.password")
@ConfigField(field = "metaStorage.plugin.password")
private String eventMeshMetaStoragePluginPassword = "";

@ConfigFiled(field = "metaStorage.plugin.metaStorageIntervalInMills")
private Integer eventMeshMetaStorageIntervalInMills = 10 * 1000;

@ConfigFiled(field = "metaStorage.plugin.fetchMetaStorageAddrIntervalInMills")
private Integer eventMeshFetchMetaStorageAddrInterval = 10 * 1000;

@ConfigFiled(field = "metaStorage.plugin.enabled")
@ConfigField(field = "metaStorage.plugin.enabled")
private boolean eventMeshServerMetaStorageEnable = false;

@ConfigFiled(field = "trace.plugin", notEmpty = true)
@ConfigField(field = "trace.plugin", notEmpty = true)
private String eventMeshTracePluginType;

@ConfigFiled(field = "metrics.plugin", notEmpty = true)
@ConfigField(field = "metrics.plugin", notEmpty = true)
private List<String> eventMeshMetricsPluginType;

@ConfigFiled(field = "security.plugin.type", notEmpty = true)
@ConfigField(field = "security.plugin.type", notEmpty = true)
private String eventMeshSecurityPluginType = "security";

@ConfigFiled(field = "connector.plugin.type", notEmpty = true)
private String eventMeshConnectorPluginType = "rocketmq";

@ConfigFiled(field = "storage.plugin.type", notEmpty = true)
private String eventMeshStoragePluginType = "rocketmq";
@ConfigField(field = "storage.plugin.type", notEmpty = true)
private String eventMeshStoragePluginType = "standalone";

@ConfigFiled(field = "security.validation.type.token", notEmpty = true)
@ConfigField(field = "security.validation.type.token", notEmpty = true)
private boolean eventMeshSecurityValidateTypeToken = false;

@ConfigFiled(field = "server.trace.enabled")
@ConfigField(field = "server.trace.enabled")
private boolean eventMeshServerTraceEnable = false;

@ConfigFiled(field = "server.security.enabled")
@ConfigField(field = "server.security.enabled")
private boolean eventMeshServerSecurityEnable = false;

@ConfigFiled(field = "security.publickey")
@ConfigField(field = "security.publickey")
private String eventMeshSecurityPublickey = "";

@ConfigFiled(field = "server.provide.protocols", reload = true)
@ConfigField(field = "server.provide.protocols", reload = true)
private List<String> eventMeshProvideServerProtocols;

@ConfigFiled(reload = true)
@ConfigField(reload = true)
private String eventMeshWebhookOrigin;

@ConfigFiled(reload = true)
@ConfigField(reload = true)
private String meshGroup;

@ConfigFiled(field = "server.retry.plugin.type")
@ConfigField(field = "server.retry.plugin.type")
private String eventMeshRetryPluginType = Constants.DEFAULT;

@ConfigFiled(field = "registry.plugin.server-addr", notEmpty = true)
@ConfigField(field = "registry.plugin.server-addr", notEmpty = true)
private String registryAddr = "";

@ConfigFiled(field = "registry.plugin.type", notEmpty = true)
@ConfigField(field = "registry.plugin.type", notEmpty = true)
private String eventMeshRegistryPluginType = "nacos";

@ConfigFiled(field = "registry.plugin.username")
@ConfigField(field = "registry.plugin.username")
private String eventMeshRegistryPluginUsername = "";

@ConfigFiled(field = "registry.plugin.password")
@ConfigField(field = "registry.plugin.password")
private String eventMeshRegistryPluginPassword = "";

public void reload() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.FIELD})
public @interface ConfigFiled {
public @interface ConfigField {

/**
* @return The key name of the configuration file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.common.config.convert;

import org.apache.eventmesh.common.config.ConfigFiled;
import org.apache.eventmesh.common.config.ConfigField;

import org.apache.commons.lang3.StringUtils;

Expand All @@ -44,7 +44,7 @@ default boolean canHandleNullValue() {
/**
* @return The value converter needs
*/
default Object processFieldValue(ConvertInfo convertInfo, String key, ConfigFiled configFiled) {
default Object processFieldValue(ConvertInfo convertInfo, String key, ConfigField configField) {
Properties properties = convertInfo.getProperties();
String value = properties.getProperty(key);

Expand All @@ -54,14 +54,14 @@ default Object processFieldValue(ConvertInfo convertInfo, String key, ConfigFile

value = value.trim();

boolean findEnv = configFiled.findEnv();
String fieldName = configFiled.field();
boolean findEnv = configField.findEnv();
String fieldName = configField.field();

if (StringUtils.isBlank(value) && !StringUtils.isBlank(fieldName) && findEnv) {
value = Optional.ofNullable(System.getProperty(fieldName)).orElse(System.getenv(fieldName));
}

if (StringUtils.isBlank(value) && configFiled.notEmpty()) {
if (StringUtils.isBlank(value) && configField.notEmpty()) {
throw new RuntimeException(key + " can't be empty!");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.common.config.convert;

import org.apache.eventmesh.common.config.ConfigFiled;
import org.apache.eventmesh.common.config.ConfigField;
import org.apache.eventmesh.common.config.convert.converter.BaseDataTypeConverter;
import org.apache.eventmesh.common.config.convert.converter.DateConverter;
import org.apache.eventmesh.common.config.convert.converter.EnumConverter;
Expand Down Expand Up @@ -96,9 +96,9 @@ public static void register(ConvertValue<?> convertValue, Class<?>... clazzs) {
*/
public static ConvertValue<?> getFieldConverter(Field field) {
Class<?> clazz = field.getType();
ConfigFiled configFiled = field.getAnnotation(ConfigFiled.class);
ConfigField configField = field.getAnnotation(ConfigField.class);

Class<?> converter1 = configFiled.converter();
Class<?> converter1 = configField.converter();
if (!converter1.equals(ConvertValue.DefaultConverter.class)) {
if (!classToConverter.containsKey(converter1)) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.eventmesh.common.config.convert.converter;

import org.apache.eventmesh.common.config.Config;
import org.apache.eventmesh.common.config.ConfigFiled;
import org.apache.eventmesh.common.config.ConfigField;
import org.apache.eventmesh.common.config.ConfigInfo;
import org.apache.eventmesh.common.config.convert.ConvertInfo;
import org.apache.eventmesh.common.config.convert.ConvertValue;
Expand Down Expand Up @@ -110,26 +110,26 @@ private void setValue() throws Exception {
field.setAccessible(true);

ConvertInfo convertInfo = this.convertInfo;
ConfigFiled configFiled = field.getAnnotation(ConfigFiled.class);
if (Objects.isNull(configFiled)) {
ConfigField configField = field.getAnnotation(ConfigField.class);
if (Objects.isNull(configField)) {
continue;
}

String key = this.buildKey(configFiled);
needReload = this.checkNeedReload(needReload, configFiled);
String key = this.buildKey(configField);
needReload = this.checkNeedReload(needReload, configField);

ConvertValue<?> convertValue = ConverterMap.getFieldConverter(field);
Object fieldValue = convertValue.processFieldValue(convertInfo, key, configFiled);
Object fieldValue = convertValue.processFieldValue(convertInfo, key, configField);

if (!checkFieldValueBefore(configFiled, key, convertValue, fieldValue)) {
if (!checkFieldValueBefore(configField, key, convertValue, fieldValue)) {
continue;
}
convertInfo.setValue(fieldValue);
convertInfo.setField(field);
convertInfo.setKey(key);
Object convertedValue = convertValue.convert(convertInfo);

if (!checkFieldValueAfter(configFiled, key, convertedValue)) {
if (!checkFieldValueAfter(configField, key, convertedValue)) {
continue;
}
field.set(object, convertedValue);
Expand All @@ -155,16 +155,16 @@ private void reloadConfigIfNeed(boolean needReload) throws NoSuchMethodException
}
}

private boolean checkFieldValueAfter(ConfigFiled configFiled, String key, Object convertedValue) {
private boolean checkFieldValueAfter(ConfigField configField, String key, Object convertedValue) {
if (Objects.isNull(convertedValue)) {
if (configFiled.notNull()) {
if (configField.notNull()) {
throw new RuntimeException(key + " can not be null!");
}

return false;
}

if (configFiled.beNumber()) {
if (configField.beNumber()) {
if (!StringUtils.isNumeric(String.valueOf(convertedValue))) {
throw new RuntimeException(key + " must be number!");
}
Expand All @@ -173,9 +173,9 @@ private boolean checkFieldValueAfter(ConfigFiled configFiled, String key, Object
return true;
}

private boolean checkFieldValueBefore(ConfigFiled configFiled, String key, ConvertValue<?> convertValue, Object fieldValue) {
private boolean checkFieldValueBefore(ConfigField configField, String key, ConvertValue<?> convertValue, Object fieldValue) {
if (Objects.isNull(fieldValue) && !convertValue.canHandleNullValue()) {
if (configFiled.notNull()) {
if (configField.notNull()) {
throw new RuntimeException(key + " can not be null!");
}

Expand All @@ -185,8 +185,8 @@ private boolean checkFieldValueBefore(ConfigFiled configFiled, String key, Conve
return true;
}

private boolean checkNeedReload(boolean needReload, ConfigFiled configFiled) {
if (!needReload && configFiled != null && configFiled.reload()) {
private boolean checkNeedReload(boolean needReload, ConfigField configField) {
if (!needReload && configField != null && configField.reload()) {
needReload = Boolean.TRUE;
}

Expand All @@ -201,14 +201,14 @@ private boolean checkNeedReload(boolean needReload, ConfigFiled configFiled) {
return needReload;
}

private String buildKey(ConfigFiled configFiled) {
private String buildKey(ConfigField configField) {
String key;
StringBuilder keyPrefix = new StringBuilder(Objects.isNull(prefix) ? "" : prefix);

if (configFiled == null || configFiled.field().isEmpty() && keyPrefix.length() > 0) {
if (configField == null || configField.field().isEmpty() && keyPrefix.length() > 0) {
key = keyPrefix.deleteCharAt(keyPrefix.length() - 1).toString();
} else {
key = keyPrefix.append(configFiled.field()).toString();
key = keyPrefix.append(configField.field()).toString();
}

return key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.common.config.convert.converter;

import org.apache.eventmesh.common.config.ConfigFiled;
import org.apache.eventmesh.common.config.ConfigField;
import org.apache.eventmesh.common.config.convert.ConvertInfo;
import org.apache.eventmesh.common.config.convert.ConvertValue;
import org.apache.eventmesh.common.utils.PropertiesUtils;
Expand All @@ -41,7 +41,7 @@ public Properties convert(ConvertInfo convertInfo) {
}

@Override
public Object processFieldValue(ConvertInfo convertInfo, String prefix, ConfigFiled configFiled) {
public Object processFieldValue(ConvertInfo convertInfo, String prefix, ConfigField configField) {
Properties properties = convertInfo.getProperties();

if (StringUtils.isBlank(prefix)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public void testGetCommonConfiguration() {
Assertions.assertEquals("cluster-succeed!!!", config.getEventMeshCluster());
Assertions.assertEquals("name-succeed!!!", config.getEventMeshName());
Assertions.assertEquals("816", config.getSysID());
// Assertions.assertEquals("connector-succeed!!!", config.getEventMeshConnectorPluginType());
Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType());
Assertions.assertEquals("storage-succeed!!!", config.getEventMeshStoragePluginType());
Assertions.assertEquals("security-succeed!!!", config.getEventMeshSecurityPluginType());
Expand All @@ -55,9 +54,6 @@ public void testGetCommonConfiguration() {
Assertions.assertEquals("username-succeed!!!", config.getEventMeshMetaStoragePluginUsername());
Assertions.assertEquals("password-succeed!!!", config.getEventMeshMetaStoragePluginPassword());

Assertions.assertEquals(Integer.valueOf(816), config.getEventMeshMetaStorageIntervalInMills());
Assertions.assertEquals(Integer.valueOf(1816), config.getEventMeshFetchMetaStorageAddrInterval());

List<String> list = new ArrayList<>();
list.add("metrics-succeed1!!!");
list.add("metrics-succeed2!!!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ public class SystemUtilsTest {

@Test
public void isLinuxPlatform() {
if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("linux")) {
if (SystemUtils.OS_NAME != null && SystemUtils.OS_NAME.toLowerCase().contains("linux")) {
Assertions.assertTrue(SystemUtils.isLinuxPlatform());
Assertions.assertFalse(SystemUtils.isWindowsPlatform());
}
}

@Test
public void isWindowsPlatform() {
if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("windows")) {
if (SystemUtils.OS_NAME != null && SystemUtils.OS_NAME.toLowerCase().contains("windows")) {
Assertions.assertFalse(SystemUtils.isLinuxPlatform());
Assertions.assertTrue(SystemUtils.isWindowsPlatform());
}
Expand Down
Loading

0 comments on commit 859ad8d

Please sign in to comment.