Building an ETL Workflow Using Apache NiFi and Hive

The objective of this article is to design an ETL workflow in Apache NiFi that, with almost no code, scrapes a web page to find an API endpoint, extracts and transforms the dataset behind that endpoint, and loads the transformed data into a Hive table.

Problem Statement

One use case that calls for a data pipeline is capturing the district-level COVID-19 information published on the COVID19-India API website, which is updated daily. The aim is to create a flow that collates the dataset and loads it into a warehouse system used by various downstream applications for further analysis. The flow should also be easy to reconfigure for future changes.

Prerequisites

Before we start, you should have a basic understanding of Apache NiFi, and ideally have it installed on your system. If you do not have it installed, please follow these quick steps. The architecture also needs Apache Hive, which in turn requires a fully functional Hadoop setup. For this article, I am using Hive on a single-node cluster installed locally, but you can use a remote Hive connection as well.

Basic Terminologies

Apache NiFi is an ETL tool with flow-based programming that comes with a web UI built to provide an easy way (drag & drop) to handle data flow in real-time. It also supports powerful and scalable means of data routing and transformation, which can be run on a single server or in a clustered mode across many servers.

A NiFi workflow consists of processors, the rectangular components that can process, verify, filter, join, split, or adjust data. They exchange pieces of information called FlowFiles through queues named connections, and the Flow Controller manages the resources shared between those components.

Web scraping is a process to extract and collect structured web data with automation. It includes extracting and processing underlying HTML code using CSS selectors and the extracted data gets stored into a database.

Apache Hive is a warehouse system built on top of Hadoop used for data summarization, query, and ad-hoc analysis.

Steps for ETL Workflow

Fig:- End-to-End NiFi WorkFlow

The above flow comprises multiple processors each performing different tasks at different stages to process data. The different stages are Collect (InvokeHTTP - API Web Page, InvokeHTTP - Download District Data), Filter (GetHTMLElement, ExtractEndPoints, RouteOnAttribute - District API, QueryRecord), Transform (ReplaceHeaders, ConvertJSONToSQL), Load (PutHiveQL), and Logging (LogAttribute). Each processor is connected through different relationship connections and gets triggered on success until the data gets loaded into the table. The entire flow is scheduled to run daily.

So, let’s dig into each step to understand the flow better.

1. Get the HTML document using the Remote URL

The flow starts with an InvokeHTTP processor that sends an HTTP GET request to the COVID19-India API URL and passes the returned HTML page to the Response relationship for further inspection. The same processor also supports other HTTP methods, such as PUT, POST, or PATCH.

Fig:- InvokeHTTP - API Web Page Configuration

2. Extract listed endpoints

In the second step, the GetHTMLElement processor uses the CSS selector tr > td > a to target the anchor tags inside the HTML table rows of the response, where all the endpoints are listed, and extracts each match into its own FlowFile.

Fig:- GetHTMLElement Configuration

Once the previous step succeeds, an ExtractText processor (named ExtractEndPoints in the flow) evaluates a regular expression against the content of each FlowFile to extract the URL, which is then assigned to a FlowFile attribute named data_url.

Fig:- ExtractEndPoints Configuration
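To make the two processors above more concrete, here is a rough Python equivalent of what they do together. It is only a sketch, not how NiFi implements them: the EndpointParser class, the extract_endpoints function, and the URL pattern are names and choices made up for this illustration, and the actual regular expression used in ExtractEndPoints is shown only in the figure.

```python
import re
from html.parser import HTMLParser

# Assumed pattern: the real ExtractText expression is not reproduced here.
URL_PATTERN = re.compile(r"https?://\S+")

# Tags that never have a closing tag and must not be tracked on the stack.
VOID_TAGS = {"br", "hr", "img", "input", "link", "meta"}


class EndpointParser(HTMLParser):
    """Collects href values of <a> tags matching the selector tr > td > a."""

    def __init__(self):
        super().__init__()
        self._stack = []  # currently open tags, innermost last
        self.hrefs = []

    def handle_starttag(self, tag, attrs):
        if tag in VOID_TAGS:
            return
        # "tr > td > a" means: an <a> whose parent is <td> whose parent is <tr>.
        if tag == "a" and self._stack[-2:] == ["tr", "td"]:
            href = dict(attrs).get("href")
            if href:
                self.hrefs.append(href)
        self._stack.append(tag)

    def handle_endtag(self, tag):
        # Pop back to the matching open tag; ignore stray closing tags.
        if tag in self._stack:
            while self._stack.pop() != tag:
                pass


def extract_endpoints(html):
    """Return the URLs found in table-cell anchors (the data_url values)."""
    parser = EndpointParser()
    parser.feed(html)
    urls = []
    for href in parser.hrefs:
        match = URL_PATTERN.match(href)
        if match:
            urls.append(match.group(0))
    return urls
```

Calling extract_endpoints on the page returned in step 1 would give one URL per table-cell link, which corresponds to one FlowFile per endpoint with data_url set.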

Note: The layout of the web page may change over time. If it has changed by the time you read this article, adjust the configuration of the above processors to match the new layout.

3. Pick districts API and Download the dataset

Here, the RouteOnAttribute processor uses a NiFi Expression Language condition to pick out the API for district-level information and ignore the other APIs, since we are only interested in district.csv.

Fig:- RouteOnAttribute - District API Configuration
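The routing rule itself lives in the figure, so the following is only an illustration of the idea. It assumes a condition along the lines of ${data_url:contains('district')}; the route_district function and the dictionary representation of a FlowFile are hypothetical.

```python
def route_district(flowfiles, keyword="district"):
    """Split FlowFiles into (matched, unmatched) by their data_url attribute.

    Each FlowFile is modelled as a plain dict of attributes. The keyword
    is an assumption standing in for the expression shown in the figure.
    """
    matched, unmatched = [], []
    for flowfile in flowfiles:
        url = flowfile.get("data_url", "")
        if keyword in url:
            matched.append(flowfile)
        else:
            unmatched.append(flowfile)
    return matched, unmatched
```

Only the matched FlowFiles continue to the download step; in the NiFi flow the unmatched relationship is simply not used for further processing.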

This time, the InvokeHTTP processor downloads the data from the extracted API endpoint. The Remote URL property references the data_url attribute through the Expression Language, that is, as ${data_url}. The response data is in CSV format.

Fig:- InvokeHTTP - Download District Data Configuration

4. Transform and Filter the dataset

In this stage, the header of the response data is changed to lowercase using the ReplaceText processor with Literal Replace strategy, and the first field name is changed from date to recorded_date to avoid using reserved database keywords.
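As a rough picture of what this header rewrite achieves, the sketch below does the same thing in Python. The replace_header function is hypothetical, and it assumes a simple header without quoted column names.

```python
def replace_header(csv_text):
    """Lowercase the CSV header and rename the date column to recorded_date."""
    header, newline, body = csv_text.partition("\n")
    columns = [name.strip().lower() for name in header.split(",")]
    # "date" is renamed so that it does not clash with a reserved keyword.
    columns = ["recorded_date" if name == "date" else name for name in columns]
    return ",".join(columns) + newline + body
```

Only the first line is touched; the data rows pass through unchanged, which is also why a literal replacement of the known header text is enough in ReplaceText.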

Since the data is updated daily on an incremental basis, we will only extract the data from the previous day using the QueryRecord processor. It also converts the CSV data into a JSON FlowFile using the CSVReader and JsonRecordSetWriter controller services.

Please note that both the CSVReader and JsonRecordSetWriter services can have the default settings for our use. You can check out this blog for more reading on the controller services.

As mentioned, QueryRecord evaluates the query below to select the previous day's records from the FlowFile and passes them to the next processor.

select * from FlowFile where recorded_date='${now():toNumber():minus(86400000):format('yyyy-MM-dd')}' 

Fig:- ReplaceHeaders Configuration

Fig:- QueryRecord Configuration
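For readers who think more easily in code, the filter-and-convert behaviour of QueryRecord can be approximated as follows. This is a sketch under assumptions: previous_day_records is a made-up helper, every value stays a string because no schema is inferred, and "yesterday" is computed from the local date, much like the now() expression in the query.

```python
import csv
import io
import json
from datetime import date, timedelta


def previous_day_records(csv_text, today=None):
    """Keep rows whose recorded_date is yesterday and return them as JSON."""
    today = today or date.today()
    # Same idea as now() minus 86400000 ms, formatted as yyyy-MM-dd.
    yesterday = (today - timedelta(days=1)).isoformat()
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = [row for row in reader if row["recorded_date"] == yesterday]
    return json.dumps(rows)
```

Passing today explicitly makes the function easy to try against a fixed sample; in the flow, the date always comes from the moment the processor runs.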

5. Establish JDBC connection pool for Hive and create a table

Let’s set up the Hive JDBC connection for the NiFi flow using the HiveConnectionPool controller service with the required local or remote configuration (database connection URL, user, and password). The Hive Configuration Resources property expects the path to the Hive configuration file, i.e., hive-site.xml.

Fig:- HiveConnectionPool Setup

Now, we need an empty table to load the data from the NiFi flow, and to do so, you can use the DDL structure below:

CODE: https://gist.github.com/velotiotech/628ace36d24285f2cf40468d6fbd7335.js

6. Load data into the Hive table

In this step, ConvertJSONToSQL converts the JSON-formatted FlowFile into a SQL statement, which becomes the content of the output FlowFile. Before running the processor, we set its JDBC Connection Pool property to the HiveConnectionPool service and configure the table name and statement type. In this case, the statement type is INSERT, since we need to load the data into the table.

Also, please note that when preparing a SQL command, the SQL Parameter Attribute Prefix property should be hiveql. Otherwise, the very next processor will not be able to identify it and will throw an error.

Then, on success, PutHiveQL executes the input SQL command and loads the data into the table. The success of this processor marks the end of the workflow and the data can be verified by fetching the target table.

Fig:- ConvertJSONToSQL Configurations


Fig:- PutHiveQL Configuration
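To see why the attribute prefix matters, it helps to look at the shape of what ConvertJSONToSQL hands over: a parameterized statement as content, plus numbered type/value attributes that the next processor reads. The sketch below imitates that shape. It is a simplification: json_to_insert and the table name covid_districts are hypothetical, and every column is treated as VARCHAR, whereas the real processor looks up column types from the table.

```python
VARCHAR = 12  # value of java.sql.Types.VARCHAR


def json_to_insert(record, table, prefix="hiveql"):
    """Build a parameterized INSERT and the matching <prefix>.args.N attributes."""
    columns = list(record)
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"

    attributes = {}
    for position, column in enumerate(columns, start=1):
        # Simplification: all values are sent as VARCHAR strings.
        attributes[f"{prefix}.args.{position}.type"] = str(VARCHAR)
        attributes[f"{prefix}.args.{position}.value"] = str(record[column])
    return sql, attributes
```

With the prefix left at its default of sql, the attributes would be named sql.args.N.type and sql.args.N.value, which PutHiveQL does not look for.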

7. Schedule the flow for daily updates

You can schedule the entire flow to run at any given time using different NiFi scheduling strategies. Since the first InvokeHTTP is the initiator of this flow, we can configure it to run daily at 2 AM.

Fig:- Scheduling Strategy
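One way to get a daily 2 AM run is the CRON driven scheduling strategy, which takes a Quartz-style expression; 0 0 2 * * ? means second 0, minute 0, hour 2, every day. Whether the figure uses exactly this expression is an assumption. The small sketch below (next_daily_run is a hypothetical helper) shows which run time such a schedule implies for a given moment.

```python
from datetime import datetime, time, timedelta


def next_daily_run(now, run_at=time(2, 0)):
    """Return the next daily trigger time strictly after `now`."""
    candidate = datetime.combine(now.date(), run_at)
    if candidate <= now:
        # Today's slot has already passed, so the flow runs tomorrow.
        candidate += timedelta(days=1)
    return candidate
```

Keep in mind that the previous-day filter in step 4 depends on when the flow runs, so the schedule and the query should be chosen together.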

8. Log Management

Almost every processor is connected to the LogAttribute processor through a failure/success queue, so the state and attributes of each FlowFile are written to the NiFi application log, logs/nifi-app.log. By checking this file, we can debug and fix the issue in case of any failure. To extend this further, we can also set up a flow that captures error logs with Apache Kafka and sends notifications by email.

9. Consume data for analysis

You can use various open-source visualization tools to start off with the exploratory data analysis on the data stored in the Hive table.

You can download the template covid_etl_workflow.xml and run it on your machine for reference.

Future Scope

There are different ways to build any workflow, and this was one of them. You can take this further by allowing multiple datasets (state_wise, test_datasets) from the list with different combinations of various processors/controllers as a part of the flow. 

You can also try scraping product listing pages from multiple e-commerce websites to compare goods and prices, or extract movie reviews and ratings from the IMDb website and use them to build recommendations for users.

Conclusion

In this article, we discussed Apache NiFi and created a workflow to extract, filter, transform, and load the data for analysis purposes. If you are more comfortable building logic and want to focus on the architecture with less code, then Apache NiFi is the tool for you.
