Implementing inventory

Now that we have discussed a lot of technologies and programming approach, it is very much the time to implement some sample code. We will implement inventory keeping in our application using reactive streams. For the example, the inventory will be very simple. It is a Map<Product,InventoryItem> that holds the number of items for each product. The actual map is ConcurrentHashMap and the InventoryItem class is a bit more complex than a Long number to properly handle concurrency issues. When we design a program that is built on responsive streams, we do not need to deal with much concurrency locking, but we still should be aware that the code runs in a multithread environment and may exhibit strange behavior if we do not follow some rules.

The code for the Inventory class is fairly simple since it handles only a map:

package packt.java9.by.example.mybusiness.inventory; 

import ...;

@Component
public class Inventory {
private final Map<Product, InventoryItem> inventory =
new ConcurrentHashMap<>();

private InventoryItem getItem(Product product) {
inventory.putIfAbsent(product, new InventoryItem());
return inventory.get(product);
}

public void store(Product product, long amount) {
getItem(product).store(amount);
}

public void remove(Product product, long amount)
throws ProductIsOutOfStock {
if (getItem(product).remove(amount) != amount)
throw new ProductIsOutOfStock(product);
}
}

The inventory item maintaining the class is a bit more complex since this is the level where we handle a bit of concurrency or, at least, this is the class where we have to pay some attention:

package packt.java9.by.example.mybusiness.inventory; 

import java.util.concurrent.atomic.AtomicLong;

public class InventoryItem {
private final AtomicLong amountOnStock =
new AtomicLong(0);
void store(long n) {
amountOnStock.accumulateAndGet(n,
(stock, delta) -> stock + delta);
}
long remove(long delta) {
class ClosureData {
long actNr;
}
ClosureData d = new ClosureData();
amountOnStock.accumulateAndGet(delta,
(stock, n) ->
stock >= n ?
stock - (d.actNr = n)
:
stock - (d.actNr = 0)
);
return d.actNr;
}
}

When we add products to the inventory, we have no limit. The storage shelves are extremely huge and we do not model that they once may get full and the inventory may not be able to accommodate more items. When we want to remove items from the repository, however, we have to deal with the fact that there may not be enough items from the product. In such a case, we do not remove any items from the repository. We serve the customer to full satisfaction or we do not serve at all.

To maintain the number of the items in the inventory, we use AtomicLong. This class has the accumulateAndGet method. This method gets a Long parameter and a LongBinaryOperator that we provide in our code as a lambda. This code is invoked by the accumulateAndGet method to calculate the new value of the stock. If there are enough items, then we remove the requested number of items. If there are not enough items on stock, then we remove zero. The method returns the number of items that we actually return. Since that number is calculated inside the lambda, it has to escape from there. To do so, we use ClosureData defined inside the method.

Note that, for example, in Groovy we could simply use a Long d variable and alter the variable inside the closure. Groovy calls lambda to closures, so to say. In Java we cannot do so because the variables that we can access from inside the method should be effectively final. However, this is nothing more than a bit more explicit notation that belongs to the closure environment. The ClosureData d object is final as opposed to the field the class has, which can be modified inside the lambda.

The most interesting class that we are really interested in this chapter is InventoryKeeper. This class implements the Subscriber interface and is capable of consuming orders to maintain the inventory:

package packt.java9.by.example.mybusiness.inventory; 

import ...

public class InventoryKeeper implements Flow.Subscriber<Order> {
private static final Logger log =
LoggerFactory.getLogger(InventoryKeeper.class);
private final Inventory inventory;

public InventoryKeeper(@Autowired Inventory inventory) {
this.inventory = inventory;
}

private Flow.Subscription subscription = null;
private static final long WORKERS = 3;

@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("onSubscribe was called");
subscription.request(WORKERS);
this.subscription = subscription;
}

The onSubscribe method is invoked after the object is subscribed. The subscription is passed to the object and is also stored in a field. Since the subscriber needs this subscription in subsequent calls, when an item passed in onNext is processed and a new item is acceptable, a field is a good place to store this object in. In this method, we also set the initial request to three items. The actual value is simply demonstrative. Enterprise environments should be able to configure such parameters:

    private ExecutorService service =  
Executors.newFixedThreadPool((int) WORKERS);

The most important part of the code is the onNext method. What it does is actually goes through all the items of the order and removes the number of items from the inventory. If some of the items are out of stock, then it logs an error. This is the boring part. The interesting part is that it does this through an executor service. This is because the call to onNext should be asynchronous. The publisher calls onNext to deliver the item, but we should not make it wait for the actual processing. When the postman brings your favorite magazine, you don't start reading it immediately and make the postman wait for your signature approving acceptance. All you have to do in onNext is fetch the next order and make sure that this will be processed in due time:

    @Override 
public void onNext(Order order) {
service.submit(() -> {
int c = counter.incrementAndGet();
for (OrderItem item : order.getItems()) {
try {
inventory.remove(item.getProduct(),
item.getAmount());
} catch (ProductIsOutOfStock exception) {
log.error("Product out of stock");
}
}
subscription.request(1);
counter.decrementAndGet();
}
);
}

@Override
public void onError(Throwable throwable) {
log.info("onError was called for {}", throwable);
}

@Override
public void onComplete() {
log.info("onComplete was called");
}
}

The actual implementation in this code uses ThreadPool with three threads in it. Also, the number of required items is three. This is a logical coincidence: each thread works on a single item. It does not need to be like that, even if in most cases it is. Nothing can stop us from making more threads working on the same item if that makes sense. The opposite is also true. One single thread may be created to work on multiple items. These codes will probably be more complex and the whole idea of these complex execution models is to make the coding and the logic simpler, move the multithreading, coding, and implementation issues into the framework, and focus on the business logic in the application code. But I cannot tell that there may not be an example for a subscriber working multiple threads on multiple items, intermingled.

The last code we have to look at in this chapter is the unit test that drives the code with some examples:

    public void testInventoryRemoval() { 
Inventory inventory = new Inventory();
SubmissionPublisher<Order> p =
new SubmissionPublisher<>();

We create Publisher using the JDK class, SubmissionPublisher, which neatly implements this interface delivering multithread functionality for us without much hassle:

        p.subscribe(new InventoryKeeper(inventory));

We create an inventory keeper and we subscribe to the publisher. This does not start delivering anything because there are no publications yet, but it creates a bond between the subscriber and the publisher telling, them that whenever there is a product submitted, the subscriber wants it.

After that, we create the products and store them in the inventory, 20 pieces altogether, and we also create an order that wants 10 products to be delivered. We will execute this order many times. This is a bit of simplification, but for the test, there is no reason to create separate order objects that have the same products and the same amounts in the list of items:

        Product product = new Product(); 
inventory.store(product, 20);
OrderItem item = new OrderItem();
item.setProduct(product);
item.setAmount(10);
Order order = new Order();
List<OrderItem> items = new LinkedList<>();
items.add(item);
order.setItems(items);

After all this has been done, we submit the order to the Publisher 10 times. It means that there are 10 orders for the same product, each asking for 10 pieces, that is, 100 pieces together. Those are 100 pieces against the warehouse where we have only 20 of it. What we should expect is that only the first two orders will be fulfilled and the rest will be rejected and that is what will actually happen when we execute this code:

        for (int i = 0; i < 10; i++) 
p.submit(order);
log.info("All orders were submitted");

After all the orders are published, we wait for half a second so that the other threads have time to execute and then we finish:

        for (int j = 0; j < 10; j++) { 
log.info("Sleeping a bit...");
Thread.sleep(50);
}
p.close();
log.info("Publisher was closed");
}

Note that this is not a regular unit test file. It is some test code to play around, which I also recommend for you to execute, debug, and look at the different log outputs.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.15.140.68