Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
levi0090 authored Oct 20, 2023
2 parents 526718e + 9680241 commit 072f519
Show file tree
Hide file tree
Showing 30 changed files with 304 additions and 512 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,11 @@ jobs:
run: |
rm -rf spark/interpreter/metastore_db
./mvnw verify -pl spark-submit,spark/interpreter -am -Dtest=org/apache/zeppelin/spark/* -Pspark-3.4 -Pspark-scala-2.13 -Phadoop3 -Pintegration -DfailIfNoTests=false ${MAVEN_ARGS}
- name: run spark-3.5 tests with scala-2.13 and python-${{ matrix.python }}
if: matrix.python >= '3.8'
run: |
rm -rf spark/interpreter/metastore_db
./mvnw verify -pl spark-submit,spark/interpreter -am -Dtest=org/apache/zeppelin/spark/* -Pspark-3.5 -Pspark-scala-2.13 -Phadoop3 -Pintegration -DfailIfNoTests=false ${MAVEN_ARGS}
livy-0-7-with-spark-3-4-1-under-python3:
runs-on: ubuntu-20.04
Expand Down
4 changes: 2 additions & 2 deletions NOTICE
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
Apache Zeppelin
Copyright 2015 - 2016 The Apache Software Foundation
Copyright 2015 - 2023 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
The Apache Software Foundation (https://www.apache.org/).

Portions of this software were developed at NFLabs, Inc. (http://www.nflabs.com)

Expand Down
3 changes: 2 additions & 1 deletion docs/setup/basics/how_to_build.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ Set scala version (default 2.11). Available profiles are

Note that the Spark profiles here only affect the unit tests of the Spark interpreter (no need to specify `SPARK_HOME`).
Zeppelin doesn't require you to build with different spark to make different versions of spark work in Zeppelin.
You can run different versions of Spark in Zeppelin as long as you specify `SPARK_HOME`. Actually Zeppelin supports all the versions of Spark from 3.2 to 3.4.
You can run different versions of Spark in Zeppelin as long as you specify `SPARK_HOME`. Actually Zeppelin supports all the versions of Spark from 3.2 to 3.5.

To build with a specific Spark version or scala versions, define one or more of the following profiles and options:

Expand All @@ -106,6 +106,7 @@ Set spark major version
Available profiles are

```
-Pspark-3.5
-Pspark-3.4
-Pspark-3.3
-Pspark-3.2
Expand Down
9 changes: 1 addition & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
<flexmark.all.version>0.62.2</flexmark.all.version>
<gson.version>2.8.9</gson.version>
<gson-extras.version>0.2.2</gson-extras.version>
<jetty.version>9.4.51.v20230217</jetty.version>
<jetty.version>9.4.52.v20230823</jetty.version>
<httpcomponents.core.version>4.4.1</httpcomponents.core.version>
<httpcomponents.client.version>4.5.13</httpcomponents.client.version>
<httpcomponents.asyncclient.version>4.0.2</httpcomponents.asyncclient.version>
Expand Down Expand Up @@ -183,7 +183,6 @@
<plugin.gpg.version>1.6</plugin.gpg.version>
<plugin.jar.version>3.2.0</plugin.jar.version>
<plugin.javadoc.version>3.2.0</plugin.javadoc.version>
<plugin.jdeb.version>1.8</plugin.jdeb.version>
<plugin.lifecycle.mapping.version>1.0.0</plugin.lifecycle.mapping.version>
<plugin.protobuf.version>3.11.4</plugin.protobuf.version>
<plugin.rat.version>0.13</plugin.rat.version>
Expand Down Expand Up @@ -1669,12 +1668,6 @@
<version>${plugin.buildnumber.version}</version>
</plugin>

<plugin>
<groupId>org.vafer</groupId>
<artifactId>jdeb</artifactId>
<version>${plugin.jdeb.version}</version>
</plugin>

<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class IPythonInterpreter extends JupyterKernelInterpreter {
private String additionalPythonPath;
private String additionalPythonInitFile;
private boolean useBuiltinPy4j = true;
private boolean usePy4JAuth = true;
private String py4jGatewaySecret;

public IPythonInterpreter(Properties properties) {
Expand Down Expand Up @@ -128,7 +127,7 @@ public void open() throws InterpreterException {

private void setupJVMGateway(String gatewayHost, int gatewayPort) throws IOException {
this.gatewayServer = PythonUtils.createGatewayServer(this, gatewayHost,
gatewayPort, py4jGatewaySecret, usePy4JAuth);
gatewayPort, py4jGatewaySecret);
gatewayServer.start();
}

Expand Down Expand Up @@ -199,11 +198,8 @@ protected Map<String, String> setupKernelEnv() throws IOException {
envs.put("PYTHONPATH", additionalPythonPath);
}

this.usePy4JAuth = Boolean.parseBoolean(getProperty("zeppelin.py4j.useAuth", "true"));
this.py4jGatewaySecret = PythonUtils.createSecret(256);
if (usePy4JAuth) {
envs.put("PY4J_GATEWAY_SECRET", this.py4jGatewaySecret);
}
envs.put("PY4J_GATEWAY_SECRET", this.py4jGatewaySecret);
LOGGER.info("PYTHONPATH: {}", envs.get("PYTHONPATH"));
return envs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class PythonInterpreter extends Interpreter {
private ZeppelinContext zeppelinContext;
// set by PythonCondaInterpreter
private String condaPythonExec;
private boolean usePy4jAuth = false;

public PythonInterpreter(Properties property) {
super(property);
Expand Down Expand Up @@ -116,7 +115,6 @@ public void open() throws InterpreterException {
}

try {
this.usePy4jAuth = Boolean.parseBoolean(getProperty("zeppelin.py4j.useAuth", "true"));
createGatewayServerAndStartScript();
} catch (IOException e) {
LOGGER.error("Fail to open PythonInterpreter", e);
Expand All @@ -132,8 +130,7 @@ private void createGatewayServerAndStartScript() throws IOException {
// container can also connect to this gateway server.
String serverAddress = PythonUtils.getLocalIP(properties);
String secret = PythonUtils.createSecret(256);
this.gatewayServer = PythonUtils.createGatewayServer(this, serverAddress, port, secret,
usePy4jAuth);
this.gatewayServer = PythonUtils.createGatewayServer(this, serverAddress, port, secret);
gatewayServer.start();

// launch python process to connect to the gateway server in JVM side
Expand All @@ -149,9 +146,7 @@ private void createGatewayServerAndStartScript() throws IOException {

outputStream = new InterpreterOutputStream(LOGGER);
Map<String, String> env = setupPythonEnv();
if (usePy4jAuth) {
env.put("PY4J_GATEWAY_SECRET", secret);
}
env.put("PY4J_GATEWAY_SECRET", secret);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Launching Python Process Command: {} {}",
cmd.getExecutable(), StringUtils.join(cmd.getArguments(), " "));
Expand Down
41 changes: 11 additions & 30 deletions python/src/main/java/org/apache/zeppelin/python/PythonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.SecureRandom;
import java.util.List;
import java.util.Properties;

public class PythonUtils {
Expand All @@ -37,35 +36,17 @@ public class PythonUtils {
public static GatewayServer createGatewayServer(Object entryPoint,
String serverAddress,
int port,
String secretKey,
boolean useAuth) throws IOException {
LOGGER.info("Launching GatewayServer at " + serverAddress + ":" + port +
", useAuth: " + useAuth);
if (useAuth) {
try {
Class clz = Class.forName("py4j.GatewayServer$GatewayServerBuilder", true,
Thread.currentThread().getContextClassLoader());
Object builder = clz.getConstructor(Object.class).newInstance(entryPoint);
builder.getClass().getMethod("authToken", String.class).invoke(builder, secretKey);
builder.getClass().getMethod("javaPort", int.class).invoke(builder, port);
builder.getClass().getMethod("javaAddress", InetAddress.class).invoke(builder,
InetAddress.getByName(serverAddress));
builder.getClass()
.getMethod("callbackClient", int.class, InetAddress.class, String.class)
.invoke(builder, port, InetAddress.getByName(serverAddress), secretKey);
return (GatewayServer) builder.getClass().getMethod("build").invoke(builder);
} catch (Exception e) {
throw new IOException(e);
}
} else {
return new GatewayServer(entryPoint,
port,
GatewayServer.DEFAULT_PYTHON_PORT,
InetAddress.getByName(serverAddress),
InetAddress.getByName(serverAddress),
GatewayServer.DEFAULT_CONNECT_TIMEOUT,
GatewayServer.DEFAULT_READ_TIMEOUT,
(List) null);
String secretKey) throws IOException {
LOGGER.info("Launching GatewayServer at {}:{}", serverAddress, port);
try {
return new GatewayServer.GatewayServerBuilder(entryPoint)
.authToken(secretKey)
.javaPort(port)
.javaAddress(InetAddress.getByName(serverAddress))
.callbackClient(port, InetAddress.getByName(serverAddress), secretKey)
.build();
} catch (Exception e) {
throw new IOException(e);
}
}

Expand Down
11 changes: 1 addition & 10 deletions rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,6 @@ protected int sparkVersion() {
return 20404;
}

/**
* Spark 2.4.3 need secret for socket communication between R process and jvm process.
* Sub class can override this, e.g. SparkRInterpreter
* @return
*/
protected boolean isSecretSupported() {
return true;
}

@Override
public void open() throws InterpreterException {
super.open();
Expand All @@ -98,7 +89,7 @@ public void open() throws InterpreterException {
synchronized (sparkRBackend) {
if (!sparkRBackend.isStarted()) {
try {
sparkRBackend.init(isSecretSupported());
sparkRBackend.init();
} catch (Exception e) {
throw new InterpreterException("Fail to init SparkRBackend", e);
}
Expand Down
11 changes: 1 addition & 10 deletions rlang/src/main/java/org/apache/zeppelin/r/RInterpreter.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,14 @@ protected int sparkVersion() {
return 20403;
}

/**
* Spark 2.4.3 need secret for socket communication between R process and jvm process.
* Sub class can override this, e.g. SparkRInterpreter
* @return
*/
protected boolean isSecretSupported() {
return true;
}

@Override
public void open() throws InterpreterException {
this.sparkRBackend = SparkRBackend.get();
// Share the same SparkRBackend across sessions
synchronized (sparkRBackend) {
if (!sparkRBackend.isStarted()) {
try {
sparkRBackend.init(isSecretSupported());
sparkRBackend.init();
} catch (Exception e) {
throw new InterpreterException("Fail to init SparkRBackend", e);
}
Expand Down
17 changes: 6 additions & 11 deletions rlang/src/main/java/org/apache/zeppelin/r/SparkRBackend.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,12 @@ public void run() {
};
}

public void init(boolean isSecretSocketSupported) throws Exception {
Class rBackendClass = RBackend.class;
if (isSecretSocketSupported) {
Tuple2<Integer, Object> result =
(Tuple2<Integer, Object>) rBackendClass.getMethod("init").invoke(backend);
portNumber = result._1;
Object rAuthHelper = result._2;
secret = (String) rAuthHelper.getClass().getMethod("secret").invoke(rAuthHelper);
} else {
portNumber = (Integer) rBackendClass.getMethod("init").invoke(backend);
}
// Starts the shared SparkR backend and captures the connection credentials
// needed by the R process: the TCP port and the socket-auth secret.
// NOTE(review): RBackend.init() is invoked reflectively — presumably to avoid a
// hard compile-time dependency on a specific Spark version's signature; confirm
// before simplifying to a direct call.
// Assumes RBackend.init() returns Tuple2<port, RAuthHelper> as in Spark 2.4+ —
// TODO confirm when changing the minimum supported Spark version.
public void init() throws Exception {
Tuple2<Integer, Object> result =
(Tuple2<Integer, Object>) RBackend.class.getMethod("init").invoke(backend);
// _1 is the port the backend listens on for the R process.
portNumber = result._1;
// _2 is the auth helper; read its secret() reflectively for the same
// version-compatibility reason as above.
Object rAuthHelper = result._2;
secret = (String) rAuthHelper.getClass().getMethod("secret").invoke(rAuthHelper);
}

public void start() {
Expand Down
6 changes: 2 additions & 4 deletions rlang/src/main/java/org/apache/zeppelin/r/ZeppelinR.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,9 @@ public void open() throws IOException, InterpreterException {
cmd.addArgument(rInterpreter.sparkVersion() + "");
cmd.addArgument(timeout);
cmd.addArgument(rInterpreter.isSparkSupported() + "");
if (rInterpreter.isSecretSupported()) {
cmd.addArgument(SparkRBackend.get().socketSecret());
}
cmd.addArgument(SparkRBackend.get().socketSecret());
// dump out the R command to facilitate manually running it, e.g. for fault diagnosis purposes
LOGGER.info("R Command: " + cmd.toString());
LOGGER.info("R Command: {}", cmd);
processOutputStream = new RProcessLogOutputStream(rInterpreter);
Map env = EnvironmentUtils.getProcEnvironment();
rProcessLauncher = new RProcessLauncher(cmd, env, processOutputStream);
Expand Down
2 changes: 1 addition & 1 deletion shell/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<!--library versions -->
<pty4j.version>0.9.3</pty4j.version>
<jinjava.version>2.4.0</jinjava.version>
<guava.version>24.1.1-jre</guava.version>
<guava.version>32.0.0-jre</guava.version>
</properties>

<!-- pty4j library not in maven central repository (http://repo.maven.apache.org/maven2) -->
Expand Down
12 changes: 12 additions & 0 deletions spark/interpreter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,18 @@
</profile>

<!-- profile spark-x only affect spark version used in test -->
<profile>
<id>spark-3.5</id>
<properties>
<datanucleus.core.version>4.1.17</datanucleus.core.version>
<datanucleus.rdbms.version>4.1.19</datanucleus.rdbms.version>
<datanucleus.apijdo.version>4.2.4</datanucleus.apijdo.version>
<spark.version>3.5.0</spark.version>
<protobuf.version>3.21.12</protobuf.version>
<py4j.version>0.10.9.7</py4j.version>
</properties>
</profile>

<profile>
<id>spark-3.4</id>
<activation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public synchronized void open() throws InterpreterException {
getInterpreterInTheSameSessionByClassName(PySparkInterpreter.class, false);
setProperty("zeppelin.python", pySparkInterpreter.getPythonExec(sparkInterpreter.getSparkContext().conf()));

setProperty("zeppelin.py4j.useAuth",
sparkInterpreter.getSparkVersion().isSecretSocketSupported() + "");
SparkConf conf = sparkInterpreter.getSparkContext().getConf();
// only set PYTHONPATH in embedded, local or yarn-client mode.
// yarn-cluster will setup PYTHONPATH automatically.
Expand All @@ -70,8 +68,6 @@ public synchronized void open() throws InterpreterException {
}
setUseBuiltinPy4j(false);
setAdditionalPythonInitFile("python/zeppelin_ipyspark.py");
setProperty("zeppelin.py4j.useAuth",
sparkInterpreter.getSparkVersion().isSecretSocketSupported() + "");
super.open();
opened = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ public void open() throws InterpreterException {
// must create spark interpreter after ClassLoader is set, otherwise the additional jars
// can not be loaded by spark repl.
this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
setProperty("zeppelin.py4j.useAuth",
sparkInterpreter.getSparkVersion().isSecretSocketSupported() + "");
// create Python Process and JVM gateway
super.open();
} finally {
Expand Down
Loading

0 comments on commit 072f519

Please sign in to comment.