Introducing Apache NiFi for dataflows

Apache NiFi automates dataflows by receiving data from virtually any source (Twitter, Kafka, databases, and so on), routing it through data processing systems such as Hadoop or Spark, and finally delivering it to storage systems such as HBase, Cassandra, and other databases. Problems can arise at any of these three layers: systems go down, or data production and consumption rates fall out of sync. Apache NiFi addresses these dataflow challenges by providing the following key features:

  • Guaranteed delivery with write-ahead logs
  • Data buffering with Back Pressure and Pressure Release
  • Prioritized queuing: oldest first, newest first, largest first, and so on
  • Configurations for low latency, high throughput, loss tolerance, and so on
  • Data provenance that records all data events for later discovery or debugging
  • Age-based roll-off of data
  • Visual Command and Control that visualizes dataflows and allows changing them without stopping them
  • XML-based flow templates that are easy to version control and share
  • High security with encryption, such as two-way SSL
  • Design for extension at all levels
  • Clustering for a scale-out architecture

Hortonworks supports Apache NiFi through its product, Hortonworks DataFlow (HDF).

Installing Apache NiFi

Hortonworks' latest sandbox VM ships with a pre-configured NiFi setup; the NiFi service just needs to be added from Ambari. Go to ipaddressofsandbox:8080 and log in with the admin/admin credentials. From the bottom left of the Ambari page, navigate to Actions | Add Service, check the NiFi service, and then configure and deploy it.

During the configuration stage, set the port number and any other properties in nifi.properties as needed.

Log in to the NiFi UI at ipaddressofsandbox:9090/nifi.

If you want to download HDF, instructions for downloading, installing, and running the service can be found at http://docs.hortonworks.com/HDPDocuments/HDF1/HDF-1.2.0.1/bk_HDF_GettingStarted/content/ch_HDF_GettingStarted.html.

Dataflows and analytics with NiFi

There are prebuilt NiFi templates that can be downloaded and tested in your NiFi environment. Templates are available at https://cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates. Download the necessary templates to your machine.

In the NiFi window, click on the Templates link in the upper-right corner. Click on Browse, select the template, click on Open, and then import it.

Drag the template icon from the menu at the upper left onto the main canvas, and then select the template to create the workflow from its XML definition. Follow the instructions given for each template at the preceding link. Right-click on the processors to verify their configurations. Click on the play button at the top to start the workflow.

More information on how to create and manage dataflows is available at https://nifi.apache.org/docs.html.

Let's create a simple dataflow in NiFi that gets data from Kafka and writes it to a Spark Streaming application and to HDFS. The goal of this application is to analyze Ambari logs and count the number of INFO, WARN, and ERROR messages in a given timeframe. Run the following commands to create a Kafka topic and start sending the Ambari agent log data to the topic for analysis in the Spark Streaming application:

cd /usr/hdp/current/kafka-broker/bin/
./kafka-topics.sh --zookeeper localhost:2181 --topic ambarilogs --create --partitions 2 --replication-factor 1
tail -f /var/log/ambari-agent/ambari-agent.log | ./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic ambarilogs

Download the nifi-spark-receiver and nifi-site-to-site-client JARs. Check your NiFi version and download compatible versions; this example downloads the 0.5.1 JARs:

mkdir /opt/spark-receiver
cd /opt/spark-receiver
wget http://central.maven.org/maven2/org/apache/nifi/nifi-site-to-site-client/0.5.1/nifi-site-to-site-client-0.5.1.jar
wget http://central.maven.org/maven2/org/apache/nifi/nifi-spark-receiver/0.5.1/nifi-spark-receiver-0.5.1.jar

In Ambari, go to the Spark service configurations, add the following two properties under custom spark-defaults, and then restart the Spark service:

spark.driver.allowMultipleContexts true
spark.driver.extraClassPath  /opt/spark-receiver/nifi-spark-receiver-0.5.1.jar:/opt/spark-receiver/nifi-site-to-site-client-0.5.1.jar:/opt/nifi-0.5.1.1.1.2.0-32/lib/nifi-api-0.5.1.1.1.2.0-32.jar:/opt/nifi-0.5.1.1.1.2.0-32/lib/bootstrap/nifi-utils-0.5.1.1.1.2.0-32.jar:/opt/nifi-0.5.1.1.1.2.0-32/work/nar/framework/nifi-framework-nar-0.5.1.1.1.2.0-32.nar-unpacked/META-INF/bundled-dependencies/nifi-client-dto-0.5.1.1.1.2.0-32.jar
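
With the Spark service restarted, it is worth confirming that the NiFi JARs are actually visible to the Spark driver before writing any streaming code. Here is a minimal sanity check you can run from spark-shell; the class names come from the JARs referenced above:

// If either call throws ClassNotFoundException, re-check
// spark.driver.extraClassPath and the JAR paths
Class.forName("org.apache.nifi.spark.NiFiReceiver")
Class.forName("org.apache.nifi.remote.client.SiteToSiteClient")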

In the Ambari NiFi configurations, change the following site-to-site properties and restart the service. These settings enable NiFi's site-to-site protocol, which the Spark receiver will use to pull data from an output port:

nifi.remote.input.socket.host=
nifi.remote.input.socket.port=8055
nifi.remote.input.secure=false

On the NiFi UI (ipaddressofsandbox:9090/nifi), drag a processor onto the canvas and choose the GetKafka processor. Right-click on the processor and click on Configure. Enter the ZooKeeper connection string as ipaddressofsandbox:2181 (for example, 192.168.139.167:2181) and the topic as ambarilogs. Add a PutHDFS processor, configure it to write to the /tmp/kafka directory, and specify the configuration resources as /etc/hadoop/conf/core-site.xml and /etc/hadoop/conf/hdfs-site.xml. Set the auto-terminate relationships in the HDFS processor. Also, drag an output port onto the canvas and name it Data for Spark. Connect the processors and start the dataflow, making sure to resolve all the warnings shown at the top of each processor.
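
Before wiring up Spark, you can optionally confirm that the Data for Spark output port is emitting data. The following is a minimal sketch that you can paste into spark-shell (the site-to-site JARs are already on the driver classpath from the earlier step); it pulls whatever flow files are currently queued on the port:

// Pull the queued flow files directly from the "Data for Spark" port.
// This is only a connectivity check; the Spark receiver does the real work.
import org.apache.nifi.remote.TransferDirection
import org.apache.nifi.remote.client.SiteToSiteClient
import scala.io.Source

val client = new SiteToSiteClient.Builder()
  .url("http://localhost:9090/nifi")   // the NiFi UI URL on the sandbox
  .portName("Data for Spark")          // the output port created above
  .build()

val transaction = client.createTransaction(TransferDirection.RECEIVE)
var packet = transaction.receive()
while (packet != null) {
  val content = Source.fromInputStream(packet.getData).mkString
  println(s"Received ${packet.getSize} bytes: ${content.take(80)}")
  packet = transaction.receive()
}
transaction.confirm()   // acknowledge what was received
transaction.complete()  // commit the transaction on the NiFi side
client.close()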

Now, let's create a Scala Spark Streaming application, ambari-logs.sh, with the following code to pull data from the NiFi workflow:

cd /opt/spark-receiver
vi ambari-logs.sh

The code is as follows:

import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.spark.{NiFiDataPacket, NiFiReceiver}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkNiFiAmbari {
  def main(args: Array[String]) {
    // Site-to-site configuration pointing at the "Data for Spark" output port
    val conf = new SiteToSiteClient.Builder()
      .url("http://localhost:9090/nifi")
      .portName("Data for Spark")
      .buildConfig()

    val config = new SparkConf().setAppName("Ambari Log Analyzer")
    val ssc = new StreamingContext(config, Seconds(30))

    // Receive NiFi flow files as a stream of data packets
    val packetStream: ReceiverInputDStream[NiFiDataPacket] =
      ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY))

    // Split each packet's content into individual log lines
    val lines: DStream[String] =
      packetStream.flatMap(packet => new String(packet.getContent).split("\n"))

    // The first token of each Ambari log line is its level (INFO, WARN, or ERROR)
    val pairs: DStream[(String, Int)] = lines.map(line => (line.split(" ")(0), 1))
    val wordCounts: DStream[(String, Int)] = pairs.reduceByKey(_ + _)

    wordCounts.print()

    ssc.start()
  }
}
SparkNiFiAmbari.main(Array())

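If you want to keep the per-batch counts rather than only print them, the DStream API also provides saveAsTextFiles. As a small optional extension, you could add the following line just before ssc.start(); the /tmp/ambari-log-counts prefix is only an example path:

// Persist each 30-second batch of counts to HDFS as text files
wordCounts.saveAsTextFiles("/tmp/ambari-log-counts", "txt")
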
Start the Spark shell by passing the program to it:

spark-shell -i ambari-logs.sh

The output will appear on the screen as follows:

-------------------------------------------
Time: 1471091460000 ms
-------------------------------------------
(INFO,46)
(WARN,4)
(ERROR,1)

Check the HDFS directory (/tmp/kafka) to see whether the data is being written from Kafka. The NiFi dataflow looks similar to Figure 6.9. Note that if you stop the Spark Streaming application, the data is buffered in NiFi and will not be lost:


Figure 6.9: The NiFi workflow
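
Instead of using hadoop fs commands to inspect /tmp/kafka, you can also run a quick check from the same spark-shell. This small sketch lists the files that the PutHDFS processor has produced, using the shell's built-in sc SparkContext:

// List what the PutHDFS processor has written so far
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
fs.listStatus(new Path("/tmp/kafka")).foreach { status =>
  println(s"${status.getPath}  ${status.getLen} bytes")
}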

We are now ready to play with the various notebooks and dataflow tools. Happy coding!
