Changes from all commits
15 commits
9 changes: 2 additions & 7 deletions .github/workflows/core.yml
@@ -212,7 +212,7 @@ jobs:
${{ runner.os }}-zeppelin-
- name: install environment
run: |
./mvnw install -DskipTests -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pweb-classic -Pflink-117 ${MAVEN_ARGS}
./mvnw install -DskipTests -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/scala-2.12,spark/scala-2.13,markdown,flink-cmd,flink/flink-scala-2.12,jdbc,shell -am -Pweb-classic -Pflink-119 ${MAVEN_ARGS}
./mvnw package -pl zeppelin-plugins -amd -DskipTests ${MAVEN_ARGS}
- name: Setup conda environment with python 3.9 and R
uses: conda-incubator/setup-miniconda@v3
@@ -239,12 +239,7 @@ jobs:
fail-fast: false
matrix:
python: [ 3.9 ]
flink: [116, 117]
include:
# Flink 1.15 supports Python 3.6, 3.7, and 3.8
# https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/
- python: 3.8
flink: 115
flink: [119, 120]
steps:
- name: Checkout
uses: actions/checkout@v4
15 changes: 7 additions & 8 deletions docs/interpreter/flink.md
@@ -27,7 +27,7 @@ limitations under the License.
[Apache Flink](https://flink.apache.org) is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

In Zeppelin 0.9, we refactor the Flink interpreter in Zeppelin to support the latest version of Flink. **Currently, only Flink 1.15+ is supported, old versions of flink won't work.**
In Zeppelin 0.9, we refactored the Flink interpreter in Zeppelin to support the latest versions of Flink. **Currently, only Flink 1.19+ is supported; older versions of Flink won't work.**
Apache Flink is supported in Zeppelin with the Flink interpreter group which consists of the five interpreters listed below.

<table class="table-configuration">
@@ -138,16 +138,15 @@ docker run -u $(id -u) -p 8080:8080 --rm -v /mnt/disk1/flink-sql-cookbook-on-zep

## Prerequisites

Download Flink 1.15 or afterwards (Only Scala 2.12 is supported)
Download Flink 1.19 or later (only Scala 2.12 is supported)

### Version-specific notes for Flink

Flink 1.15 is scala free and has changed its binary distribution, the following extra steps is required.
* Move FLINK_HOME/opt/flink-table-planner_2.12-1.15.0.jar to FLINK_HOME/lib
* Move FLINK_HOME/lib/flink-table-planner-loader-1.15.0.jar to FLINK_HOME/opt
* Download flink-table-api-scala-bridge_2.12-1.15.0.jar and flink-table-api-scala_2.12-1.15.0.jar to FLINK_HOME/lib

Flink 1.16 introduces new `ClientResourceManager` for sql client, you need to move `FLINK_HOME/opt/flink-sql-client-1.16.0.jar` to `FLINK_HOME/lib`
Flink 1.19+ is Scala-free and has changed its binary distribution, so the following extra steps are required. Replace `${FLINK_VERSION}` below with the version of Flink you installed.
* Move FLINK_HOME/opt/flink-table-planner_2.12-${FLINK_VERSION}.jar to FLINK_HOME/lib
* Move FLINK_HOME/lib/flink-table-planner-loader-${FLINK_VERSION}.jar to FLINK_HOME/opt
* Download flink-table-api-scala-bridge_2.12-${FLINK_VERSION}.jar and flink-table-api-scala_2.12-${FLINK_VERSION}.jar to FLINK_HOME/lib
* Move FLINK_HOME/opt/flink-sql-client-${FLINK_VERSION}.jar to FLINK_HOME/lib
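The jar moves above can be scripted. Below is a minimal sketch, assuming `FLINK_HOME` points at an unpacked official Flink distribution and the jar names follow its standard layout; the `prepare_flink_home` function name is illustrative, not part of the Zeppelin docs:

```shell
#!/bin/sh
# Sketch of the Flink 1.19+ setup steps above. Assumes the standard
# layout of the official Flink binary distribution.
prepare_flink_home() {
  flink_home="$1"
  flink_version="$2"

  # Swap the planner loader for the full Scala planner.
  mv "$flink_home/opt/flink-table-planner_2.12-$flink_version.jar" "$flink_home/lib/"
  mv "$flink_home/lib/flink-table-planner-loader-$flink_version.jar" "$flink_home/opt/"

  # Put the SQL client jar on the default classpath.
  mv "$flink_home/opt/flink-sql-client-$flink_version.jar" "$flink_home/lib/"

  # The Scala table API jars are not bundled; they still need to be
  # downloaded (e.g. from Maven Central) into lib/ as a separate step.
  for artifact in flink-table-api-scala-bridge_2.12 flink-table-api-scala_2.12; do
    echo "TODO: download $artifact-$flink_version.jar into $flink_home/lib/"
  done
}
```

You would then run `prepare_flink_home "$FLINK_HOME" 1.19.3` (substituting your installed version) before starting the interpreter.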

## Flink on Zeppelin Architecture

20 changes: 10 additions & 10 deletions docs/setup/deployment/flink_and_spark_cluster.md
@@ -85,14 +85,14 @@ cd zeppelin
Package Zeppelin.

```bash
./mvnw clean package -DskipTests -Pspark-3.5 -Pflink-1.17
./mvnw clean package -DskipTests -Pspark-3.5 -Pflink-119
```

`-DskipTests` skips the build tests; you're not developing (yet), so you don't need to run tests, and the cloned version *should* build.

`-Pspark-3.5` tells maven to build a Zeppelin with Spark 3.5. This is important because Zeppelin has its own Spark interpreter and the versions must be the same.

`-Pflink-1.17` tells maven to build a Zeppelin with Flink 1.17.
`-Pflink-119` tells maven to build a Zeppelin with Flink 1.19.

**Note:** You can build against any version of Spark that has a Zeppelin build profile available. The key is to make sure you check out the matching version of Spark to build. At the time of this writing, Spark 3.5 was the most recent Spark version available.

@@ -215,16 +215,16 @@ Building from source is recommended where possible, for simplicity in this tuto
To download the Flink Binary use `wget`

```bash
wget -O flink-1.17.1-bin-scala_2.12.tgz "https://www.apache.org/dyn/closer.lua/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz?action=download"
tar -xzvf flink-1.17.1-bin-scala_2.12.tgz
wget -O flink-1.19.3-bin-scala_2.12.tgz "https://www.apache.org/dyn/closer.lua/flink/flink-1.19.3/flink-1.19.3-bin-scala_2.12.tgz?action=download"
tar -xzvf flink-1.19.3-bin-scala_2.12.tgz
```

This will download Flink 1.17.1.
This will download Flink 1.19.3.

Start the Flink Cluster.

```bash
flink-1.17.1/bin/start-cluster.sh
flink-1.19.3/bin/start-cluster.sh
```

###### Building from source
@@ -233,13 +233,13 @@ If you wish to build Flink from source, the following will be instructive. Note

See the [Flink Installation guide](https://github.com/apache/flink/blob/master/README.md) for more detailed instructions.

Return to the directory where you have been downloading; this tutorial assumes that is `$HOME`. Clone Flink, check out release-1.19.3, and build.
Return to the directory where you have been downloading, this tutorial assumes that is `$HOME`. Clone Flink, check out release-1.19.3, and build.

```bash
cd $HOME
git clone https://github.com/apache/flink.git
cd flink
git checkout release-1.17.1
git checkout release-1.19.3
mvn clean install -DskipTests
```

@@ -261,8 +261,8 @@ If no task managers are present, restart the Flink cluster with the following co
(if binaries)

```bash
flink-1.17.1/bin/stop-cluster.sh
flink-1.17.1/bin/start-cluster.sh
flink-1.19.3/bin/stop-cluster.sh
flink-1.19.3/bin/start-cluster.sh
```


4 changes: 1 addition & 3 deletions flink/README.md
@@ -10,9 +10,7 @@ Flink interpreter is more complex than other interpreter (such as jdbc, shell).
Currently, it has the following modules clustered into two groups:

* flink-shims
* flink1.15-shims
* flink1.16-shims
* flink1.17-shims
* flink1.19-shims (shared by Flink 1.19 and 1.20)

* flink-scala-2.12

57 changes: 8 additions & 49 deletions flink/flink-scala-2.12/pom.xml
@@ -33,7 +33,7 @@

<properties>
<!--library versions-->
<flink.version>${flink1.17.version}</flink.version>
<flink.version>${flink1.19.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
<flink.scala.compile.version>${flink.scala.version}</flink.scala.compile.version>
@@ -55,19 +55,7 @@

<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>flink1.15-shims</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>flink1.16-shims</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>flink1.17-shims</artifactId>
<artifactId>flink1.19-shims</artifactId>
<version>${project.version}</version>
</dependency>

@@ -1203,39 +1191,10 @@

<profiles>
<profile>
<id>flink-115</id>
<properties>
<flink.version>${flink1.15.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${flink.scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>

<profile>
<id>flink-116</id>
<id>flink-119</id>
<properties>
<flink.version>${flink1.16.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.version>${flink1.19.version}</flink.version>
<flink.scala.version>2.12.18</flink.scala.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
</properties>
<dependencies>
@@ -1267,10 +1226,10 @@
</profile>

<profile>
<id>flink-117</id>
<id>flink-120</id>
<properties>
<flink.version>${flink1.17.version}</flink.version>
<flink.scala.version>2.12.7</flink.scala.version>
<flink.version>${flink1.20.version}</flink.version>
<flink.scala.version>2.12.18</flink.scala.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
</properties>
<dependencies>
@@ -18,6 +18,7 @@

package org.apache.zeppelin.flink;

import org.apache.zeppelin.flink.sql.AbstractStreamSqlJob;
import org.apache.zeppelin.flink.sql.AppendStreamSqlJob;
import org.apache.zeppelin.flink.sql.SingleRowStreamSqlJob;
import org.apache.zeppelin.flink.sql.UpdateStreamSqlJob;
@@ -53,48 +54,45 @@ public void open() throws InterpreterException {
public void callInnerSelect(String sql) {
InterpreterContext context = InterpreterContext.get();
String streamType = context.getLocalProperties().getOrDefault("type", "update");
AbstractStreamSqlJob streamJob;
if (streamType.equalsIgnoreCase("single")) {
SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
streamJob = new SingleRowStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
flinkInterpreter.getJavaStreamTableEnvironment(),
flinkInterpreter.getJobManager(),
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
try {
streamJob.run(sql);
} catch (IOException e) {
throw new RuntimeException("Fail to run single type stream job", e);
}
} else if (streamType.equalsIgnoreCase("append")) {
AppendStreamSqlJob streamJob = new AppendStreamSqlJob(
streamJob = new AppendStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
flinkInterpreter.getStreamTableEnvironment(),
flinkInterpreter.getJobManager(),
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
try {
streamJob.run(sql);
} catch (IOException e) {
throw new RuntimeException("Fail to run append type stream job", e);
}
} else if (streamType.equalsIgnoreCase("update")) {
UpdateStreamSqlJob streamJob = new UpdateStreamSqlJob(
streamJob = new UpdateStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
flinkInterpreter.getStreamTableEnvironment(),
flinkInterpreter.getJobManager(),
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
try {
streamJob.run(sql);
} catch (IOException e) {
throw new RuntimeException("Fail to run update type stream job", e);
}
} else {
throw new RuntimeException("Unrecognized stream type: " + streamType);
}

FlinkZeppelinContext z =
(FlinkZeppelinContext) flinkInterpreter.getZeppelinContext();
z.setCurrentStreamJob(streamJob);
try {
streamJob.run(sql);
} catch (IOException e) {
throw new RuntimeException("Fail to run " + streamType + " type stream job", e);
} finally {
z.clearCurrentStreamJob();
}
}

@Override