Solution
import os
import random
dataset_train_pos_path = "../Datasets/aclImdb/train/pos/"
dataset_train_neg_path = "../Datasets/aclImdb/train/neg/"
dataset_test_pos_path = "../Datasets/aclImdb/test/pos/"
dataset_test_neg_path = "../Datasets/aclImdb/test/neg/"
We have four variables: one for training_positive, one for training_negative, one for test_positive, and one for test_negative, each pointing at the respective dataset subdirectory.
contents_labels = [('this is the text from one '
                    'of the files', 'pos'),
                   ('this is another text', 'pos')]
def read_dataset(dataset_path, label):
    contents_labels = []
    files = os.listdir(dataset_path)
    for fn in files:
        path = os.path.join(dataset_path, fn)
        with open(path) as f:
            s = f.read()
            contents_labels.append((s, label))
    return contents_labels
The read_dataset function takes a path to a dataset and a label (either pos or neg), reads the contents of each file in the given directory, and collects the results in a list of tuples. Each tuple contains the text of a file and its label, pos or neg.
train_pos = read_dataset(dataset_train_pos_path, "pos")
train_neg = read_dataset(dataset_train_neg_path, "neg")
test_pos = read_dataset(dataset_test_pos_path, "pos")
test_neg = read_dataset(dataset_test_neg_path, "neg")
We have four variables in total: train_pos, train_neg, test_pos, and test_neg, each of which is a list of tuples containing the respective texts and labels.
train = train_pos + train_neg
test = test_pos + test_neg
We combined our positive and negative examples into a single dataset so that we can train an algorithm to discriminate between the two classes.
random.shuffle(train)
random.shuffle(test)
This gives us datasets where the training data is mixed up, instead of feeding all the positive and then all the negative examples to the classifier in order.
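If you need the shuffled order to be reproducible across runs, you can seed Python's random module before shuffling; a minimal sketch (the seed value is arbitrary):

random.seed(42)  # any fixed seed makes the shuffle deterministic
random.shuffle(train)
random.shuffle(test)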
train_data, y_train = zip(*train)
test_data, y_test = zip(*test)
You should again have four variables, called train_data, y_train, test_data, and y_test, where the y prefix indicates that the variable contains labels.
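If the zip(*...) idiom is unfamiliar: the unary * unpacks the list of (text, label) tuples, and zip regroups them into parallel tuples. A small illustration with toy data:

pairs = [('first text', 'pos'), ('second text', 'neg')]
texts, labels = zip(*pairs)
print(texts)   # ('first text', 'second text')
print(labels)  # ('pos', 'neg')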
%%time
from sklearn.feature_extraction.text import TfidfVectorizer
vectorizer = TfidfVectorizer()
X_train = vectorizer.fit_transform(train_data)
X_test = vectorizer.transform(test_data)
print("The dimensions of our vectors:")
print(X_train.shape)
print("- - -")
You should get the following output:
The dimensions of our vectors:
(25000, 74849)
- - -
CPU times: user 13.4 s, sys: 440 ms, total: 13.8 s
Wall time: 14.7 s
We import TfidfVectorizer from sklearn, initialize an instance of it, fit the vectorizer on the training data, and vectorize both the training and testing data into the X_train and X_test variables, respectively. Lastly, we measure the time the cell takes and print out the shape of the training vectors.
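If you want a peek at what the vectorizer learned, its vocabulary_ attribute maps each term to a column index; a minimal sketch:

print(len(vectorizer.vocabulary_))           # matches the 74849 columns above
print(sorted(vectorizer.vocabulary_)[:10])   # first few terms, alphabetically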
%%time
from sklearn.svm import LinearSVC
svm_classifier = LinearSVC()
svm_classifier.fit(X_train, y_train)
predictions = svm_classifier.predict(X_test)
You should get the following output:
CPU times: user 799 ms, sys: 63 ms, total: 862 ms
Wall time: 1.17 s
We imported the LinearSVC classifier and trained it on our training data. Then we generated a prediction for every example in our test set.
from sklearn.metrics import accuracy_score, classification_report
print("Accuracy: {} "
.format(accuracy_score(y_test, predictions)))
print(classification_report(y_test, predictions))
You should get the following output:
Now, let's see how the classifier performs when we feed it with data on different topics.
good_review = "The restaurant was really great! "
"I ate wonderful food and had "
"a very good time"
bad_review = "The restaurant was awful. "
"The staff were rude and the "
"food was horrible. I hated it"
restaurant_reviews = [good_review, bad_review]
vectors = vectorizer.transform(restaurant_reviews)
print(svm_classifier.predict(vectors))
You should get the following output:
['pos' 'neg']
Note
To access the source code for this specific section, please refer to https://packt.live/3fq2Hqg.
Solution
The layers for the solution are as follows:
There is a streaming data pipeline for events from taxis.
For scalability, the raw and historical layers should be highly scalable, since new data will flood into the system periodically. The analytics layer should scale with usage, since the number of concurrent users will probably grow over time. The streaming data layer should be scalable enough to keep up with incoming event data as the number of taxis increases. The model development environment should be scalable in several respects: data volume, number of users, and performance.
The availability of the system should be high (99.9% or better) for the streaming data layer since running the new system (and therefore the business strategy) depends on the analysis of that data. Other layers can have lower availability rates since they are less mission-critical.
Retention is important in the raw and historical layers to comply with laws and regulations, in the analytics layer to retain good performance, and in the streaming data layer to limit the amount of data that is being processed at any given moment.
The locality of the data becomes important once the taxi company expands to other regions/countries.
Solution
import sys
!conda install --yes --prefix {sys.prefix} -c conda-forge pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, size
spark = SparkSession.builder.appName("Packt").getOrCreate()
data = spark.read.csv(
'../../Datasets/netflix_titles_nov_2019.csv',
header='true')
data.show()
Note
We read the data from a relative file path. You can also copy the CSV file to the same Activity03.01 directory and use netflix_titles_nov_2019.csv as the path.
You should get the following output:
movies = data.filter((col('type') == 'TV Show') &
((col('rating') == 'TV-G') |
(col('rating') == 'TV-Y')))
movies.show()
You should get the following output:
transformed = movies.withColumn('count_lists',
size(split(movies['listed_in'], ',')))
selected = transformed.select('title', 'cast',
'rating', 'release_year', 'duration',
'count_lists', 'listed_in', 'description')
selected.show()
You should get the following output:
selected.write.csv('transformed2', header='true')
Note
Alternatively, we can add the complete code of the ETL process so far in a Python script and run it through the Terminal (macOS or Linux) or Command Prompt (Windows). We have created the same spark_etl.py Python script at the following location: https://packt.live/2ZqQxb9.
# note: the filename ('part-....') will differ
# on your local machine
!head transformed2/part-00000-a9837c96-549d-4a8c-981a-cae147e36801-c000.csv
You should get the following output:
Note
To access the source code for this specific section, please refer to https://packt.live/3iXSinP.
Solution
import sys
!conda install --yes --prefix {sys.prefix} -c conda-forge pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import (from_json, window,
                                   to_timestamp, explode, split, col)
from pyspark.sql.types import StructType, StructField, StringType
spark = SparkSession.builder.appName('Packt').getOrCreate()
raw_stream = spark.readStream.format('socket').option(
'host', 'localhost').option('port', 1234).load()
tweet_datetime_format = 'EEE MMM dd HH:mm:ss ZZZZ yyyy'
schema = StructType([StructField('created_at',
StringType(), True),
StructField('text', StringType(), True)])
tweet_stream = raw_stream.select(from_json('value', schema).alias('tweet'))
timed_stream = tweet_stream.select(
to_timestamp('tweet.created_at', tweet_datetime_format).alias('timestamp'),
explode(
split('tweet.text', ' ')
).alias('word'))
windowed = timed_stream \
    .withWatermark('timestamp', '1 minute') \
    .groupBy(window('timestamp', '10 minutes'), 'word')
counts_per_window = windowed.count().orderBy(['window', 'count'], ascending=[0, 1])
query = counts_per_window.writeStream.outputMode('complete') \
    .format('console').option("truncate", False).start()
query.awaitTermination()
You should get the following output:
Note
To access the source code for this specific section, please refer to https://packt.live/3iX0ODx.
Solution
words = """sporty
nerdy
employed
unemployed
clever
stupid
latino
asian
caucasian
disabled
pregnant
introvert
extrovert
politician
florist
CEO"""
In the previous code, we added 16 words to our list.
import spacy
nlp = spacy.load('en_core_web_lg')
def polarity_good_vs_bad(word):
    """Returns a positive number if a word is closer to good than it is to bad, or a negative number if vice versa
    IN: word (str): the word to compare
    OUT: diff (float): positive if the word is closer to good, otherwise negative
    """
    good = nlp("good")
    bad = nlp("bad")
    word = nlp(word)
    if word and word.vector_norm:
        sim_good = word.similarity(good)
        sim_bad = word.similarity(bad)
        diff = sim_good - sim_bad
        diff = round(diff * 100, 2)
        return diff
    else:
        return None
In the preceding code, we defined our basic sentiment classification model again. It takes in a word and checks whether it is closer to good or closer to bad.
# Guesses
"""sporty : POS
nerdy : NEG
employed : POS
unemployed : NEG
clever : POS
stupid : NEG
latino : NEG
asian : NEG
caucasian : POS
disabled : NEG
pregnant : NEG
introvert : NEG
extrovert : POS
politician : NEG
florist : POS
CEO: NEG"""
In the preceding code, we added POS (positive) or NEG (negative) next to the words in our list, guessing their polarity.
for word in words.split("\n"):
    print(word, polarity_good_vs_bad(word))
You should get the following output:
In the preceding code, we looped through all the words in our list and calculated their polarity. We can see that our predictions were right in all cases.
Note
To access the source code for this specific section, please refer to https://packt.live/2Zqk84p.
Solution
Windows:
mysql
Linux:
sudo mysql
macOS:
mysql
CREATE DATABASE PacktFashion;
USE PacktFashion;
You should get the following output:
Next, we will create the tables as per the data model.
CREATE TABLE manufacturer (m_id INT,
m_name TEXT,
m_created_at TIMESTAMP,
PRIMARY KEY (m_id)
);
CREATE TABLE products (p_id INT,
p_name TEXT,
p_buy_price FLOAT,
p_manufacturer_id INT,
p_created_at TIMESTAMP,
PRIMARY KEY (p_id),
FOREIGN KEY (p_manufacturer_id)
REFERENCES manufacturer(m_id)
ON DELETE CASCADE
);
Declaring the ON DELETE CASCADE clause forces MySQL to delete the data from the child table when the referenced parent key is deleted. For example, if an m_id entry is deleted from the manufacturer table, then all the rows in the products table whose p_manufacturer_id column matches it will also be deleted.
CREATE TABLE sales (s_id INT,
p_id INT,
s_sale_price FLOAT,
s_profit FLOAT,
s_created_at TIMESTAMP,
PRIMARY KEY (s_id),
FOREIGN KEY (p_id)
REFERENCES products(p_id)
ON DELETE CASCADE
);
CREATE TABLE location (loc_id INT,
loc_name TEXT,
loc_created_at TIMESTAMP,
PRIMARY KEY (loc_id)
);
CREATE TABLE status (status_id INT,
status_name TEXT,
status_created_at TIMESTAMP,
PRIMARY KEY (status_id)
);
CREATE TABLE inventory (inv_id INT,
inv_loc_id INT,
inv_p_id INT,
inv_status_id INT,
inv_created_at TIMESTAMP,
PRIMARY KEY (inv_id),
FOREIGN KEY (inv_loc_id)
REFERENCES location(loc_id)
ON DELETE CASCADE,
FOREIGN KEY (inv_p_id)
REFERENCES products(p_id)
ON DELETE CASCADE,
FOREIGN KEY (inv_status_id )
REFERENCES status(status_id)
ON DELETE CASCADE
);
Now that the structure is ready, we will insert data into the tables.
INSERT INTO manufacturer(m_id, m_name, m_created_at)
VALUES
(1,"Z-1", now()),
(2,"XIMO", now()),
(3,"NY", now());
INSERT INTO products(p_id, p_name, p_buy_price, p_manufacturer_id, p_created_at)
VALUES
(1, 'Z-1 Running shoe', 34, 1, now()),
(2, 'XIMO Trek shirt', 15, 2, now()),
(3, 'XIMO Trek shorts', 18, 2, now()),
(4, 'NY cap', 18, 3, now());
INSERT INTO sales(s_id, p_id, s_sale_price, s_profit, s_created_at)
VALUES
(1,2,18,3,now()),
(2,3,20,2,now()),
(3,3,19,1,now()),
(4,1,40,6,now()),
(5,1,34,0,now());
INSERT INTO location(loc_id, loc_name, loc_created_at)
VALUES
(1, 'California', now()),
(2, 'London', now()),
(3, 'Prague', now());
INSERT INTO status(status_id, status_name, status_created_at)
VALUES
(1, 'IN', now()),
(2, 'OUT', now());
INSERT INTO inventory(inv_id,
inv_loc_id,
inv_p_id,
inv_status_id,
inv_created_at)
VALUES
(1,1,3,1,now()),
(2,3,4,1,now()),
(3,2,2,2,now()),
(4,3,2,2,now()),
(5,1,1,2,now());
SELECT * FROM manufacturer;
You should get the following output:
SELECT * FROM products;
You should get the following output:
SELECT * FROM sales;
You should get the following output:
The formula that's used for calculating profit is (sale price – buy price). In each record, we can see that the sales price is different for the same product. It is assumed that the selling prices are different based on various sale offers.
SELECT * FROM location;
You should get the following output:
SELECT * FROM status;
You should get the following output:
SELECT * FROM inventory;
You should get the following output:
This inventory table defines the product inventory where inv_p_id refers to p_id from the products table, inv_status_id refers to status_id from the status table, and inv_loc_id refers to loc_id from the location table. Let's understand this better using the third row, where inv_id (inventory ID) is 3. This row can be read as stating that the XIMO Trek shirt product (that is, inv_p_id is 2) is not available (that is, inv_status_id is 2) in the London warehouse (inv_loc_id is 2).
SELECT count(inventory.inv_p_id) as total_in_stock_products
FROM inventory
JOIN status
ON status.status_id=inventory.inv_status_id
WHERE status.status_name='IN';
You should get the following output:
We join the inventory and status tables on inv_status_id and status_id, and count the inventory rows whose status_name is 'IN', that is, the total products in stock.
SELECT count(inventory.inv_p_id) as total_in_stock_products
FROM inventory
JOIN status
ON status.status_id=inventory.inv_status_id
WHERE status.status_name='OUT';
You should get the following output:
This scenario is similar to the previous step. In this step, we are filtering for the 'OUT' status.
SELECT status.status_name as status
FROM inventory, status, products, location
WHERE status.status_id=inventory.inv_status_id
AND products.p_id = inventory.inv_p_id
AND location.loc_id = inventory.inv_loc_id
AND products.p_name='XIMO Trek shirt'
AND location.loc_name='Prague';
You should get the following output:
We select status_name from the status table and join the inventory, status, products, and location tables on their primary and foreign keys. Then, we filter for the location 'Prague' and the product name 'XIMO Trek shirt'.
Note
To access the source code for this specific section, please refer to https://packt.live/2ZpQpbz.
Solution
mongo
use PacktFashion
You should get the following output:
switched to db PacktFashion
In MongoDB, you can start using the database directly without creating it. It will be created when you create a collection in it.
db.createCollection("products")
You should get the following output:
{ "ok" : 1 }
todayDate=new Date()
products=[{
"p_name": "XIMO Trek shirt",
"p_manufacturer": "XIMO",
"p_buy_price": 15,
"p_created_at": todayDate,
"sales": [
{
"s_sale_price": 30,
"s_profit": 15,
"p_created_at": todayDate,
},
{
"s_sale_price": 18,
"s_profit": 3,
"p_created_at": todayDate
},
{
"s_sale_price": 20,
"s_profit": 5,
"p_created_at": todayDate
},
{
"s_sale_price": 15,
"s_profit": 0,
"p_created_at": todayDate
}
]
},
{
"p_name": "XIMO Trek shorts",
"p_manufacturer": "XIMO",
"p_buy_price": 18,
"p_created_at": todayDate,
"sales": [
{
"s_sale_price": 22,
"s_profit": 4,
"p_created_at": todayDate,
},
{
"s_sale_price": 18,
"s_profit": 0,
"p_created_at": todayDate
},
{
"s_sale_price": 20,
"s_profit": 2,
"p_created_at": todayDate
}
]
},
{
"p_name": "NY cap",
"p_manufacturer": "NY",
"p_buy_price": 18,
"p_created_at": todayDate,
"sales": [
{
"s_sale_price": 20,
"s_profit": 2,
"p_created_at": todayDate,
},
{
"s_sale_price": 21,
"s_profit": 3,
"p_created_at": todayDate
},
{
"s_sale_price": 19,
"s_profit": 1,
"p_created_at": todayDate
}
]
}
]
db.products.insert(products)
You should get the following output:
WriteResult({ "nInserted" : 1 })
We have successfully inserted the product and sales data into the database. Now, we would like to track users' actions. To do this, we will need two collections, users and user_logs, as per the data model.
users=[
{
"name":"Max",
"u_created_at":todayDate
},
{
"name":"John Doe",
"u_created_at":todayDate
},
{
"name":"Roger smith",
"u_created_at":todayDate
},
];
user_logs=[
{
user_id:"Max",
product_id:"XIMO Trek shirt",
action:"bought",
ul_created_at:todayDate
},
{
user_id:"John Doe",
product_id:"NY cap",
action:"bought",
ul_created_at:todayDate
},
{
user_id:"Roger smith",
product_id: "XIMO Trek shorts",
action:"bought",
ul_created_at:todayDate
}
];
db.users.insert(users);
You should get the following output:
BulkWriteResult({ "nInserted" : 3 })
db.user_logs.insert(user_logs);
You should get the following output:
BulkWriteResult({ "nInserted" : 3 })
var user_logs_aggregate_pipeline = [
{ $lookup:
{
from: "users",
localField: "user_id",
foreignField: "name",
as: "users"
}
},
{
$lookup:
{
from: "products",
localField: "product_id",
foreignField: "p_name",
as: "products"
}
},
{
"$unwind":"$users"
},
{
"$project": {
"user":"$users.name",
"product":"$products.p_name",
"action":"$action"
}
}
];
We joined the three collections (products, users, and user_logs) using the $lookup operator of the aggregation pipeline. Once the joins were done, we exploded the users array using the $unwind operator. You can experiment with various kinds of aggregation to get comfortable with the pipeline before diving into more complex usage.
db.user_logs.aggregate(user_logs_aggregate_pipeline).pretty()
You should get the following output:
Note
To access the source code for this specific section, please refer to https://packt.live/38T4b9X.
Solution
Windows:
Open Cassandra CLI application
Linux:
root@ubuntu:~$ cqlsh
macOS:
MyMac:~ root$ cqlsh
You should get the following output:
cqlsh>
CREATE KEYSPACE fashionmart
WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 3};
use fashionmart;
CREATE COLUMNFAMILY feedback_logs(
fl_id int PRIMARY KEY,
fl_feedback text,
fl_location text,
user_id int,
fl_created_at timestamp);
describe tables;
You should get the following output:
feedback_logs user
BEGIN BATCH
INSERT INTO feedback_logs(fl_id, fl_feedback, fl_location, user_id, fl_created_at)
VALUES(1, 'Great website', 'London', 3, '2019-10-30 12:05:00+0000');
INSERT INTO feedback_logs(fl_id, fl_feedback, fl_location, user_id, fl_created_at)
VALUES(2, 'Good work', 'Seattle', 2, '2019-10-03 12:05:00+0000');
INSERT INTO feedback_logs(fl_id, fl_feedback, fl_location, user_id, fl_created_at) VALUES(3, 'Amazing', 'Seattle', 2, '2019-11-04 11:05:00+0000');
INSERT INTO feedback_logs(fl_id, fl_feedback, fl_location, user_id, fl_created_at) VALUES(4, 'Not so good', 'Hong Kong', 1, '2019-11-04 11:05:00+0000');
INSERT INTO feedback_logs(fl_id, fl_feedback, fl_location, user_id, fl_created_at) VALUES(8, 'Informative website', 'Shanghai', 1, '2018-11-04 11:05:00+0000');
INSERT INTO feedback_logs(fl_id, fl_feedback, fl_location, user_id, fl_created_at) VALUES(5, 'Great website', 'London', 1, '2019-10-30 12:05:00+0000');
INSERT INTO feedback_logs(fl_id, fl_feedback, fl_location, user_id, fl_created_at) VALUES(6, 'Good work', 'Seattle', 4, '2019-10-03 12:05:00+0000');
INSERT INTO feedback_logs(fl_id, fl_feedback, fl_location, user_id, fl_created_at) VALUES(7, 'Informative website', 'Shanghai', 2, '2018-11-04 11:05:00+0000');
APPLY BATCH;
SELECT * FROM feedback_logs;
You should get the following output:
SELECT COUNT(fl_id) AS total_feedback
FROM feedback_logs;
You should get the following output:
Note
To access the source code for this specific section, please refer to https://packt.live/2Ok942r.
Solution
spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.5
You should get the following output:
By using this command, the Spark shell will be launched and we will now load the dataset from the CSV file.
val df_ses_log_csv = spark.read.options(Map("inferSchema"->"true","delimiter"->",","header"->"true")).csv("F:/Chapter06/Data/session_log.csv")
Note
Update the input path of the file according to your local file path throughout the exercise.
You should get the following output:
df_ses_log_csv: org.apache.spark.sql.DataFrame = [user_id: string, event_date: string … 3 more fields]
Here, we have loaded the dataset from the CSV file with an explicit option of a delimiter (or value separator), and the header argument set to true because the dataset contains the schema or column names. This will result in a DataFrame.
df_ses_log_csv.show(false)
Note
Owing to the size of the data, the commands in this activity can take up to a few minutes to execute.
You should get the following output:
This will display the first 20 rows from the DataFrame. The false argument tells Spark not to truncate the displayed columns, so each column shows its complete raw value.
df_ses_log_csv.write.format("avro").save("F:/Chapter06/Data/session_log.avro")
A file is created of the following size:
Output file size: 454 MB
This command converts the DataFrame (which we loaded from the CSV file) and saves it in the Avro file format. The time taken to execute this command will vary with the size of the data. Let's convert the data to the ORC format now.
df_ses_log_csv.write.orc("F:/Chapter06/Data/session_log.orc")
A file is created of the following size:
Output file size: 182 MB
This command converts the DataFrame (which we loaded from the CSV file) and saves it in the ORC file format. The time taken to execute this command will vary with the size of the data. Let's convert the data into the Parquet format now.
df_ses_log_csv.write.parquet("F:/Chapter06/Data/session_log.parquet")
A file is created of the following size:
[Output file size: 115 MB]
This command converts the DataFrame (which we loaded from the CSV file) and saves it in the Parquet file format. The time taken to execute this command will vary with the size of the data. Now, let's measure the query performance.
var df_ses_log_parquet = spark.read.parquet("F:/Chapter06/Data/session_log.parquet")
You should get the following output:
df_ses_log_parquet: org.apache.spark.sql.DataFrame =
[user_id: string, event_date: string … 3 more fields]
var df_ses_log_orc = spark.read.orc("F:/Chapter06/Data/session_log.orc")
You should get the following output:
df_ses_log_orc: org.apache.spark.sql.DataFrame = [user_id: string, event_date: string … 3 more fields]
var df_ses_log_avro = spark.read.format("avro").load("F:/Chapter06/Data/session_log.avro")
You should get the following output:
df_ses_log_avro: org.apache.spark.sql.DataFrame = [user_id: string, event_date: string … 3 more fields]
We have created three DataFrames by loading the data from our Parquet, Avro, and ORC files. Now we will create a function to measure the execution time.
// Function for time consumption
def time[A](f: => A) = {
  val s = System.nanoTime
  val ret = f
  println("Time: "+(System.nanoTime-s)/1e6+" ms")
  ret
}
In this step, we have created a time-measuring function that takes the query to be executed as its input. Let's run the performance queries to measure the efficiency of each file format.
time{df_ses_log_avro.groupBy("session_nb").count()}
time{df_ses_log_parquet.groupBy("session_nb").count()}
time{df_ses_log_orc.groupBy("session_nb").count()}
You should get the following output:
In this step, we have executed the count query over the DataFrames created in the previous steps. We can observe that the ORC and Parquet files have nearly the same time-efficiency over this query. Let's execute a few complex queries.
//Parquet
df_ses_log_parquet.createOrReplaceTempView("session_log_parquet")
//ORC
df_ses_log_orc.createOrReplaceTempView("session_log_orc")
//AVRO
df_ses_log_avro.createOrReplaceTempView("session_log_avro")
By creating a table from the DataFrames, we can execute SQL syntax queries over the same dataset. Let's now execute the queries.
// Defining sql query as a variable
val p_yr_query = "Select count(Year),Year from (Select SUBSTRING(event_date,7,10) as Year from session_log_parquet) GROUP BY Year"
val o_yr_query = "Select count(Year),Year from (Select SUBSTRING(event_date,7,10) as Year from session_log_orc) GROUP BY Year"
val a_yr_query = "Select count(Year),Year from (Select SUBSTRING(event_date,7,10) as Year from session_log_avro) GROUP BY Year"
// Executing the query inside the time function
time{spark.sql(p_yr_query)}
time{spark.sql(o_yr_query)}
time{spark.sql(a_yr_query)}
You should get the following output:
In this step, we have executed a complex GROUP BY query with a SUBSTRING function over all the DataFrames. And again, the ORC and Parquet files have nearly the same time-efficiency over this query.
We can conclude from the results that the Parquet file format has the highest compression for our dataset, and ORC has the quickest query response. However, the differences in operational efficiency are small.
Along with technical specifications, there are a few other considerations before finalizing your choice of file format. These include scopes such as the current environment or platform, the cost and time of adopting the new environment, and the impact of such a change on the business. Let's include the technology stack of the company in our decision making.
Note
To access the source code for this specific section, please refer to https://packt.live/3gU6yMu.
Solution
from pyspark.sql.functions import desc
# File location and type
file_location = "/FileStore/tables/Film_Locations_in_San_Francisco.csv"
file_type = "csv"
# The applied options are for CSV files. For other file types, these will be ignored.
dataTable = spark.read.format(file_type) \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load(file_location)
display(dataTable)
dataTable.printSchema()
You should get the following output:
Note
As the output was long, the view is truncated for ease of representation.
We read the CSV file into a DataFrame called dataTable by using the spark.read.format() function. The chained calls take several arguments: we specify the delimiter as a comma (,), point Spark at the file path, treat the first row as the header of the DataFrame with header=true, and let Spark infer the column data types from samples of the data with inferSchema=true.
newcolnames = ['Title','ReleaseYear', 'Locations',
'FunFacts', 'ProductionCompany',
'Distributor', 'Director', 'Writer',
'Actor1', 'Actor2', 'Actor3']
for old, new in zip(dataTable.columns, newcolnames):
    dataTable = dataTable.withColumnRenamed(old, new)
display(dataTable)
You should get the following output:
Note
As the output was long, the view is truncated for ease of representation.
We remove the space in the column names by renaming them using the withColumnRenamed function. We do this by creating a list of new column names, newcolnames, and using the new list to rename the old list of columns.
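As an aside, because we are renaming every column at once, the same result can be achieved in a single call with PySpark's toDF method; a minimal sketch, assuming len(newcolnames) equals the number of columns:

dataTable = dataTable.toDF(*newcolnames)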
#finding the recent movies released after 2015
recentMovies= dataTable.filter(dataTable.ReleaseYear >= 2015)
display(recentMovies)
You should get the following output:
Note
As the output was long, the view is truncated for ease of representation.
We use the filter function to find the list of movies released in the year 2015 or later.
#group by locations and find the count
recentLocations = recentMovies.groupby('Locations') \
    .count().sort(desc("count"))
display(recentLocations)
You should get the following output:
We find the popular locations of recently released movies by grouping on Locations using the groupby and count functions. We then sort in descending order with the help of the sort(desc("count")) function.
display(recentLocations.take(3))
You should get the following output:
We find and display the top three rows from the recentLocations DataFrame by using the take function, with an argument of 3.
recentLocations.first()
display(recentLocations.take(1))
You should get the following output:
We find and display the topmost row from the recentLocations DataFrame by using the first() function.
recentWriters = recentMovies.groupby('Writer').count().sort(desc("count"))
display(recentWriters)
You should get the following output:
Note
As the output was long, the view is truncated for ease of representation.
We find the popular writers of recently released movies by grouping on Writer using the groupby and count functions. We then sort in descending order with the help of the sort(desc("count")) function.
display(recentWriters.take(3))
You should get the following output:
We find and display the top three rows from the recentWriters DataFrame by using the take function, with an argument of 3.
recentWriters.first()
display(recentWriters.take(1))
You should get the following output:
We find and display the topmost row from the recentWriters DataFrame by using the first() function.
Note
To access the source code for this specific section, please refer to https://packt.live/2OoDtN2.
Solution
from queue import Queue
from threading import Thread
import random
import time
We imported the modules that we will use to design our next mock system.
urls = ['url1-', 'url1-', 'url2-', 'url3-', 'url4-',
'url5-', 'url6-', 'url7-', 'url8-', 'url9-', 'url10-']
seen = set()
url_queue = Queue()
for url in urls:
url_queue.put(url)
We created 11 mock URLs and a seen set to find duplicates. We then created a queue for our URLs and added each URL to the queue.
scraped_queue = Queue()
cleaned_queue = Queue()
deduplicated_queue = Queue()
insights_queue = Queue()
decisions_queue = Queue()
We initialized Queue() objects for each component to push to when done.
def scraper():
    while True:
        time.sleep(random.randrange(0,2))
        url = url_queue.get()
        print("Scraping {}".format(url))
        scraped_queue.put(url[3:])
def cleaner():
    while True:
        time.sleep(random.randrange(2,4))
        raw = scraped_queue.get()
        print("Cleaning {}".format(raw))
        cleaned_queue.put(raw.replace("-", ""))
def deduplicator():
    while True:
        time.sleep(random.randrange(4,6))
        cleaned = cleaned_queue.get()
        print("Deduplicating {}".format(cleaned))
        if cleaned not in seen:
            deduplicated_queue.put(cleaned)
            seen.add(cleaned)
We defined each of these functions in the same way we did previously.
def analyzer():
    while True:
        time.sleep(random.randrange(1,4))
        unique = deduplicated_queue.get()
        print("Analyzing {}".format(unique))
        n = int(unique)
        if n % 2 == 0:
            insights_queue.put(-n)
        else:
            insights_queue.put(n)
Here, we defined the analyzer module, which works in a similar way. It has an if statement to put either -n or n on the next queue, depending on whether the clean URL it fetches is odd or even.
def decision_maker():
    while True:
        time.sleep(random.randrange(1,4))
        insight = insights_queue.get()
        print("Deciding {}".format(insight))
        if insight > 0:
            decisions_queue.put("Buy {}".format(insight))
        else:
            decisions_queue.put("Sell {}".format(-insight))
Here, we created the decision_maker module, which is similar to the analyzer. It puts a Buy decision on the next queue for positive numbers and a Sell decision for negative ones.
def trader():
    while True:
        time.sleep(random.randrange(1,4))
        trade = decisions_queue.get()
        print("Trading {}".format(trade))
        print(trade)
The trader module is the simplest module. It simply fetches items off the decision queue and prints them out.
scraper_worker = Thread(target=scraper)
cleaner_worker = Thread(target=cleaner)
deduplicator_worker = Thread(target=deduplicator)
analyzer_worker = Thread(target=analyzer)
decision_maker_worker = Thread(target=decision_maker)
trader_worker = Thread(target=trader)
Here, we created six threads, one for each component, using the Thread class, and passed the respective functions in using the target parameter.
threads = [
scraper_worker, cleaner_worker, deduplicator_worker,
analyzer_worker, decision_maker_worker, trader_worker
]
[t.start() for t in threads]
You should get the following output:
We added each thread to an array and started each thread. Again, we saw the different components work at different speeds.
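Note that these worker loops run forever. If you want the pipeline to drain and stop cleanly, the usual pattern is a sentinel value combined with Queue.task_done() and Queue.join(); a minimal sketch (scraper_with_shutdown is a hypothetical variant, not part of the activity):

def scraper_with_shutdown():
    while True:
        url = url_queue.get()
        if url is None:            # sentinel signals shutdown
            url_queue.task_done()
            break
        scraped_queue.put(url[3:])
        url_queue.task_done()

url_queue.put(None)  # send the sentinel when no more URLs are coming
url_queue.join()     # block until every queued item has been processed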
Note
To access the source code for this specific section, please refer to https://packt.live/3ftzYR9.
Solution
import json
import pandas as pd
# read video data
df_vids = pd.read_csv('../Data/USvideos.csv.zip', compression='zip')
# read category data
data_cats = json.load(open('../Data/US_category_id.json', 'r'))
df_cat = pd.DataFrame(data_cats)
df_cat['category'] = df_cat['items'].apply(lambda x: x['snippet']['title'])
df_cat['id'] = df_cat['items'].apply(lambda x: int(x['id']))
df_cat_drop = df_cat.drop(columns=['kind', 'etag', 'items'])
# join data
df_join = df_vids.merge(df_cat_drop, left_on='category_id',
right_on='id')
df_join.head()
You should get the following output:
To get the video data with category information, we reused a lot of the code we developed in the previous exercises. It's important to write reusable and modular code so that we can simply use it without writing code from scratch.
# aggregate likes and dislikes by category
df_agg = df_join[['category', 'likes', 'dislikes']].groupby('category').sum()
# calculate ratio
df_agg['ratio_likes_dislikes'] = df_agg['likes'] / df_agg['dislikes']
df_agg.head()
You should get the following output:
When we perform a groupby operation, it requires an aggregation action for each group. In this case, our aggregation action is sum. We are summing the likes and dislikes for each category.
In the pandas API, df_agg['likes'] / df_agg['dislikes'] returns a pandas series, each entry of which is the likes-dislikes ratio for one category. We assign the returned pandas series to the df_agg DataFrame's new column, ['ratio_likes_dislikes'].
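A toy example of the same groupby-then-ratio pattern, assuming nothing beyond pandas itself:

import pandas as pd
toy = pd.DataFrame({'category': ['Music', 'Music', 'Sports'],
                    'likes': [10, 20, 5],
                    'dislikes': [2, 3, 5]})
toy_agg = toy.groupby('category').sum()
toy_agg['ratio_likes_dislikes'] = toy_agg['likes'] / toy_agg['dislikes']
print(toy_agg)  # Music: 30 likes / 5 dislikes = 6.0; Sports: 5 / 5 = 1.0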
ratio_dag.py
import json
import os
import shutil
import sys
from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def filter_data(**kwargs):
    # read data
    path_vids = kwargs['dag_run'].conf['path_vids']
    date = str(kwargs['dag_run'].conf['date'])

    print(os.getcwd())
    print(path_vids)

    df_vids = pd.read_csv(path_vids, compression='zip') \
        .query('trending_date==@date')
    # cache
    try:
        df_vids.to_csv('./tmp/data_vids.csv', index=False)
    except FileNotFoundError:
        os.mkdir('./tmp')
        df_vids.to_csv('./tmp/data_vids.csv', index=False)
The full code is available at https://packt.live/2C8bqyP.
At a glance, you will notice that this DAG script is almost the same as the top_cat_dag.py DAG script from Exercise 9.06, Creating a DAG for Our Data Pipeline Using Airflow, except that there is one more operator and its corresponding Python callable, as shown in the following code:
def calc_ratio(**kwargs):
    try:
        df_join = pd.read_csv('./tmp/data_joined.csv')
    except Exception as e:
        print('>>>>>>>>>>>> Error: {}'.format(e))
        sys.exit(1)
    # aggregate likes and dislikes by category
    df_agg = df_join[['category', 'likes', 'dislikes']].groupby('category').sum()
    # calculate ratio
    df_agg['ratio_likes_dislikes'] = df_agg['likes'] / df_agg['dislikes']
    df_agg.reset_index('category') \
        .to_csv('./tmp/data_ratio.csv',
                index=False)
op4 = PythonOperator(
    task_id='calc_ratio',
    python_callable=calc_ratio,
    dag=dag)
This is the advantage of writing reusable and modular code. You can reuse most of the functions from previous exercises. Taking advantage of what you have already built is a very common practice in software engineering. One of the most often spoken phrases in the industry is, "Don't reinvent the wheel."
cp ./ratio_dag.py ~/airflow/dags/
Note:
The Airflow webserver and scheduler should be stopped when you copy the DAG script to ~/airflow. We can stop them using Ctrl + C in the Command Prompt for Windows/Linux or Cmd + C for macOS. After the DAG script has been moved to the Airflow home directory, ~/airflow, we can launch the Airflow scheduler and webserver again. When the Airflow scheduler is launched again, it will register the DAGs defined by the scripts in its home directory, ~/airflow. So, when we issue the airflow list_dags command, we can see the newly added DAGs.
airflow list_dags
You should get the following output:
The fourth entry from the bottom is our ratio_dag DAG. If your DAG is registered, you will see your DAG in the list returned by the airflow list_dags command.
airflow scheduler
You should get the following output:
Note
Launch the Airflow scheduler in the Activity09.01 directory. We are assuming that Activity09.01 is the working directory for the Airflow system.
When you launch an Airflow scheduler and it throws an error that reads attempt to write a readonly database, it means the user who launched the Airflow scheduler does not have write permission to the Airflow SQLite database file, airflow.db. One way to resolve the permission issue is to issue sudo chmod 644 ~/airflow/airflow.db in your Terminal and restart the Airflow scheduler so that the Airflow scheduler has write access to the database file.
airflow trigger_dag -c '
{
"path_vids": "../Data/USvideos.csv.zip",
"path_cats": "../Data/US_category_id.json",
"date": "17.14.11",
"path_output": "../Data/Ratio_Likes_Dislikes.csv"
}' 'ratio_dag'
You should get the following output:
The trigger_dag action will trigger the DAG to run immediately. The CLI requires the DAG ID, ratio_dag. We also pass the -c argument, which is used for passing parameters into each operator in our DAG.
airflow webserver
As we can see from this dashboard, every step in the pipeline is marked with a dark green border, which means the step completed successfully. All tiles have a dark green border, which means our data pipeline successfully finished.
ls ../Data/
You should get the following output:
Ratio_Likes_Dislikes.csv US_category_id.json USvideos.csv.zip top_10_trendy_cats.csv top_10_trendy_vids.csv
If you see the first file as Ratio_Likes_Dislikes.csv, it means the pipeline is successfully running.
import pandas as pd
pd.read_csv('../Data/Ratio_Likes_Dislikes.csv')
You should get the following output:
Note
To access the source code for this specific section, please refer to https://packt.live/32puIdZ.
Solution
import os
import json
import boto3
import shutil
import pandas as pd
# set your bucket name here
# 'ch10-data' is NOT your bucket. It's just an example here
# you should replace your bucket below
BUCKET_NAME = 'ch10-data'
# 1. download data from S3 bucket
s3_resource = boto3.resource('s3')
try:
    s3_resource.Bucket(BUCKET_NAME).download_file(
        'New_York_City_Leading_Causes_of_Death.csv',
        './tmp/New_York_City_Leading_Causes_of_Death.csv')
except FileNotFoundError:
    os.mkdir('tmp/')
    s3_resource.Bucket(BUCKET_NAME).download_file(
        'New_York_City_Leading_Causes_of_Death.csv',
        './tmp/New_York_City_Leading_Causes_of_Death.csv')
# read data
df_data = pd.read_csv('tmp/New_York_City_Leading_Causes_of_Death.csv')
df_data.head()
You should get the following output:
We downloaded a file from the S3 bucket first. Once the data was downloaded into the local directory, we used pandas to read the New_York_City_Leading_Causes_of_Death.csv file. Based on your first impression of the data, you can see that the data needs some preprocessing. For example, the value at row five of the Deaths column is ., whose data type is different compared to the rest of the values in the same column. There are two approaches we can use: we can either replace . with 0 or remove the row containing the . value.
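The code that follows takes the replacement approach; the row-removal alternative would look like this minimal sketch (df_data_dropped is a hypothetical name):

# alternative: drop the rows whose Deaths value is "."
df_data_dropped = df_data[df_data['Deaths'] != '.'].astype({'Deaths': float})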
# 2. replace "." with value 0 & and convert to float type
df_data_cleaned = df_data.replace('.', 0).astype({
'Deaths': float})
# check dtypes
df_data_cleaned.dtypes
You should get the following output:
# 3. get top 3 death causes for each ethnicity
top_causes = {}
for ethnicity, df_g in df_data_cleaned.groupby(
        ['Race Ethnicity']):
    df_top_3_causes = df_g.groupby('Leading Cause')[['Deaths']].sum().sort_values(
        'Deaths', ascending=False).head(3)
    top_3_causes = df_top_3_causes.index.values.tolist()
    top_causes.update({ethnicity: top_3_causes})
top_causes
You should get the following output:
We group the death count based on each death cause per ethnicity. Then, we sort death causes based on their death counts and keep the top three death causes for each ethnicity.
Note
The groupby clause in pandas can be hard to comprehend if this is your first time using pandas. It might be useful if you print the output of the data when you use the groupby clause so that you can understand what groupby is doing.
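For example, a quick way to see what groupby produces is to print each group's key and shape; a minimal sketch:

for ethnicity, df_g in df_data_cleaned.groupby('Race Ethnicity'):
    print(ethnicity, df_g.shape)  # the group key and its (rows, columns)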
with open('tmp/top_causes_per_ethnicity.json', 'w') as fout:
    json.dump(top_causes, fout)
# 5. upload data to S3
s3_resource.Bucket(BUCKET_NAME).upload_file(
    'tmp/top_causes_per_ethnicity.json',
    'top_causes_per_ethnicity.json')
# clean up tmp
shutil.rmtree('./tmp')
ETL_pipeline.py
# 1. download data from S3 bucket
s3_resource = boto3.resource('s3')
try:
    s3_resource.Bucket(BUCKET_NAME).download_file(
        'New_York_City_Leading_Causes_of_Death.csv',
        './tmp/New_York_City_Leading_Causes_of_Death.csv')
except FileNotFoundError:
    os.mkdir('tmp/')
    s3_resource.Bucket(BUCKET_NAME).download_file(
        'New_York_City_Leading_Causes_of_Death.csv',
        './tmp/New_York_City_Leading_Causes_of_Death.csv')
The full code is available at https://packt.live/2Wh3e6q.
python ETL_pipeline.py
The script will create a top_causes_per_ethnicity.json file and upload it to the cloud.
aws s3 ls s3://${BUCKET_NAME}/
You should get the following output:
Note
To access the source code for this specific section, please refer to https://packt.live/38TFoCt.
Solution
# import module
import random
import numpy as np
from itertools import count
from collections import deque
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gym
# make game
env = gym.make('CartPole-v1')
# seed the experiment
env.seed(9)
np.random.seed(9)
random.seed(9)
torch.manual_seed(9)
# define our policy
class DQN(nn.Module):
    def __init__(self, observation_space, action_space):
        super(DQN, self).__init__()
        self.observation_space = observation_space
        self.action_space = action_space
        self.fc1 = nn.Linear(self.observation_space, 32)
        self.fc2 = nn.Linear(32, 16)
        self.fc3 = nn.Linear(16, self.action_space)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
We will use the three-layer feedforward neural network model from Exercise 11.04, Implementing a Deep Q-Learning Algorithm in PyTorch to Solve the Classic Cart Pole Problem.
act01-notebook.ipynb
# define our agent
class Agent:
    def __init__(self, policy_net, target_net):
        MEMORY_SIZE = 10000
        GAMMA = 0.6
        BATCH_SIZE = 128
        EXPLORATION_MAX = 0.9
        EXPLORATION_MIN = 0.05
        EXPLORATION_DECAY = 0.95
        # 1, 2, 3
        TARGET_UPDATE = 1
        self.policy_net = policy_net
        self.target_net = target_net
        self.target_net.load_state_dict(policy_net.state_dict())
        self.target_net.eval()
        self.optimizer = optim.RMSprop(
            policy_net.parameters(), lr=1e-3)
        self.memory = deque(maxlen=MEMORY_SIZE)
        self.gamma = GAMMA
        self.batch_size = BATCH_SIZE
        self.exploration_rate = EXPLORATION_MAX
        self.exploration_min = EXPLORATION_MIN
        self.exploration_decay = EXPLORATION_DECAY
        self.target_update = TARGET_UPDATE
The full code is available at https://packt.live/309071q.
Notice that we implement one more method: update_target_net(). This method updates the weights of the parameters in the target network. During training, we keep the weights in the target network frozen for a while to add stability to the expected Q value, and only refresh them every once in a while. How often we refresh them is controlled by the TARGET_UPDATE parameter; TARGET_UPDATE = 1 means we update the weights of the target network after every episode.
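A minimal sketch of what update_target_net() boils down to (the exact implementation is in the linked notebook):

def update_target_net(self):
    # copy the latest policy-network weights into the frozen target network
    self.target_net.load_state_dict(self.policy_net.state_dict())
    self.target_net.eval()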
In the experience_replay() method, we train our policy network for every timestep. Recall the following loss function:
We use a policy network to calculate the Q values (the first term in the equation) for every previous state and chosen action. We then use the target network to calculate the expected Q values (the second term in the previous equation) using the next state, and the optimal action for every timestep.
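As a hedged sketch (the tensor names here are illustrative, not the notebook's exact code), the two terms of the loss are computed roughly as follows inside experience_replay():

# Q(s, a) from the policy network for the actions that were actually taken
q_values = self.policy_net(states).gather(1, actions)
# max_a' Q_target(s', a') from the frozen target network
next_q = self.target_net(next_states).max(1)[0].detach()
# Bellman target: reward plus discounted future value
expected_q = rewards + self.gamma * next_q
loss = F.smooth_l1_loss(q_values, expected_q.unsqueeze(1))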
act01-notebook.ipynb
# create policy
observation_space = env.observation_space.shape[0]
action_space = env.action_space.n
policy_net = DQN(observation_space, action_space)
target_net = DQN(observation_space, action_space)
# create agent
agent = Agent(policy_net, target_net)
# play game
game_durations = []
for i_episode in count(1):
    state = env.reset()
    state = torch.tensor([state]).float()
    print("[ episode {} ] state={}".format(i_episode, state))
    for t in range(1, 10000):
        action = agent.select_action(state)
        state_next, reward, done, _ = env.step(action.item())
        if done:
            state_next = None
The full code is available at https://packt.live/309071q.
Notice that we created two deep Q neural network instances, policy_net and target_net, using the same model architecture with the DQN class to successfully create the double deep-Q learning algorithm.
After you run the training loop, you should get the following output:
Note
We will have loads of output data with 133 episode results. We have only shown the last section of it in the preceding figure.
import matplotlib.pyplot as plt
plt.style.use('ggplot')
plt.scatter(range(i_episode), game_durations)
plt.title('Game Duration Over Time')
plt.xlabel('game episode')
plt.ylabel('game duration')
plt.tight_layout()
You should get the following output:
With the double Q-learning algorithm, the algorithm can converge faster in a more consistent fashion.
Note
To access the source code for this specific section, please refer to https://packt.live/309071q.
Solution
!pip install pandas
!pip install sklearn
It should give the following output:
It will download the libraries and install them within your active Anaconda environment. There is a good chance that both frameworks are already available in your system, as part of Anaconda or from previous installations. If they are already installed, you will have the following output:
import pickle
import pandas as pd
from sklearn.linear_model import LogisticRegression
# load the datasets
train = pd.read_csv('../../Datasets/Titanic/train.csv')
test = pd.read_csv('../../Datasets/Titanic/test.csv')
train.Sex = train.Sex.map({'male':0, 'female':1})
We have now transformed the values in the Sex column to either 0 (for male) or 1 (for female).
y = train.Pclass.copy()
X = train.drop(['Pclass'], axis=1)
Since the Pclass column contains our output value on which we have to train our model (the target values), we have to extract that from the dataset. We create a new dataset for it called y and then remove the column from the training dataset. We call the new training set X.
Now, let's do some feature engineering. We can be quite certain that a lot of the columns will not hold any predictive value as to which passenger class a person traveled in. For example, the name of someone and their passenger ID are interchangeable and will not contribute much to the predictive power of the machine learning model.
X.drop(['Name'], axis=1, inplace=True)
X.drop(['Embarked'], axis=1, inplace=True)
X.drop(['PassengerId'], axis=1, inplace=True)
X.drop(['Cabin'], axis=1, inplace=True)
X.drop(['Ticket'], axis=1, inplace=True)
# X.drop(['Fare'], axis=1, inplace=True)
X.Age.fillna(X.Age.mean(), inplace=True)
X.info()
# create and train a simple model
from sklearn.linear_model import LogisticRegression
model = LogisticRegression()
model.fit(X, y)
# evaluate the model
model.score(X, y)
# export the model to pickle file
file = open('model.pkl', 'wb') # write in bytes
pickle.dump(model, file)
file.close()
The pickle.dump method serializes the model to a model.pkl file in the activity directory. The file has been opened as wb, which means that bytes will be written to disk. You can check that the file exists in the Activity12.01 folder.
The second part of this activity is to load the model from disk in an API and expose the API.
Note
The production.ipynb file can be found here: https://packt.live/2ZqDc2n.
!pip install flask
You'll get the following output:
If Flask is already installed, you will have the following output:
from flask import Flask, jsonify, request
import pickle
# load the model from pickle file
file = open('model.pkl', 'rb') # read bytes
model = pickle.load(file)
file.close()
We now have the same model running in our production environment (the production Jupyter Notebook) as in our model training environment (the development notebook from part 1 of this activity). To test the model, we can make a few predictions.
# get predictions from the model
print(model.predict([[1,0,36,2,0,14.67]]))
print(model.predict([[0,1,42,1,1,96.61]]))
# create an API with Flask
app = Flask('ClassPredictor')
# call this: curl -X GET http://127.0.0.1:5000/hi
@app.route('/hi', methods=['GET'])
def bar():
    result = 'hello!'
    return result
@app.route('/class', methods=['POST'])
def predict_class():
    payload = request.get_json()
    person = [payload['Survived'], payload['Sex'],
              payload['Age'], payload['SibSb'],
              payload['Parch'], payload['Fare']]
    result = model.predict([person])
    print(f'{person} -> {str(result)}')
    return (f'I predict that person {person} was in '
            f'class {result} of the Titanic')
app.run()
These lines define an HTTP POST method under the URL '/class'. When called, a person object is generated from the JSON payload. The person object, which is an array of input parameters for the model, is then passed to the model in the predict statement that we've seen before. The result is finally wrapped up in a string and returned to the caller.
Run the app again in the production Jupyter Notebook by executing the cell with the app.run() statement (stop it first, if needed, by clicking on the interrupt icon or by typing Ctrl + C). This time, we will test the API with a curl statement. cURL is a program that allows you to make an HTTP request across a network in much the same way a web browser does, except that the result is plain text instead of a graphical interface.
curl -X POST -H "Content-Type: application/json" -d '{"Survived": 1, "Sex": 0, "Age": 72, "SibSb": 2, "Parch": 0, "Fare": 28.35}' http://127.0.0.1:5000/class
Note
There is a Jupyter Notebook called validation in GitHub that contains the same command in a cell. It can be found here: https://packt.live/3erHRp4
The string after the -d parameter in the curl script contains a JSON object with passenger data. The fields for a person have to be named explicitly.
After running this, you'll see the result of your API call that executed the model as follows in your Terminal or Anaconda prompt:
I predict that person [1, 0, 72, 2, 0, 28.35] was in class [2] of the Titanic
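If you prefer to stay in Python rather than use cURL, the requests library can issue the same call; a minimal sketch, assuming the API is running locally on port 5000:

import requests
payload = {"Survived": 1, "Sex": 0, "Age": 72,
           "SibSb": 2, "Parch": 0, "Fare": 28.35}
response = requests.post('http://127.0.0.1:5000/class', json=payload)
print(response.text)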
Note
To access the source code for this specific section, please refer to https://packt.live/38T32iF.
Solution
from flask import Flask, request
import pickle
# load the model from pickle file
file = open('model.pkl', 'rb') # read bytes
model = pickle.load(file)
file.close()
# create an API with Flask
app = Flask('Titanic')
@app.route('/class', methods=['POST'])
def predict_class():
    payload = request.get_json()
    person = [payload['Survived'], payload['Sex'], payload['Age'], payload['SibSb'], payload['Parch'], payload['Fare']]
    result = model.predict([person])
    print(f'{person} -> {str(result)}')
    return f'I predict that person {person} was in passenger class {result} of the Titanic'
app.run()
python api.py
If all is OK, you should get the message that your API is running on localhost.
curl -X POST -H "Content-Type: application/json" -d '{"Survived": 0, "Sex": 1, "Age": 52, "SibSb": 1, "Parch": 0, "Fare": 82.35}' http://127.0.0.1:5000/class
Flask
sklearn
pandas
Note
It's possible to specify the exact version, for example, by entering Flask==1.1.1. To see which version you have in your development environment, enter pip freeze. To export the current list of dependencies and store them as requirements.txt, enter pip freeze > requirements.txt.
Now, let's continue with containerizing the API to make it ready to deploy to a production environment. We need to create a Docker image, which is a template for creating the actual Docker containers that will be deployed.
sudo docker run hello-world
FROM python:3.7
RUN mkdir /api
WORKDIR /api
ADD . /api/
RUN pip install -r ./requirements.txt
EXPOSE 5000
ENV PYTHONPATH="$PYTHONPATH:/api"
CMD ["python", "/api/api.py"]
sudo docker build -t titanic .
In the output, it becomes clear that all steps in our Dockerfile have been followed. The Flask library is loaded, port 5000 is exposed, and the API is running.
sudo docker image ls
It's great that we have a Docker image now, but that image has to be published to a Docker registry when we want to deploy it. Docker Hub is the central repository, but we don't want our Titanic API to end up there.
docker run -d -p 6000:5000 --restart=always --name registry registry:2
This will download the registry libraries and will run a local registry. The output is as follows:
docker tag titanic localhost:6000/titanic
docker push localhost:6000/titanic
This generates the following output:
curl -X GET http://localhost:6000/v2/titanic/tags/list
If all is well, you'll see the image with the latest tag:
{"name":"titanic","tags":["latest"]}
We have just successfully published our titanic image to that registry.
In the second part of this activity, we'll use the Docker image to host a container on a Kubernetes cluster.
minikube version
If all is OK, this will produce an output like minikube version: v1.8.1, along with a commit hash, as follows:
minikube version: v1.8.1
commit: cbda04cf6bbe65e987ae52bb393c10099ab62014
minikube start --insecure-registry="localhost:6000"
If you're running within a virtual machine like VirtualBox, the command will be minikube start --driver=none. If all goes well, you'll get the following output:
minikube status
You'll get a status update like the following:
host: Running
kubelet: Running
apiserver: Running
kubeconfig: Configured
minikube dashboard
This will open a browser window with a lot of useful information and configuration options:
minikube kubectl
This will produce an output like the following:
You can read the kubectl controls the Kubernetes cluster manager line, which indicates that we can now use the kubectl tool to give commands to our cluster. You can get more information about the cluster with the kubectl version and kubectl get nodes commands:
eval $(minikube docker-env)
This command points our Terminal to use a different docker command, namely, the one in the Minikube environment.
docker build -t titanic .
You'll get the same output as in part 1 of this activity.
kubectl run titanic --image=titanic --image-pull-policy=Never
If this is successful, you'll see the following output:
deployment.apps/titanic created
kubectl get deployments
This will produce a list of deployed containers:
You can also check the Kubernetes dashboard, if you have started it, and check whether the deployment and Pod have been created:
kubectl port-forward titanic-6d8f58fc8b-znmx9 5000:5000
Note
In the preceding code, replace the name of the pod with your own; you can find it in the Kubernetes dashboard or by entering kubectl get pods.
This will create a new port forwarding service in your Kubernetes cluster that forwards network traffic to the titanic Pod.
curl -X POST -H "Content-Type: application/json" \
-d '{"Survived": 1, "Sex": 0, "Age": 72, "SibSb": 2, "Parch": 0, "Fare": 68.35}' http://127.0.0.1:5000/class
This will produce the following output:
I predict that person [1, 0, 72, 2, 0, 68.35] was in passenger class [1] of the Titanic
Note
To access the source code for this specific section, please refer to https://packt.live/32s2PC3.
Solution
# copy the trained model for predicting the passenger
# class of a person
!cp ../Activity12.01/model.pkl .
!pip install sklearn2pmml
This will install the required library, which can export our models to the PMML format:
If sklearn2pmml is already installed, you will have the following output:
from sklearn2pmml import sklearn2pmml, make_pmml_pipeline
import pickle
# load the model from pickle file
file = open('model.pkl', 'rb') # read bytes
model = pickle.load(file)
file.close()
Now we can export the model to the PMML format.
pmml_pipeline = make_pmml_pipeline(model)
sklearn2pmml(pmml_pipeline, 'titanic_class.pmml')
! cat titanic_class.pmml
This will produce the following output:
Now let's create a streaming job. We'll use Apache Flink for this. We'll write the code in IntelliJ IDEA; you can also choose another IDE. It's also possible (though not recommended) to use a plain text editor and run the code from a Terminal or command line.
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.10.0
groupId: com
artifactId: titanic_class
version: 0.0.1
package: packt
This will generate a project from a template.
mvn clean package
In the output, you'll see BUILD SUCCESS, which indicates that you now have a working Java program.
Let's start by testing the job. To begin with, the Maven template has already generated code that can be deployed to a Flink cluster.
Save the file, then import the Maven changes.
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
These lines will add the necessary Flink libraries to our class file.
DataStream<String> dataStream = env.socketTextStream(
"localhost", 1234, " ");
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path("out"),
new SimpleStringEncoder<String>("UTF-8"))
.build();
dataStream.addSink(sink);
This code sets up a data stream that listens to a local socket on port 1234. It takes the lines and writes (sinks) the lines to a file in the out directory.
nc -l -p 1234
You get a prompt to enter lines. Leave it open for now.
mvn clean package
mvn exec:java -Dexec.mainClass="packt.StreamingJob"
These lines are the Maven instructions to compile the code, package it into a JAR file, and run the file with an entry point in the packt.StreamingJob class that contains our main function.
<dependency>
<groupId>org.jpmml</groupId>
<artifactId>pmml-evaluator</artifactId>
<version>1.4.15</version>
</dependency>
<dependency>
<groupId>org.jpmml</groupId>
<artifactId>pmml-evaluator-extension</artifactId>
<version>1.4.15</version>
</dependency>
import org.dmg.pmml.FieldName;
import org.jpmml.evaluator.*;
import org.jpmml.evaluator.visitors.DefaultVisitorBattery;
import java.io.File;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
These lines import the required PMML and common Java libraries for working with PMML files.
// load PMML
Evaluator evaluator = getPmmlEvaluator("titanic_class.pmml");
// get input fields
List<? extends InputField> inputFields = evaluator.getInputFields();
This code deserializes the model from PMML, loading the model in memory ready to be executed. The list of input fields will come in handy in the next step.
We are now ready to parse incoming messages as persons whose Titanic passenger class can be predicted by the model. The input values will be in a comma-separated string with the values survived, sex, age, number of siblings on board, number of parents on board, and fare paid. For example, "1,1,19,0,2,77.15" indicates a 19-year-old woman who survived, entered the ship with her two parents, and paid 77.15 for the ticket.
SingleOutputStreamOperator<String> mapped = dataStream.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        System.out.println("EVENT: " + s);
        Map<FieldName, FieldValue> arguments = new LinkedHashMap<>();
        String[] values = s.split(",");
        // prepare model evaluation
        for (int i = 0; i < values.length; i++) {
            FieldName inputName = inputFields.get(i).getName();
            FieldValue inputValue = inputFields.get(i).prepare(values[i]);
            arguments.put(inputName, inputValue);
        }
        // execute the model
        Map<FieldName, ?> results = evaluator.evaluate(arguments);
        // Decoupling results from the JPMML-Evaluator runtime environment
        Map<String, ?> resultRecord = EvaluatorUtil.decodeAll(results);
        System.out.println(resultRecord);
        return s;
    }
});
This code adds a map method to the stream processing job that splits the input string and uses the resulting string values to build a set of arguments for the machine learning model.
1,1,13,1,0,56.91
0,0,81,0,0,120.96
1,0,41,1,1,18.11
You should get the following output:
Note
To access the source code for this specific section, please refer to https://packt.live/38SssNr.