Coding the microservice

We have the database of the RecommendationService microservice defined and created in our docker-compose.yml file. We will now do the same to create a container for the microservice. Again, let's edit the docker-compose.yml file.

In this code, the service depends on the database and on the message broker, and defines some environment variables, mainly to connect to the database and the queues:

    recommendation_service:
        image: recommendation_service
        build: ./RecommendationService
        volumes:
            - './RecommendationService:/app'
        environment:
            - QUEUE_HOST=amqp://guest:guest@rabbitmq
            - DATABASE_URL=http://recommendation_db:7474/db/data
            - USER_SERVICE_ROUTE=http://172.17.0.1/user/
        depends_on:
            - recommendation_db
            - rabbitmq
        links:
            - recommendation_db
            - rabbitmq

Now, let's create the RecommendationService directory and files. At the end, the structure of the microservice will be as follows:

├── RecommendationService
│   ├── Dockerfile
│   ├── __init__.py
│   ├── config.yaml
│   ├── models.py
│   ├── requirements.txt
│   ├── service.py

Let's write the code of our microservice, RecommendationService. We will start by editing the Dockerfile. Its code is identical to that of the News microservice, because we are using the same framework, nameko:

FROM python:3.6.1
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
ENTRYPOINT ["nameko"]
CMD ["run", "--config", "config.yaml", "service"]
EXPOSE 5000

The next step is to write config.yaml, the file responsible for telling nameko which settings to work with. As we will also use nameko for communication over HTTP, there are definitions such as the number of workers and the address on which nameko will respond to requests. Take a look at the following code:

AMQP_URI: 'amqp://guest:guest@rabbitmq'
WEB_SERVER_ADDRESS: '0.0.0.0:5000'
max_workers: 10
parent_calls_tracked: 10
LOGGING:
    version: 1
    handlers:
        console:
            class: logging.StreamHandler
    root:
        level: DEBUG
        handlers: [console]

Now that the files responsible for creating the application instances are ready, let's create the requirements.txt file, which lists our dependencies. There are four: pytest, our unit testing tool; nameko, the application framework; py2neo, the driver between the application and the Neo4j database; and requests, whose use we'll see later in the application. The following is the content of the requirements.txt file:

pytest
nameko
py2neo
requests

With all the configuration files ready, let's write our models.py file. This file is slightly different from the other models.py file that we wrote in the application, due to the characteristics of the database driver that we are using. In the end, it will not be a model composed of entities, but a grouping of functions that work with the data in the database.

The first step is to write the imports of the models.py file. The highlight is the database import. Because the database is graph-based, we do not import entity types; instead, we import types that create nodes and relationships in a graph model, as shown in the following code:

import os
from py2neo import (
    Graph,
    Node,
    Relationship,
)

Next, we declare constants that encode our business structure. For our business, relationships are always between application users and news labels, and the relationship type is always a recommendation:

USERS_NODE = 'Users'
LABELS_NODE = 'Labels'
REL_TYPE = 'RECOMMENDATION'

Then, we create the connection to the database using an environment variable created in the docker-compose.yml file:

graph = Graph(os.getenv('DATABASE_URL'))
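
Since Graph receives the connection string straight from the environment, a missing DATABASE_URL would only surface later as a confusing connection error. A small guard, a sketch of our own rather than part of the book's code, can fail fast instead:

```python
import os

def get_database_url():
    # fail fast if the variable set in docker-compose.yml is missing
    url = os.getenv('DATABASE_URL')
    if not url:
        raise RuntimeError(
            'DATABASE_URL is not set; check the environment '
            'section of docker-compose.yml'
        )
    return url
```

With this helper, the connection line becomes `graph = Graph(get_database_url())`.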

The first function of models.py is responsible for fetching the node of a user, passing the user_id as a parameter:

def get_user_node(user_id):
    return graph.find_one(
        USERS_NODE,
        property_key='id',
        property_value=user_id,
    )

The second function is very similar to the first. However, it searches the node using the label parameter:

def get_label_node(label):
    return graph.find_one(
        LABELS_NODE,
        property_key='id',
        property_value=label,
    )

The third function is responsible for fetching all labels that have a user's relationship. For this search, we use the user_id as the parameter. Note that before executing the search of the relationship, we must perform a search for the user node. With the user node, we can search the relationship using the labels. Take a look at the following example:

def get_labels_by_user_id(user_id):
    user_node = get_user_node(user_id)
    return graph.match(
        start_node=user_node,
        rel_type=REL_TYPE,
    )

The fourth function is very similar to the third one, with the difference that we are now searching for all users related to a label:

def get_users_by_label(label):
    label_node = get_label_node(label)
    return graph.match(
        start_node=label_node,
        rel_type=REL_TYPE,
    )

After writing all the functions responsible for the created queries, we will write the functions responsible for creating the data in the database.

The first function creates a user node in Neo4j if the node has not already been created in the database previously:

def create_user_node(user):
    # create the user node only if it does not exist yet
    if not get_user_node(user['id']):
        user_node = Node(
            USERS_NODE,
            id=user['id'],
            name=user['name'],
            email=user['email'],
        )
        graph.create(user_node)

The second creation function performs the same process as the first, but creates label nodes:

def create_label_node(label):
    # create the label node only if it does not exist yet
    if not get_label_node(label):
        label_node = Node(LABELS_NODE, id=label)
        graph.create(label_node)

The third function creates both the user-to-label and the label-to-user relationships. By writing the relationship in both directions, we allow the search to start from either side; with a single direction, only one of the two lookups would find results:

def create_recommendation(user_id, label):
    user_node = get_user_node(user_id)
    label_node = get_label_node(label)
    graph.create(Relationship(
        label_node,
        REL_TYPE,
        user_node,
    ))
    graph.create(Relationship(
        user_node,
        REL_TYPE,
        label_node,
    ))
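
The effect of writing the relationship in both directions can be seen with an in-memory sketch, using plain dictionaries as a stand-in for Neo4j (all names below are our own, not py2neo's):

```python
# adjacency map standing in for the graph: start node -> set of end nodes
relationships = {}

def create_edge(start, end):
    relationships.setdefault(start, set()).add(end)

def create_recommendation_sketch(user_id, label):
    # mirrors create_recommendation: one edge in each direction
    create_edge(('label', label), ('user', user_id))
    create_edge(('user', user_id), ('label', label))

def match(start_node):
    # mirrors graph.match(start_node=...): only outgoing edges are found
    return relationships.get(start_node, set())
```

After `create_recommendation_sketch(42, 'python')`, both `match(('user', 42))` and `match(('label', 'python'))` find the opposite end; with a single edge, one of the two lookups would come back empty.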

Our next step is to write the service.py file code, which works as a kind of microservice controller.

As in all the other Python files in our microservice, we begin by declaring the imports. The highlight here is the import of the nameko HTTP handler; this is the first time we use nameko to serve HTTP requests:

import json
import logging
import os
import requests

from nameko.web.handlers import http
from nameko.events import event_handler

from models import (
    create_user_node,
    create_label_node,
    create_recommendation,
    get_labels_by_user_id,
    get_users_by_label,
)

After writing the package imports, we will write the reader of the messages that arrive through the message broker. The following code is a plain class with a receiver method, plus a decorator that turns the method into a handler for the message broker. Read the code comments to understand each step of the process:

class Recommendation:

    name = 'recommendation'

    # declaring the receiver method as a handler for the message broker
    @event_handler('recommendation_sender', 'receiver')
    def receiver(self, data):
        try:
            # getting the URL to make a sequential HTTP request to UsersService
            user_service_route = os.getenv('USER_SERVICE_ROUTE')
            # consuming data from UsersService using the requests lib
            user = requests.get(
                "{}{}".format(
                    user_service_route,
                    data['user_id'],
                )
            )
            # deserializing the UsersService JSON response
            user = user.json()
            # creating the user node on Neo4j
            create_user_node(user)
            # iterating over all the tags of the news read
            for label in data['news']['tags']:
                # creating the label node on Neo4j
                create_label_node(label)
                # creating the recommendation on Neo4j
                create_recommendation(
                    user['id'],
                    label,
                )
        except Exception as e:
            logging.error('RELATIONSHIP_ERROR: {}'.format(e))
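
The order of operations in the receiver can be traced with stubs that only record calls, using a hypothetical payload and user (the values below are our own, for illustration):

```python
created = []

# stand-ins for the models.py functions (the real code writes to Neo4j)
def create_user_node(user):
    created.append(('user', user['id']))

def create_label_node(label):
    created.append(('label', label))

def create_recommendation(user_id, label):
    created.append(('rel', user_id, label))

def process(data, user):
    # same order of operations as Recommendation.receiver
    create_user_node(user)
    for label in data['news']['tags']:
        create_label_node(label)
        create_recommendation(user['id'], label)

process(
    {'user_id': 42, 'news': {'tags': ['python', 'neo4j']}},
    {'id': 42, 'name': 'Ada', 'email': 'ada@example.com'},
)
```

The user node is created once, and then each tag produces a label node followed by its recommendation relationship.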

With the receiver ready, let's write the API code responsible for exposing the recommendations registered by RecommendationService. Again, it's a Python class, with decorators creating the routes for the HTTP calls:

class RecommendationApi:

    name = 'recommendation_api'

The first method of the RecommendationApi class is get_recommendations_by_user, which receives user_id as a parameter. This method returns the labels that are related to the user:

    @http('GET', '/user/<int:user_id>')
    def get_recommendations_by_user(self, request, user_id):
        """Get recommendations by user_id"""
        try:
            relationship_response = get_labels_by_user_id(user_id)
            http_response = [
                rel.end_node()
                for rel in relationship_response
            ]
            return 200, json.dumps(http_response)
        except Exception as ex:
            return error_response(500, ex)

The second method of the RecommendationApi class is get_users_recommendations_by_label. In this case, we receive the label as a parameter and respond with all the users related to that label:

    @http('GET', '/label/<string:label>')
    def get_users_recommendations_by_label(self, request, label):
        """Get users' recommendations by label"""
        try:
            relationship_response = get_users_by_label(label)
            http_response = [
                rel.end_node()
                for rel in relationship_response
            ]
            return 200, json.dumps(http_response)
        except Exception as ex:
            return error_response(500, ex)
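
The list comprehension in both methods keeps only the end node of each relationship before serializing. With stand-in objects of our own, mimicking just the part of py2neo's match results that the methods use, the transformation looks like this:

```python
import json

class FakeRelationship:
    # mimics the part of a relationship object the API methods call
    def __init__(self, end):
        self._end = end

    def end_node(self):
        return self._end

# hypothetical match results: two labels related to a user
relationship_response = [
    FakeRelationship({'id': 'python'}),
    FakeRelationship({'id': 'neo4j'}),
]

http_response = [rel.end_node() for rel in relationship_response]
body = json.dumps(http_response)
```

Each relationship collapses to its end node, and the resulting list is what the client receives as the JSON body.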

At the end of the file, there is a helper function that formats the responses for possible exceptions. Take a look at the following code:

def error_response(code, ex):
    response_object = {
        'status': 'fail',
        'message': str(ex),
    }
    return code, json.dumps(response_object)
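
Because nameko's http entrypoint accepts a (status_code, body) tuple as a return value, error_response can be returned directly from a handler. A quick check of its output (repeating the function here so the snippet is self-contained):

```python
import json

def error_response(code, ex):
    response_object = {
        'status': 'fail',
        'message': str(ex),
    }
    return code, json.dumps(response_object)

# hypothetical failure: an exception raised while querying the graph
code, body = error_response(500, ValueError('node not found'))
```

The caller gets the HTTP status code and a JSON body carrying the exception message.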