Setting up a Flink cluster is straightforward as well. Anyone who has installed a Hadoop cluster will find these steps familiar. To set up the cluster, let's assume we have four Linux machines, each with a moderate configuration: at least two cores and 4 GB of RAM per machine is a good starting point.
The very first thing we need to do is choose the cluster design. Since we have four machines, we will use one machine as the Job Manager and the other three as Task Managers:
In order to set up the cluster, we first need passwordless SSH connections from the Job Manager machine to the Task Managers. The following steps need to be performed on the Job Manager machine to create an SSH key and copy it to authorized_keys:
$ ssh-keygen
This will generate the public and private keys in the /home/flinkuser/.ssh folder. Now copy the public key (id_rsa.pub) to each Task Manager machine and perform the following steps on the Task Manager to allow passwordless connections from the Job Manager:
sudo mkdir -p /home/flinkuser/.ssh
sudo touch /home/flinkuser/.ssh/authorized_keys
sudo cp id_rsa.pub /home/flinkuser/.ssh/
sudo sh -c "cat /home/flinkuser/.ssh/id_rsa.pub >> /home/flinkuser/.ssh/authorized_keys"
Make sure the keys have restricted access by executing the following commands:
sudo chmod 700 /home/flinkuser/.ssh
sudo chmod 600 /home/flinkuser/.ssh/authorized_keys
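If you want to double-check what those permission bits look like before touching a real home directory, the following sketch reproduces them on a scratch directory (/tmp/flinkuser is a stand-in for the real /home/flinkuser):

```shell
# Sketch: reproduce the restricted permissions on a throwaway directory.
# /tmp/flinkuser stands in for the real /home/flinkuser home directory.
mkdir -p /tmp/flinkuser/.ssh
touch /tmp/flinkuser/.ssh/authorized_keys
chmod 700 /tmp/flinkuser/.ssh                    # owner-only access on the directory
chmod 600 /tmp/flinkuser/.ssh/authorized_keys    # owner read/write on the key file
stat -c '%a' /tmp/flinkuser/.ssh                 # prints 700
stat -c '%a' /tmp/flinkuser/.ssh/authorized_keys # prints 600
```

sshd refuses key-based logins when these files are group- or world-writable, which is why the chmod step matters.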
Now you can test the passwordless SSH connection from the Job Manager machine:
sudo ssh <task-manager-1>
sudo ssh <task-manager-2>
sudo ssh <task-manager-3>
If you are using cloud service instances for the installation, please make sure that root login is enabled over SSH. To do this, log in to each machine and open the file /etc/ssh/sshd_config. Then change the value to PermitRootLogin yes. Once you save the file, restart the SSH service by executing the following command:
sudo service sshd restart
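The PermitRootLogin edit can also be scripted. The following sketch runs against a local copy of the file so nothing real is touched; on an actual node you would point sed at /etc/ssh/sshd_config (with sudo) instead:

```shell
# Sketch: flip PermitRootLogin to "yes" with sed, exercised on a local
# copy of sshd_config so the real file is untouched. The file content
# below is a minimal stand-in for a real sshd_config.
cat > /tmp/sshd_config <<'EOF'
#PermitRootLogin prohibit-password
PasswordAuthentication yes
EOF

# Match the directive whether or not it is commented out, and rewrite it.
sed -i 's/^#\?PermitRootLogin.*/PermitRootLogin yes/' /tmp/sshd_config
grep '^PermitRootLogin' /tmp/sshd_config   # prints: PermitRootLogin yes
```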
Next we need to install Java on each machine. The following commands will help you install Java on Red Hat/CentOS-based Linux machines:
wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u92-b14/jdk-8u92-linux-x64.rpm
sudo rpm -ivh jdk-8u92-linux-x64.rpm
Next we need to set up the JAVA_HOME environment variable so that Java is accessible from everywhere. Create a java.sh file:
sudo vi /etc/profile.d/java.sh
Add the following content to it and save it:
#!/bin/bash
JAVA_HOME=/usr/java/jdk1.8.0_92
PATH=$JAVA_HOME/bin:$PATH
export PATH JAVA_HOME
export CLASSPATH=.
Make the file executable and source it:
sudo chmod +x /etc/profile.d/java.sh
source /etc/profile.d/java.sh
You can now check if Java is installed properly:
$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
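If you want to sanity-check the java.sh logic itself, the following sketch sources an equivalent file with a placeholder JDK path (/opt/fake-jdk1.8.0_92 is invented for the demo) and confirms that $JAVA_HOME/bin ends up on the PATH:

```shell
# Sketch: exercise the java.sh pattern with a placeholder JDK path to
# confirm that sourcing it exports JAVA_HOME and prepends it to PATH.
cat > /tmp/java_demo.sh <<'EOF'
#!/bin/bash
JAVA_HOME=/opt/fake-jdk1.8.0_92
PATH=$JAVA_HOME/bin:$PATH
export PATH JAVA_HOME
export CLASSPATH=.
EOF

source /tmp/java_demo.sh
echo "$JAVA_HOME"                        # prints /opt/fake-jdk1.8.0_92
case ":$PATH:" in
  *":$JAVA_HOME/bin:"*) echo "JAVA_HOME/bin is on PATH" ;;
  *)                    echo "PATH not updated" ;;
esac
```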
Repeat these installation steps on all the Job Manager and Task Manager machines.
Once the SSH and Java installations are done, we need to download the Flink binaries and extract them into a specific folder. Please note that the installation directory should be the same on all nodes.
So let's get started:
cd /usr/local
sudo wget http://www-eu.apache.org/dist/flink/flink-1.1.4/flink-1.1.4-bin-hadoop27-scala_2.11.tgz
sudo tar -xzf flink-1.1.4-bin-hadoop27-scala_2.11.tgz
Now that the binary is ready, we need to do some configurations.
Flink's configuration is simple. We need to tune a few parameters and we are all set. Most of the configuration is the same for the Job Manager node and the Task Manager nodes. All configuration is done in the conf/flink-conf.yaml file.
The following is a configuration file for a Job Manager node:
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 1
You may want to change the memory configuration for the Job Manager and Task Managers based on your node configurations. For the Task Managers, jobmanager.rpc.address should be populated with the correct Job Manager hostname or IP address.
So for all Task Managers, the configuration file should be like the following:
jobmanager.rpc.address: <jobmanager-ip-or-host>
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 1
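Since flink-conf.yaml is a flat "key: value" file, a quick sanity check of what each node will pick up can be done with awk. In this sketch the file content is written locally and 192.168.1.10 is a placeholder for your real Job Manager address:

```shell
# Sketch: read individual keys out of a flink-conf.yaml-style file.
# 192.168.1.10 is a placeholder Job Manager address for the demo.
cat > /tmp/flink-conf.yaml <<'EOF'
jobmanager.rpc.address: 192.168.1.10
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 1
EOF

# Split each line on ": " and print the value for a given key.
awk -F': ' '$1 == "jobmanager.rpc.address"        { print $2 }' /tmp/flink-conf.yaml  # 192.168.1.10
awk -F': ' '$1 == "taskmanager.numberOfTaskSlots" { print $2 }' /tmp/flink-conf.yaml  # 1
```

Running the same check on every node is an easy way to catch a Task Manager that is still pointing at localhost.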
We need to add the JAVA_HOME details in this file so that Flink knows exactly where to look for the Java binaries. Since flink-conf.yaml is a YAML file, this is done with the env.java.home key rather than an export statement:
env.java.home: /usr/java/jdk1.8.0_92
We also need to add the slave node details in the conf/slaves file, with each node on a separate line. Here is how a sample conf/slaves file should look:
<task-manager-1>
<task-manager-2>
<task-manager-3>
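When the host list lives in a variable or inventory anyway, the slaves file can be generated rather than typed. A minimal sketch, using placeholder hostnames and a /tmp path instead of the real conf/slaves:

```shell
# Sketch: generate a conf/slaves-style file from a host list,
# one hostname per line. task-manager-1..3 are placeholders.
printf '%s\n' task-manager-1 task-manager-2 task-manager-3 > /tmp/slaves
cat /tmp/slaves
wc -l < /tmp/slaves   # prints 3
```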
Now the only thing left is to start the Flink processes. We can start each process separately on the individual nodes, or we can execute the start-cluster.sh command to start the required processes on each node:
bin/start-cluster.sh
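Conceptually, start-cluster.sh starts the Job Manager locally and then reaches every host in conf/slaves over SSH to start a Task Manager there, which is why the passwordless SSH setup above is a prerequisite. The following dry-run sketch only echoes the commands it would issue (hostnames and the install path are placeholders):

```shell
# Sketch (dry run): roughly what start-cluster.sh does. Commands are
# echoed, not executed; hostnames and paths are placeholders.
printf '%s\n' task-manager-1 task-manager-2 task-manager-3 > /tmp/slaves

{
  # Start the Job Manager on the local node first...
  echo "bin/jobmanager.sh start cluster"
  # ...then start one Task Manager per host listed in the slaves file.
  while read -r host; do
    echo "ssh $host /usr/local/flink-1.1.4/bin/taskmanager.sh start cluster"
  done < /tmp/slaves
} > /tmp/start_cmds.txt

cat /tmp/start_cmds.txt
```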
If all the configurations are good, you will see that the cluster is up and running. You can check the web UI at http://<job-manager-ip>:8081/.
The following are some snapshots of the Flink Web UI:
You can click on the Job Manager link to get the following view:
Similarly, you can check out the Task Managers view as follows:
Flink provides you with the facility to add additional instances of Job and Task Managers to the running cluster.
Before we start the daemon, please make sure that you have followed the steps given previously.
To add an additional Job Manager to the existing cluster, execute the following command:
sudo bin/jobmanager.sh start cluster
Similarly, we need to execute the following command to add an additional Task Manager:
sudo bin/taskmanager.sh start cluster
Once the job execution is completed, you may want to shut down the cluster. The following commands are used for that.
To stop the complete cluster in one go:
sudo bin/stop-cluster.sh
To stop the individual Job Manager:
sudo bin/jobmanager.sh stop cluster
To stop the individual Task Manager:
sudo bin/taskmanager.sh stop cluster