From efadcd6aea07ea5e0c7751770dc2e7a41558fcf2 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Sun, 22 Oct 2023 21:50:04 +0800 Subject: [PATCH] [ZEPPELIN-5972] Support Flink 1.17 (#4677) * Support Flink 1.17 * Bump Flink 1.17.1 * Fix package name * fix java 11 compile * empty --------- Co-authored-by: Jeff Zhang --- .github/workflows/core.yml | 2 +- flink/flink-scala-parent/pom.xml | 42 ++ .../flink/FlinkSqlInterpreterTest.java | 7 +- .../org/apache/zeppelin/flink/FlinkShims.java | 3 + .../apache/zeppelin/flink/Flink116Shims.java | 2 - flink/flink1.17-shims/pom.xml | 207 ++++++ .../apache/zeppelin/flink/Flink117Shims.java | 397 ++++++++++++ .../flink/Flink117SqlInterpreter.java | 590 ++++++++++++++++++ .../org/apache/zeppelin/flink/PrintUtils.java | 318 ++++++++++ .../zeppelin/flink/TimestampStringUtils.java | 143 +++++ .../shims117/CollectStreamTableSink.java | 97 +++ flink/pom.xml | 10 + testing/env_python_3_with_flink_117.yml | 29 + 13 files changed, 1842 insertions(+), 5 deletions(-) create mode 100644 flink/flink1.17-shims/pom.xml create mode 100644 flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117Shims.java create mode 100644 flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117SqlInterpreter.java create mode 100644 flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java create mode 100644 flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java create mode 100644 flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/shims117/CollectStreamTableSink.java create mode 100644 testing/env_python_3_with_flink_117.yml diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 5c8a8d77200..cc639869a91 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -243,7 +243,7 @@ jobs: strategy: fail-fast: false matrix: - flink: [113, 114, 115, 116] + flink: [113, 114, 115, 116, 117] steps: - name: Checkout uses: actions/checkout@v3 diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml index 135fdd8617d..79b839fefb4 100644 --- a/flink/flink-scala-parent/pom.xml +++ b/flink/flink-scala-parent/pom.xml @@ -79,6 +79,12 @@ ${project.version} + + org.apache.zeppelin + flink1.17-shims + ${project.version} + + org.apache.zeppelin zeppelin-python @@ -1007,6 +1013,42 @@ + + flink-117 + + ${flink1.17.version} + 2.12.7 + 2.12 + + + + + org.apache.flink + flink-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-planner_${flink.scala.binary.version} + ${flink.version} + provided + + + org.apache.flink + flink-sql-client + ${flink.version} + provided + + + org.apache.flink + flink-python + ${flink.version} + provided + + + + hive2 diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java index 2673ff0f900..e51042dc14b 100644 --- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java +++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java @@ -511,6 +511,11 @@ public void testCatalog() throws IOException, InterpreterException{ resultMessages = context.out.toInterpreterResultMessage(); assertEquals("current catalog: test_catalog\n", resultMessages.get(0).getData()); + // USE DEFAULT_CATALOG + context = getInterpreterContext(); + result = sqlInterpreter.interpret("USE CATALOG default_catalog", context); + assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); + // DROP CATALOG context = getInterpreterContext(); result = sqlInterpreter.interpret( @@ -526,8 +531,6 @@ public void testCatalog() throws IOException, InterpreterException{ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code()); resultMessages = context.out.toInterpreterResultMessage(); assertTrue(context.out.toString(), resultMessages.get(0).getData().contains("default_catalog")); - assertFalse(context.out.toString(), resultMessages.get(0).getData().contains("test_catalog")); - } @Test diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java index 916c3313dcf..11de5bd3b7e 100644 --- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java +++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java @@ -66,6 +66,9 @@ private static FlinkShims loadShims(FlinkVersion flinkVersion, } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 16) { LOGGER.info("Initializing shims for Flink 1.16"); flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink116Shims"); + } else if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 17) { + LOGGER.info("Initializing shims for Flink 1.17"); + flinkShimsClass = Class.forName("org.apache.zeppelin.flink.Flink117Shims"); } else { throw new Exception("Flink version: '" + flinkVersion + "' is not supported yet"); } diff --git a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java index b96e5f5e422..3578ffc8bb0 100644 --- a/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java +++ b/flink/flink1.16-shims/src/main/java/org/apache/zeppelin/flink/Flink116Shims.java @@ -23,7 +23,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.cli.CliFrontend; @@ -53,7 +52,6 @@ import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.module.ModuleManager; -import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.resource.ResourceManager; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; diff --git a/flink/flink1.17-shims/pom.xml b/flink/flink1.17-shims/pom.xml new file mode 100644 index 00000000000..997020abfc2 --- /dev/null +++ b/flink/flink1.17-shims/pom.xml @@ -0,0 +1,207 @@ + + + + + + flink-parent + org.apache.zeppelin + 0.11.0-SNAPSHOT + ../pom.xml + + + 4.0.0 + org.apache.zeppelin + flink1.17-shims + 0.11.0-SNAPSHOT + jar + Zeppelin: Flink1.17 Shims + + + ${flink1.17.version} + 2.12 + + + + + + org.apache.zeppelin + flink-shims + ${project.version} + + + + org.apache.flink + flink-core + ${flink.version} + provided + + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + + org.apache.flink + flink-runtime + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-scala_${flink.scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-scala-bridge_${flink.scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + + + + org.apache.flink + flink-scala_${flink.scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + + org.apache.flink + flink-streaming-scala_${flink.scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + + org.apache.flink + flink-table-planner_${flink.scala.binary.version} + ${flink.version} + provided + + + + org.apache.flink + flink-python + ${flink.version} + provided + + + + org.apache.flink + flink-sql-client + ${flink.version} + provided + + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + eclipse-add-source + + add-source + + + + scala-compile-first + process-resources + + compile + + + + scala-test-compile-first + process-test-resources + + testCompile + + + + + ${flink.scala.version} + + -unchecked + -deprecation + -feature + -nobootcp + + + -Xms1024m + -Xmx1024m + -XX:MaxMetaspaceSize=${MaxMetaspace} + + + -source + ${java.version} + -target + ${java.version} + -Xlint:all,-serial,-path,-options + + + + + + maven-resources-plugin + + + copy-interpreter-setting + none + + true + + + + + + + + diff --git a/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117Shims.java b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117Shims.java new file mode 100644 index 00000000000..9bc22cc57df --- /dev/null +++ b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117Shims.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.compress.utils.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.cli.CustomCommandLine; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; +import org.apache.flink.table.api.*; +import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.client.resource.ClientResourceManager; +import org.apache.flink.table.client.util.ClientClassloaderUtil; +import org.apache.flink.table.client.util.ClientWrapperClassLoader; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.PlannerFactoryUtil; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.functions.TableAggregateFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.table.resource.ResourceManager; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.FlinkException; +import org.apache.zeppelin.flink.shims117.CollectStreamTableSink; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.URL; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + + +/** + * Shims for flink 1.17 + */ +public class Flink117Shims extends FlinkShims { + + private static final Logger LOGGER = LoggerFactory.getLogger(Flink117Shims.class); + + private Flink117SqlInterpreter batchSqlInterpreter; + private Flink117SqlInterpreter streamSqlInterpreter; + + public Flink117Shims(FlinkVersion flinkVersion, Properties properties) { + super(flinkVersion, properties); + } + + public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) { + this.batchSqlInterpreter = new Flink117SqlInterpreter(flinkSqlContext, true); + } + + public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) { + this.streamSqlInterpreter = new Flink117SqlInterpreter(flinkSqlContext, false); + } + + @Override + public Object createResourceManager(List jars, Object tableConfig) { + Configuration configuration = ((TableConfig) tableConfig).getConfiguration().clone(); + ClientWrapperClassLoader userClassLoader = + new ClientWrapperClassLoader( + ClientClassloaderUtil.buildUserClassLoader( + jars, + Thread.currentThread().getContextClassLoader(), + new Configuration(configuration)), + configuration); + return new ClientResourceManager(configuration, userClassLoader); + } + + @Override + public Object createFunctionCatalog(Object tableConfig, Object catalogManager, Object moduleManager, List jars) { + ResourceManager resourceManager = (ResourceManager) createResourceManager(jars, (TableConfig) tableConfig); + return new FunctionCatalog((TableConfig) tableConfig, resourceManager, (CatalogManager) catalogManager, (ModuleManager) moduleManager); + } + + @Override + public void disableSysoutLogging(Object batchConfig, Object streamConfig) { + // do nothing + } + + @Override + public Object createScalaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + ResourceManager resourceManager = (ResourceManager) createResourceManager(jars, tableConfig); + + return new org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl(catalogManager, + moduleManager, resourceManager, + functionCatalog, tableConfig, new org.apache.flink.streaming.api.scala.StreamExecutionEnvironment(senv), + planner, executor, environmentSettings.isStreamingMode()); + } + + @Override + public Object createJavaBlinkStreamTableEnvironment(Object environmentSettingsObj, + Object senvObj, + Object tableConfigObj, + Object moduleManagerObj, + Object functionCatalogObj, + Object catalogManagerObj, + List jars, + ClassLoader classLoader) { + EnvironmentSettings environmentSettings = (EnvironmentSettings) environmentSettingsObj; + StreamExecutionEnvironment senv = (StreamExecutionEnvironment) senvObj; + TableConfig tableConfig = (TableConfig) tableConfigObj; + ModuleManager moduleManager = (ModuleManager) moduleManagerObj; + FunctionCatalog functionCatalog = (FunctionCatalog) functionCatalogObj; + CatalogManager catalogManager = (CatalogManager) catalogManagerObj; + ImmutablePair pair = createPlannerAndExecutor( + classLoader, environmentSettings, senv, + tableConfig, moduleManager, functionCatalog, catalogManager); + Planner planner = (Planner) pair.left; + Executor executor = (Executor) pair.right; + + ResourceManager resourceManager = (ResourceManager) createResourceManager(jars, tableConfig); + + return new StreamTableEnvironmentImpl(catalogManager, moduleManager, resourceManager, + functionCatalog, tableConfig, senv, planner, executor, environmentSettings.isStreamingMode()); + } + + @Override + public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) { + return new StreamExecutionEnvironmentFactory() { + @Override + public StreamExecutionEnvironment createExecutionEnvironment(Configuration configuration) { + return (StreamExecutionEnvironment) streamExecutionEnvironment; + } + }; + } + + @Override + public Object createCatalogManager(Object config) { + return CatalogManager.newBuilder() + .classLoader(Thread.currentThread().getContextClassLoader()) + .config((ReadableConfig) config) + .defaultCatalog("default_catalog", + new GenericInMemoryCatalog("default_catalog", "default_database")) + .build(); + } + + @Override + public String getPyFlinkPythonPath(Properties properties) throws IOException { + String mode = properties.getProperty("flink.execution.mode"); + if ("yarn-application".equalsIgnoreCase(mode)) { + // for yarn application mode, FLINK_HOME is container working directory + String flinkHome = new File(".").getAbsolutePath(); + return getPyFlinkPythonPath(new File(flinkHome + "/lib/python")); + } + + String flinkHome = System.getenv("FLINK_HOME"); + if (StringUtils.isNotBlank(flinkHome)) { + return getPyFlinkPythonPath(new File(flinkHome + "/opt/python")); + } else { + throw new IOException("No FLINK_HOME is specified"); + } + } + + private String getPyFlinkPythonPath(File pyFlinkFolder) throws IOException { + LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder); + if (!pyFlinkFolder.exists() || !pyFlinkFolder.isDirectory()) { + throw new IOException(String.format("PyFlink folder %s does not exist or is not a folder", + pyFlinkFolder.getAbsolutePath())); + } + List depFiles = Arrays.asList(pyFlinkFolder.listFiles()); + StringBuilder builder = new StringBuilder(); + for (File file : depFiles) { + LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath()); + builder.append(file.getAbsolutePath() + ":"); + } + return builder.toString(); + } + + @Override + public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) { + return new CollectStreamTableSink(targetAddress, targetPort, (TypeSerializer>) serializer); + } + + @Override + public List collectToList(Object table) throws Exception { + return Lists.newArrayList(((Table) table).execute().collect()); + } + + @Override + public boolean rowEquals(Object row1, Object row2) { + Row r1 = (Row) row1; + Row r2 = (Row) row2; + r1.setKind(RowKind.INSERT); + r2.setKind(RowKind.INSERT); + return r1.equals(r2); + } + + @Override + public Object fromDataSet(Object btenv, Object ds) { + throw new RuntimeException("Conversion from DataSet is not supported in Flink 1.17"); + } + + @Override + public Object toDataSet(Object btenv, Object table) { + throw new RuntimeException("Conversion to DataSet is not supported in Flink 1.17"); + } + + @Override + public void registerTableSink(Object stenv, String tableName, Object collectTableSink) { + ((org.apache.flink.table.api.internal.TableEnvironmentInternal) stenv) + .registerTableSinkInternal(tableName, (TableSink) collectTableSink); + } + + @Override + public void registerScalarFunction(Object btenv, String name, Object scalarFunction) { + ((StreamTableEnvironmentImpl) (btenv)).createTemporarySystemFunction(name, (ScalarFunction) scalarFunction); + } + + @Override + public void registerTableFunction(Object btenv, String name, Object tableFunction) { + ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableFunction) tableFunction); + } + + @Override + public void registerAggregateFunction(Object btenv, String name, Object aggregateFunction) { + ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (AggregateFunction) aggregateFunction); + } + + @Override + public void registerTableAggregateFunction(Object btenv, String name, Object tableAggregateFunction) { + ((StreamTableEnvironmentImpl) (btenv)).registerFunction(name, (TableAggregateFunction) tableAggregateFunction); + } + + /** + * Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager. + * This is a workaround which always reset CatalogTableSchemaResolver before running any flink code. + * + * @param catalogManager + * @param parserObject + * @param environmentSetting + */ + @Override + public void setCatalogManagerSchemaResolver(Object catalogManager, + Object parserObject, + Object environmentSetting) { + + } + + @Override + public Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object effectiveConfig) { + CustomCommandLine customCommandLine = ((CliFrontend) cliFrontend).validateAndGetActiveCommandLine((CommandLine) commandLine); + try { + ((Configuration) effectiveConfig).addAll(customCommandLine.toConfiguration((CommandLine) commandLine)); + return effectiveConfig; + } catch (FlinkException e) { + throw new RuntimeException("Fail to call addAll", e); + } + } + + @Override + public void setBatchRuntimeMode(Object tableConfig) { + ((TableConfig) tableConfig).getConfiguration() + .set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH); + } + + @Override + public void setOldPlanner(Object tableConfig) { + + } + + @Override + public String[] rowToString(Object row, Object table, Object tableConfig) { + final String zone = ((TableConfig) tableConfig).getConfiguration() + .get(TableConfigOptions.LOCAL_TIME_ZONE); + ZoneId zoneId = TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone) + ? ZoneId.systemDefault() + : ZoneId.of(zone); + + ResolvedSchema resolvedSchema = ((Table) table).getResolvedSchema(); + return PrintUtils.rowToString((Row) row, resolvedSchema, zoneId); + } + + @Override + public boolean isTimeIndicatorType(Object type) { + if (type instanceof TimeIndicatorTypeInfo) { + return true; + } else { + return false; + } + } + + private Object lookupExecutor(ClassLoader classLoader, + Object settings, + Object sEnv) { + try { + final ExecutorFactory executorFactory = + FactoryUtil.discoverFactory( + classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER); + final Method createMethod = + executorFactory + .getClass() + .getMethod("create", StreamExecutionEnvironment.class); + + return createMethod.invoke(executorFactory, sEnv); + } catch (Exception e) { + throw new TableException( + "Could not instantiate the executor. Make sure a planner module is on the classpath", + e); + } + } + + @Override + public ImmutablePair createPlannerAndExecutor( + ClassLoader classLoader, Object environmentSettings, Object sEnv, + Object tableConfig, Object moduleManager, Object functionCatalog, Object catalogManager) { + EnvironmentSettings settings = (EnvironmentSettings) environmentSettings; + Executor executor = (Executor) lookupExecutor(classLoader, environmentSettings, sEnv); + Planner planner = PlannerFactoryUtil.createPlanner(executor, + (TableConfig) tableConfig, + Thread.currentThread().getContextClassLoader(), + (ModuleManager) moduleManager, + (CatalogManager) catalogManager, + (FunctionCatalog) functionCatalog); + return ImmutablePair.of(planner, executor); + } + + @Override + public Object createBlinkPlannerEnvSettingBuilder() { + return EnvironmentSettings.newInstance(); + } + + @Override + public Object createOldPlannerEnvSettingBuilder() { + return EnvironmentSettings.newInstance(); + } + + public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) { + if (isBatch) { + return batchSqlInterpreter.runSqlList(st, context); + } else { + return streamSqlInterpreter.runSqlList(st, context); + } + } +} diff --git a/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117SqlInterpreter.java b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117SqlInterpreter.java new file mode 100644 index 00000000000..b53d02c8e62 --- /dev/null +++ b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/Flink117SqlInterpreter.java @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.JobListener; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.SqlParserException; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.operations.*; +import org.apache.flink.table.operations.command.HelpOperation; +import org.apache.flink.table.operations.command.SetOperation; +import org.apache.flink.table.operations.ddl.*; +import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.ZeppelinContext; +import org.apache.zeppelin.interpreter.util.SqlSplitter; +import org.jline.utils.AttributedString; +import org.jline.utils.AttributedStringBuilder; +import org.jline.utils.AttributedStyle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + + +public class Flink117SqlInterpreter { + + private static final Logger LOGGER = LoggerFactory.getLogger(Flink117SqlInterpreter.class); + private static final String CMD_DESC_DELIMITER = "\t\t"; + + /** + * SQL Client HELP command helper class. + */ + private static final class SQLCliCommandsDescriptions { + private int commandMaxLength; + private final Map commandsDescriptions; + + public SQLCliCommandsDescriptions() { + this.commandsDescriptions = new LinkedHashMap<>(); + this.commandMaxLength = -1; + } + + public SQLCliCommandsDescriptions commandDescription(String command, String description) { + Preconditions.checkState( + StringUtils.isNotBlank(command), "content of command must not be empty."); + Preconditions.checkState( + StringUtils.isNotBlank(description), + "content of command's description must not be empty."); + this.updateMaxCommandLength(command.length()); + this.commandsDescriptions.put(command, description); + return this; + } + + private void updateMaxCommandLength(int newLength) { + Preconditions.checkState(newLength > 0); + if (this.commandMaxLength < newLength) { + this.commandMaxLength = newLength; + } + } + + public AttributedString build() { + AttributedStringBuilder attributedStringBuilder = new AttributedStringBuilder(); + if (!this.commandsDescriptions.isEmpty()) { + this.commandsDescriptions.forEach( + (cmd, cmdDesc) -> { + attributedStringBuilder + .style(AttributedStyle.DEFAULT.bold()) + .append( + String.format( + String.format("%%-%ds", commandMaxLength), cmd)) + .append(CMD_DESC_DELIMITER) + .style(AttributedStyle.DEFAULT) + .append(cmdDesc) + .append('\n'); + }); + } + return attributedStringBuilder.toAttributedString(); + } + } + + private static final AttributedString SQL_CLI_COMMANDS_DESCRIPTIONS = + new SQLCliCommandsDescriptions() + .commandDescription("HELP", "Prints the available commands.") + .commandDescription( + "SET", + "Sets a session configuration property. Syntax: \"SET ''='';\". Use \"SET;\" for listing all properties.") + .commandDescription( + "RESET", + "Resets a session configuration property. Syntax: \"RESET '';\". Use \"RESET;\" for reset all session properties.") + .commandDescription( + "INSERT INTO", + "Inserts the results of a SQL SELECT query into a declared table sink.") + .commandDescription( + "INSERT OVERWRITE", + "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.") + .commandDescription( + "SELECT", "Executes a SQL SELECT query on the Flink cluster.") + .commandDescription( + "EXPLAIN", + "Describes the execution plan of a query or table with the given name.") + .commandDescription( + "BEGIN STATEMENT SET", + "Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"") + .commandDescription("END", "Ends a statement set. Syntax: \"END;\"") + // (TODO) zjffdu, ADD/REMOVE/SHOW JAR + .build(); + + // -------------------------------------------------------------------------------------------- + + public static final AttributedString MESSAGE_HELP = + new AttributedStringBuilder() + .append("The following commands are available:\n\n") + .append(SQL_CLI_COMMANDS_DESCRIPTIONS) + .style(AttributedStyle.DEFAULT.underline()) + .append("\nHint") + .style(AttributedStyle.DEFAULT) + .append( + ": Make sure that a statement ends with \";\" for finalizing (multi-line) statements.") + // About Documentation Link. + .style(AttributedStyle.DEFAULT) + .append( + "\nYou can also type any Flink SQL statement, please visit https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/ for more details.") + .toAttributedString(); + + private static final String MESSAGE_NO_STATEMENT_IN_STATEMENT_SET = "No statement in the statement set, skip submit."; + + private FlinkSqlContext flinkSqlContext; + private TableEnvironment tbenv; + private ZeppelinContext z; + private Parser sqlParser; + private SqlSplitter sqlSplitter; + // paragraphId -> list of ModifyOperation, used for statement set in 2 syntax: + // 1. runAsOne= true + // 2. begin statement set; + // ... + // end; + private Map> statementOperationsMap = new HashMap<>(); + private boolean isBatch; + private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock(); + + + public Flink117SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) { + this.flinkSqlContext = flinkSqlContext; + this.isBatch = isBatch; + if (isBatch) { + this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv(); + } else { + this.tbenv = (TableEnvironment) flinkSqlContext.getStenv(); + } + this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext(); + this.sqlParser = ((TableEnvironmentInternal) tbenv).getParser(); + this.sqlSplitter = new SqlSplitter(); + JobListener jobListener = new JobListener() { + @Override + public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + LOGGER.info("UnLock JobSubmitLock"); + } + } + + @Override + public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) { + + } + }; + + ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener); + ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener); + } + + public InterpreterResult runSqlList(String st, InterpreterContext context) { + try { + boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false")); + if (runAsOne) { + statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); + } + + String jobName = context.getLocalProperties().get("jobName"); + if (StringUtils.isNotBlank(jobName)) { + tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName); + } + + List sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList()); + for (String sql : sqls) { + List operations = null; + try { + operations = sqlParser.parse(sql); + } catch (SqlParserException e) { + context.out.write("%text Invalid Sql statement: " + sql + "\n"); + context.out.write(MESSAGE_HELP.toString()); + return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString()); + } + + try { + callOperation(sql, operations.get(0), context); + context.out.flush(); + } catch (Throwable e) { + LOGGER.error("Fail to run sql:" + sql, e); + try { + context.out.write("%text Fail to run sql command: " + + sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n"); + } catch (IOException ex) { + LOGGER.warn("Unexpected exception:", ex); + return new InterpreterResult(InterpreterResult.Code.ERROR, + ExceptionUtils.getStackTrace(e)); + } + return new InterpreterResult(InterpreterResult.Code.ERROR); + } + } + + if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) { + try { + lock.lock(); + List modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()); + if (!modifyOperations.isEmpty()) { + callInserts(modifyOperations, context); + } + } catch (Exception e) { + LOGGER.error("Fail to execute sql as one job", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + } catch (Exception e) { + LOGGER.error("Fail to execute sql", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e)); + } finally { + statementOperationsMap.remove(context.getParagraphId()); + } + + return new InterpreterResult(InterpreterResult.Code.SUCCESS); + } + + private void callOperation(String sql, Operation operation, InterpreterContext context) throws IOException { + if (operation instanceof HelpOperation) { + // HELP + callHelp(context); + } else if (operation instanceof SetOperation) { + // SET + callSet((SetOperation) operation, context); + } else if (operation instanceof ModifyOperation) { + // INSERT INTO/OVERWRITE + callInsert((ModifyOperation) operation, context); + } else if (operation instanceof QueryOperation) { + // SELECT + callSelect(sql, (QueryOperation) operation, context); + } else if (operation instanceof ExplainOperation) { + // EXPLAIN + callExplain((ExplainOperation) operation, context); + } else if (operation instanceof BeginStatementSetOperation) { + // BEGIN STATEMENT SET + callBeginStatementSet(context); + } else if (operation instanceof EndStatementSetOperation) { + // END + callEndStatementSet(context); + } else if (operation instanceof ShowCreateTableOperation) { + // SHOW CREATE TABLE + callShowCreateTable((ShowCreateTableOperation) operation, context); + } else if (operation instanceof ShowCatalogsOperation) { + callShowCatalogs(context); + } else if (operation instanceof ShowCurrentCatalogOperation) { + callShowCurrentCatalog(context); + } else if (operation instanceof UseCatalogOperation) { + callUseCatalog(((UseCatalogOperation) operation).getCatalogName(), context); + } else if (operation instanceof CreateCatalogOperation) { + callDDL(sql, context, "Catalog has been created."); + } else if (operation instanceof DropCatalogOperation) { + callDDL(sql, context, "Catalog has been dropped."); + } else if (operation instanceof UseDatabaseOperation) { + UseDatabaseOperation useDBOperation = (UseDatabaseOperation) operation; + callUseDatabase(useDBOperation.getDatabaseName(), context); + } else if (operation instanceof CreateDatabaseOperation) { + callDDL(sql, context, "Database has been created."); + } else if (operation instanceof DropDatabaseOperation) { + callDDL(sql, context, "Database has been removed."); + } else if (operation instanceof AlterDatabaseOperation) { + callDDL(sql, context, "Alter database succeeded!"); + } else if (operation instanceof ShowDatabasesOperation) { + callShowDatabases(context); + } else if (operation instanceof ShowCurrentDatabaseOperation) { + callShowCurrentDatabase(context); + } else if (operation instanceof CreateTableOperation || operation instanceof CreateTableASOperation) { + callDDL(sql, context, "Table has been created."); + } else if (operation instanceof AlterTableOperation) { + callDDL(sql, context, "Alter table succeeded!"); + } else if (operation instanceof DropTableOperation) { + callDDL(sql, context, "Table has been dropped."); + } else if (operation instanceof DescribeTableOperation) { + DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation; + callDescribe(describeTableOperation.getSqlIdentifier().getObjectName(), context); + } else if (operation instanceof ShowTablesOperation) { + callShowTables(context); + } else if (operation instanceof CreateViewOperation) { + callDDL(sql, context, "View has been created."); + } else if (operation instanceof DropViewOperation) { + callDDL(sql, context, "View has been dropped."); + } else if (operation instanceof AlterViewOperation) { + callDDL(sql, context, "Alter view succeeded!"); + } else if (operation instanceof CreateCatalogFunctionOperation || operation instanceof CreateTempSystemFunctionOperation) { + callDDL(sql, context, "Function has been created."); + } else if (operation instanceof DropCatalogFunctionOperation || operation instanceof DropTempSystemFunctionOperation) { + callDDL(sql, context, "Function has been removed."); + } else if (operation instanceof AlterCatalogFunctionOperation) { + callDDL(sql, context, "Alter function succeeded!"); + } else if (operation instanceof ShowFunctionsOperation) { + callShowFunctions(context); + } else if (operation instanceof ShowModulesOperation) { + callShowModules(context); + } else if (operation instanceof ShowPartitionsOperation) { + ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation; + callShowPartitions(showPartitionsOperation.asSummaryString(), context); + } else { + throw new IOException(operation.getClass().getName() + " is not supported"); + } + } + + + private void callHelp(InterpreterContext context) throws IOException { + context.out.write(MESSAGE_HELP.toString() + "\n"); + } + + private void callInsert(ModifyOperation operation, InterpreterContext context) throws IOException { + if (statementOperationsMap.containsKey(context.getParagraphId())) { + List modifyOperations = statementOperationsMap.get(context.getParagraphId()); + modifyOperations.add(operation); + } else { + callInserts(Collections.singletonList(operation), context); + } + } + + private void callInserts(List operations, InterpreterContext context) throws IOException { + if (!isBatch) { + context.getLocalProperties().put("flink.streaming.insert_into", "true"); + } + TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations); + checkState(tableResult.getJobClient().isPresent()); + try { + tableResult.await(); + JobClient jobClient = tableResult.getJobClient().get(); + if (jobClient.getJobStatus().get() == JobStatus.FINISHED) { + context.out.write("Insertion successfully.\n"); + } else { + throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString()); + } + } catch (InterruptedException e) { + throw new IOException("Flink job is interrupted", e); + } catch (ExecutionException e) { + throw new IOException("Flink job is failed", e); + } + } + + private void callShowCreateTable(ShowCreateTableOperation showCreateTableOperation, InterpreterContext context) throws IOException { + try { + lock.lock(); + TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(showCreateTableOperation); + String explanation = + Objects.requireNonNull(tableResult.collect().next().getField(0)).toString(); + context.out.write(explanation + "\n"); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + private void callExplain(ExplainOperation explainOperation, InterpreterContext context) throws IOException { + try { + lock.lock(); + TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(explainOperation); + String explanation = + Objects.requireNonNull(tableResult.collect().next().getField(0)).toString(); + context.out.write(explanation + "\n"); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + public void callSelect(String sql, QueryOperation queryOperation, InterpreterContext context) throws IOException { + try { + lock.lock(); + if (isBatch) { + callBatchInnerSelect(sql, context); + } else { + callStreamInnerSelect(sql, context); + } + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException { + Table table = this.tbenv.sqlQuery(sql); + String result = z.showData(table); + context.out.write(result); + } + + public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException { + flinkSqlContext.getStreamSqlSelectConsumer().accept(sql); + } + + public void callSet(SetOperation setOperation, InterpreterContext context) throws IOException { + if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) { + // set a property + String key = setOperation.getKey().get().trim(); + String value = setOperation.getValue().get().trim(); + this.tbenv.getConfig().getConfiguration().setString(key, value); + LOGGER.info("Set table config: {}={}", key, value); + } else { + // show all properties + final Map properties = this.tbenv.getConfig().getConfiguration().toMap(); + List prettyEntries = new ArrayList<>(); + for (String key : properties.keySet()) { + prettyEntries.add( + String.format( + "'%s' = '%s'", + EncodingUtils.escapeSingleQuotes(key), + EncodingUtils.escapeSingleQuotes(properties.get(key)))); + } + prettyEntries.sort(String::compareTo); + prettyEntries.forEach(entry -> { + try { + context.out.write(entry + "\n"); + } catch (IOException e) { + LOGGER.warn("Fail to write output", e); + } + }); + } + } + + private void callBeginStatementSet(InterpreterContext context) throws IOException { + statementOperationsMap.put(context.getParagraphId(), new ArrayList<>()); + } + + private void callEndStatementSet(InterpreterContext context) throws IOException { + List modifyOperations = statementOperationsMap.get(context.getParagraphId()); + if (modifyOperations != null && !modifyOperations.isEmpty()) { + callInserts(modifyOperations, context); + } else { + context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET); + } + } + + private void callUseCatalog(String catalog, InterpreterContext context) throws IOException { + tbenv.executeSql("USE CATALOG `" + catalog + "`"); + } + + private void callUseDatabase(String databaseName, + InterpreterContext context) throws IOException { + this.tbenv.executeSql("USE `" + databaseName + "`"); + } + + private void callShowCatalogs(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs"); + List catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n"); + } + + private void callShowCurrentCatalog(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog"); + String catalog = tableResult.collect().next().getField(0).toString(); + context.out.write("%text current catalog: " + catalog + "\n"); + } + + private void callShowDatabases(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Databases"); + List databases = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write( + "%table database\n" + StringUtils.join(databases, "\n") + "\n"); + } + + private void callShowCurrentDatabase(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Current Database"); + String database = tableResult.collect().next().getField(0).toString(); + context.out.write("%text current database: " + database + "\n"); + } + + private void callShowTables(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Tables"); + List tables = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .filter(tbl -> !tbl.startsWith("UnnamedTable")) + .collect(Collectors.toList()); + context.out.write( + "%table table\n" + StringUtils.join(tables, "\n") + "\n"); + } + + private void callShowFunctions(InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql("SHOW Functions"); + List functions = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write( + "%table function\n" + StringUtils.join(functions, "\n") + "\n"); + } + + private void callShowModules(InterpreterContext context) throws IOException { + String[] modules = this.tbenv.listModules(); + context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n"); + } + + private void callShowPartitions(String sql, InterpreterContext context) throws IOException { + TableResult tableResult = this.tbenv.executeSql(sql); + List partions = CollectionUtil.iteratorToList(tableResult.collect()).stream() + .map(r -> checkNotNull(r.getField(0)).toString()) + .collect(Collectors.toList()); + context.out.write( + "%table partitions\n" + StringUtils.join(partions, "\n") + "\n"); + } + + private void callDDL(String sql, InterpreterContext context, String message) throws IOException { + try { + lock.lock(); + this.tbenv.executeSql(sql); + } finally { + if (lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + context.out.write(message + "\n"); + } + + private void callDescribe(String name, InterpreterContext context) throws IOException { + TableResult tableResult = null; + try { + tableResult = tbenv.executeSql("DESCRIBE " + name); + } catch (Exception e) { + throw new IOException("Fail to describe table: " + name, e); + } + CloseableIterator result = tableResult.collect(); + StringBuilder builder = new StringBuilder(); + builder.append("Column\tType\n"); + while (result.hasNext()) { + Row row = result.next(); + builder.append(row.getField(0) + "\t" + row.getField(1) + "\n"); + } + context.out.write("%table\n" + builder.toString()); + } +} diff --git a/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java new file mode 100644 index 00000000000..a35ad3a6cd1 --- /dev/null +++ b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/PrintUtils.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + + +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.*; +import org.apache.flink.types.Row; +import org.apache.flink.util.StringUtils; + +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; +import static org.apache.zeppelin.flink.TimestampStringUtils.*; + +/** + * Copied from flink-project with minor modification. + * */ +public class PrintUtils { + + public static final String NULL_COLUMN = "(NULL)"; + private static final String COLUMN_TRUNCATED_FLAG = "..."; + + private PrintUtils() {} + + + public static String[] rowToString( + Row row, ResolvedSchema resolvedSchema, ZoneId sessionTimeZone) { + return rowToString(row, NULL_COLUMN, false, resolvedSchema, sessionTimeZone); + } + + public static String[] rowToString( + Row row, + String nullColumn, + boolean printRowKind, + ResolvedSchema resolvedSchema, + ZoneId sessionTimeZone) { + final int len = printRowKind ? row.getArity() + 1 : row.getArity(); + final List fields = new ArrayList<>(len); + if (printRowKind) { + fields.add(row.getKind().shortString()); + } + for (int i = 0; i < row.getArity(); i++) { + final Object field = row.getField(i); + final LogicalType fieldType = + resolvedSchema.getColumnDataTypes().get(i).getLogicalType(); + if (field == null) { + fields.add(nullColumn); + } else { + fields.add( + StringUtils.arrayAwareToString( + formattedTimestamp(field, fieldType, sessionTimeZone))); + } + } + return fields.toArray(new String[0]); + } + + /** + * Normalizes field that contains TIMESTAMP, TIMESTAMP_LTZ and TIME type data. + * + *

This method also supports nested type ARRAY, ROW, MAP. + */ + private static Object formattedTimestamp( + Object field, LogicalType fieldType, ZoneId sessionTimeZone) { + final LogicalTypeRoot typeRoot = fieldType.getTypeRoot(); + if (field == null) { + return "null"; + } + switch (typeRoot) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return formatTimestampField(field, fieldType, sessionTimeZone); + case TIME_WITHOUT_TIME_ZONE: + return formatTimeField(field); + case ARRAY: + LogicalType elementType = ((ArrayType) fieldType).getElementType(); + if (field instanceof List) { + List array = (List) field; + Object[] formattedArray = new Object[array.size()]; + for (int i = 0; i < array.size(); i++) { + formattedArray[i] = + formattedTimestamp(array.get(i), elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass().isArray()) { + // primitive type + if (field.getClass() == byte[].class) { + byte[] array = (byte[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == short[].class) { + short[] array = (short[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == int[].class) { + int[] array = (int[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == long[].class) { + long[] array = (long[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == float[].class) { + float[] array = (float[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == double[].class) { + double[] array = (double[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == boolean[].class) { + boolean[] array = (boolean[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else if (field.getClass() == char[].class) { + char[] array = (char[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } else { + // non-primitive type + Object[] array = (Object[]) field; + Object[] formattedArray = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + formattedArray[i] = + formattedTimestamp(array[i], elementType, sessionTimeZone); + } + return formattedArray; + } + } else { + return field; + } + case ROW: + if (fieldType instanceof RowType && field instanceof Row) { + Row row = (Row) field; + Row formattedRow = new Row(row.getKind(), row.getArity()); + for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) { + LogicalType type = ((RowType) fieldType).getFields().get(i).getType(); + formattedRow.setField( + i, formattedTimestamp(row.getField(i), type, sessionTimeZone)); + } + return formattedRow; + + } else if (fieldType instanceof RowType && field instanceof RowData) { + RowData rowData = (RowData) field; + Row formattedRow = new Row(rowData.getRowKind(), rowData.getArity()); + for (int i = 0; i < ((RowType) fieldType).getFields().size(); i++) { + LogicalType type = ((RowType) fieldType).getFields().get(i).getType(); + RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, i); + formattedRow.setField( + i, + formattedTimestamp( + fieldGetter.getFieldOrNull(rowData), + type, + sessionTimeZone)); + } + return formattedRow; + } else { + return field; + } + case MAP: + LogicalType keyType = ((MapType) fieldType).getKeyType(); + LogicalType valueType = ((MapType) fieldType).getValueType(); + if (fieldType instanceof MapType && field instanceof Map) { + Map map = ((Map) field); + Map formattedMap = new HashMap<>(map.size()); + for (Object key : map.keySet()) { + formattedMap.put( + formattedTimestamp(key, keyType, sessionTimeZone), + formattedTimestamp(map.get(key), valueType, sessionTimeZone)); + } + return formattedMap; + } else if (fieldType instanceof MapType && field instanceof MapData) { + MapData map = ((MapData) field); + Map formattedMap = new HashMap<>(map.size()); + Object[] keyArray = + (Object[]) formattedTimestamp(map.keyArray(), keyType, sessionTimeZone); + Object[] valueArray = + (Object[]) + formattedTimestamp( + map.valueArray(), valueType, sessionTimeZone); + for (int i = 0; i < keyArray.length; i++) { + formattedMap.put(keyArray[i], valueArray[i]); + } + return formattedMap; + } else { + return field; + } + default: + return field; + } + } + + /** + * Formats the print content of TIMESTAMP and TIMESTAMP_LTZ type data, consider the user + * configured time zone. + */ + private static Object formatTimestampField( + Object timestampField, LogicalType fieldType, ZoneId sessionTimeZone) { + switch (fieldType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int precision = getPrecision(fieldType); + if (timestampField instanceof java.sql.Timestamp) { + // conversion between java.sql.Timestamp and TIMESTAMP_WITHOUT_TIME_ZONE + return timestampToString( + ((Timestamp) timestampField).toLocalDateTime(), precision); + } else if (timestampField instanceof java.time.LocalDateTime) { + return timestampToString(((LocalDateTime) timestampField), precision); + } else if (timestampField instanceof TimestampData) { + return timestampToString( + ((TimestampData) timestampField).toLocalDateTime(), precision); + } else { + return timestampField; + } + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + Instant instant = null; + if (timestampField instanceof java.time.Instant) { + instant = ((Instant) timestampField); + } else if (timestampField instanceof java.sql.Timestamp) { + Timestamp timestamp = ((Timestamp) timestampField); + // conversion between java.sql.Timestamp and TIMESTAMP_WITH_LOCAL_TIME_ZONE + instant = + TimestampData.fromEpochMillis( + timestamp.getTime(), timestamp.getNanos() % 1000_000) + .toInstant(); + } else if (timestampField instanceof TimestampData) { + instant = ((TimestampData) timestampField).toInstant(); + } else if (timestampField instanceof Integer) { + instant = Instant.ofEpochSecond((Integer) timestampField); + } else if (timestampField instanceof Long) { + instant = Instant.ofEpochMilli((Long) timestampField); + } + if (instant != null) { + return timestampToString( + instant.atZone(sessionTimeZone).toLocalDateTime(), + getPrecision(fieldType)); + } else { + return timestampField; + } + default: + return timestampField; + } + } + + /** Formats the print content of TIME type data. */ + private static Object formatTimeField(Object timeField) { + if (timeField.getClass().isAssignableFrom(int.class) || timeField instanceof Integer) { + return unixTimeToString((int) timeField); + } else if (timeField.getClass().isAssignableFrom(long.class) || timeField instanceof Long) { + return unixTimeToString(((Long) timeField).intValue()); + } else if (timeField instanceof Time) { + return unixTimeToString(timeToInternal((Time) timeField)); + } else if (timeField instanceof LocalTime) { + return unixTimeToString(localTimeToUnixDate((LocalTime) timeField)); + } else { + return timeField; + } + } +} diff --git a/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java new file mode 100644 index 00000000000..c52104e45af --- /dev/null +++ b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/TimestampStringUtils.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink; + +import java.sql.Time; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.TimeZone; + +/** + * Copied from flink-project with minor modification. + * */ +public class TimestampStringUtils { + + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); + + public TimestampStringUtils() { + } + + public static String timestampToString(LocalDateTime ldt, int precision) { + String fraction; + for(fraction = pad(9, (long)ldt.getNano()); fraction.length() > precision && fraction.endsWith("0"); fraction = fraction.substring(0, fraction.length() - 1)) { + } + + StringBuilder ymdhms = ymdhms(new StringBuilder(), ldt.getYear(), ldt.getMonthValue(), ldt.getDayOfMonth(), ldt.getHour(), ldt.getMinute(), ldt.getSecond()); + if (fraction.length() > 0) { + ymdhms.append(".").append(fraction); + } + + return ymdhms.toString(); + } + + private static String pad(int length, long v) { + StringBuilder s = new StringBuilder(Long.toString(v)); + + while(s.length() < length) { + s.insert(0, "0"); + } + + return s.toString(); + } + + private static StringBuilder hms(StringBuilder b, int h, int m, int s) { + int2(b, h); + b.append(':'); + int2(b, m); + b.append(':'); + int2(b, s); + return b; + } + + private static StringBuilder ymdhms(StringBuilder b, int year, int month, int day, int h, int m, int s) { + ymd(b, year, month, day); + b.append(' '); + hms(b, h, m, s); + return b; + } + + private static StringBuilder ymd(StringBuilder b, int year, int month, int day) { + int4(b, year); + b.append('-'); + int2(b, month); + b.append('-'); + int2(b, day); + return b; + } + + private static void int4(StringBuilder buf, int i) { + buf.append((char)(48 + i / 1000 % 10)); + buf.append((char)(48 + i / 100 % 10)); + buf.append((char)(48 + i / 10 % 10)); + buf.append((char)(48 + i % 10)); + } + + private static void int2(StringBuilder buf, int i) { + buf.append((char)(48 + i / 10 % 10)); + buf.append((char)(48 + i % 10)); + } + + public static String unixTimeToString(int time) { + StringBuilder buf = new StringBuilder(8); + unixTimeToString(buf, time, 0); + return buf.toString(); + } + + private static void unixTimeToString(StringBuilder buf, int time, int precision) { + while(time < 0) { + time = (int)((long)time + 86400000L); + } + + int h = time / 3600000; + int time2 = time % 3600000; + int m = time2 / '\uea60'; + int time3 = time2 % '\uea60'; + int s = time3 / 1000; + int ms = time3 % 1000; + int2(buf, h); + buf.append(':'); + int2(buf, m); + buf.append(':'); + int2(buf, s); + if (precision > 0) { + buf.append('.'); + + while(precision > 0) { + buf.append((char)(48 + ms / 100)); + ms %= 100; + ms *= 10; + if (ms == 0) { + break; + } + + --precision; + } + } + + } + + public static int timeToInternal(Time time) { + long ts = time.getTime() + (long)LOCAL_TZ.getOffset(time.getTime()); + return (int)(ts % 86400000L); + } + + public static int localTimeToUnixDate(LocalTime time) { + return time.getHour() * 3600000 + time.getMinute() * '\uea60' + time.getSecond() * 1000 + time.getNano() / 1000000; + } +} diff --git a/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/shims117/CollectStreamTableSink.java b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/shims117/CollectStreamTableSink.java new file mode 100644 index 00000000000..ee58e770d44 --- /dev/null +++ b/flink/flink1.17-shims/src/main/java/org/apache/zeppelin/flink/shims117/CollectStreamTableSink.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.flink.shims117; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.experimental.CollectSink; +import org.apache.flink.table.sinks.RetractStreamTableSink; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.UUID; + +/** + * Table sink for collecting the results locally using sockets. + */ +public class CollectStreamTableSink implements RetractStreamTableSink { + + private static final Logger LOGGER = LoggerFactory.getLogger(CollectStreamTableSink.class); + + private final InetAddress targetAddress; + private final int targetPort; + private final TypeSerializer> serializer; + + private String[] fieldNames; + private TypeInformation[] fieldTypes; + + public CollectStreamTableSink(InetAddress targetAddress, + int targetPort, + TypeSerializer> serializer) { + LOGGER.info("Use address: " + targetAddress.getHostAddress() + ":" + targetPort); + this.targetAddress = targetAddress; + this.targetPort = targetPort; + this.serializer = serializer; + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public TypeInformation[] getFieldTypes() { + return fieldTypes; + } + + @Override + public CollectStreamTableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { + final CollectStreamTableSink copy = + new CollectStreamTableSink(targetAddress, targetPort, serializer); + copy.fieldNames = fieldNames; + copy.fieldTypes = fieldTypes; + return copy; + } + + @Override + public TypeInformation getRecordType() { + return Types.ROW_NAMED(fieldNames, fieldTypes); + } + + @Override + public DataStreamSink consumeDataStream(DataStream> stream) { + // add sink + return stream + .addSink(new CollectSink<>(targetAddress, targetPort, serializer)) + .name("Zeppelin Flink Sql Stream Collect Sink " + UUID.randomUUID()) + .setParallelism(1); + } + + @Override + public TupleTypeInfo> getOutputType() { + return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType()); + } +} diff --git a/flink/pom.xml b/flink/pom.xml index 1939c1ccb45..428ab57c890 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -40,6 +40,7 @@ flink1.14-shims flink1.15-shims flink1.16-shims + flink1.17-shims @@ -47,12 +48,21 @@ 1.14.0 1.15.1 1.16.0 + 1.17.1 2.11.12 2.11 + + flink-117 + + + flink-scala-2.12 + + + flink-116 diff --git a/testing/env_python_3_with_flink_117.yml b/testing/env_python_3_with_flink_117.yml new file mode 100644 index 00000000000..91dfb2f644e --- /dev/null +++ b/testing/env_python_3_with_flink_117.yml @@ -0,0 +1,29 @@ +name: python_3_with_flink +channels: + - conda-forge + - defaults +dependencies: + - pycodestyle + - scipy + - numpy=1.19.5 + - grpcio + - protobuf + - pandasql + - ipython + - ipython_genutils + - ipykernel + - jupyter_client=5 + - hvplot + - plotnine + - seaborn + - intake + - intake-parquet + - intake-xarray + - altair + - vega_datasets + - plotly + - jinja2=3.0.3 + - pip + - pip: + - apache-flink==1.17.1 +