Streaming events

Apache Ignite integrates with major stream providers, including the following:

  • IgniteDataStreamer
  • JMS Streamer
  • Kafka Streamer
  • Camel Streamer
  • MQTT Streamer
  • Storm Streamer
  • Flink Streamer
  • Flume sink
  • Twitter
  • RocketMQ

In this section, we are going to explore data streaming using the data streamer API and some of the major stream providers.

IgniteDataStreamer is responsible for streaming external data into a cache. Its addData(Object, Object) method adds data for streaming to a remote node. The streamer doesn't send the streamed data to the in-memory data grid immediately; instead, it buffers entries internally for better performance and network utilization. The perNodeBufferSize(int) method lets us configure the per-node buffer size; once the buffer limit is exceeded, the buffered data is sent to the remote nodes. The default buffer size is 512.

When data is streamed faster than it can be put into the cache, the streamer sends the buffered data to remote nodes in multiple parallel threads, which can create uneven memory utilization across the cluster. The perNodeParallelOperations(int) setting limits the maximum number of parallel buffering operations permitted per node. When the limit is exceeded, the addData(Object, Object) method blocks, keeping memory utilization under control.

The allowOverwrite(boolean) method is a flag that controls whether existing cache values are overwritten; by default, the flag is false. The data streamer works faster when the flag is left false, since it then doesn't have to keep track of data versioning.
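The three tuning knobs above can be sketched together in one place. This is a minimal illustration, not the chapter's example: the buffer and parallelism values here are arbitrary assumptions, and autoFlushFrequency(long), which forces periodic flushes regardless of buffer fill, is an additional IgniteDataStreamer setting worth knowing about:

```java
// Sketch: tuning an IgniteDataStreamer (values are illustrative only).
try (IgniteDataStreamer<String, StockStatus> stmr = ignite.dataStreamer(OHLC_CACHE)) {
    stmr.perNodeBufferSize(1024);        // entries buffered per node before a flush
    stmr.perNodeParallelOperations(4);   // cap on concurrent batch operations per node
    stmr.allowOverwrite(false);          // default; fastest, as no versioning is tracked
    stmr.autoFlushFrequency(1000);       // also flush at least once per second

    stmr.addData("AAPL", tick);          // buffered, sent in batches
}   // close() flushes any remaining buffered data
```

Closing the streamer (here via try-with-resources) flushes whatever remains in the buffers, so no entries are lost at shutdown.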

In the following section, we are going to simulate a stock data streaming scenario. Alpha Vantage offers a free API for gathering stock data. Register to claim your free API key, and then download OHLC (Open-High-Low-Close) data for a symbol from https://www.alphavantage.co/documentation/.

The following are the steps required to stream data and detect patterns:

  • Put the downloaded CSV file directly under the service-grid/src/main/java folder.
  • Add a new Java class, StockStatus, to represent the OHLC of a stock symbol:
      public class StockStatus implements Serializable {
          private static final long serialVersionUID = 1L;

          @QuerySqlField(index = true)
          private String symbol;
          @QuerySqlField(index = true)
          private Date timestamp;
          @QuerySqlField(index = true)
          private double open;
          @QuerySqlField(index = true)
          private double high;
          @QuerySqlField(index = true)
          private double low;
          @QuerySqlField(index = true)
          private double close;
          @QuerySqlField(index = true)
          private long volume;

          public String getSymbol() {
              return symbol;
          }
          public void setSymbol(String symbol) {
              this.symbol = symbol;
          }
          public Date getTimestamp() {
              return timestamp;
          }
          public void setTimestamp(Date timestamp) {
              this.timestamp = timestamp;
          }
          public double getOpen() {
              return open;
          }
          public void setOpen(double open) {
              this.open = open;
          }
          public double getHigh() {
              return high;
          }
          public void setHigh(double high) {
              this.high = high;
          }
          public double getLow() {
              return low;
          }
          public void setLow(double low) {
              this.low = low;
          }
          public double getClose() {
              return close;
          }
          public void setClose(double close) {
              this.close = close;
          }
          public long getVolume() {
              return volume;
          }
          public void setVolume(long volume) {
              this.volume = volume;
          }

          @Override
          public String toString() {
              return "Stock [symbol=" + symbol + ", timestamp=" + timestamp + ", open=" + open
                      + ", high=" + high + ", low=" + low
                      + ", close=" + close + ", volume=" + volume + "]";
          }
      }
  • Create a data streamer to continuously read the downloaded OHLC data for a stock symbol and pump the data into a cluster node. Create a cache configuration with an expiration policy of five seconds, since we want to keep only recent data. Then create a data streamer for the OHLC_CACHE by calling the ignite.dataStreamer(OHLC_CACHE) API, and, in an infinite loop, read the OHLC data, create an OHLC object, and stream it to the cache by calling mktStmr.addData(ohlc.getSymbol(), ohlc):
      public class DataStreamerTest {
          public static final String OHLC_CACHE = "ohlc_status";
          static DateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

          public static void main(String[] args) throws Exception {
              URI uri = DataStreamerTest.class.getClassLoader()
                      .getResource("intraday_1min_AAPL.csv").toURI();

              IgniteConfiguration cfg = new IgniteConfiguration();
              cfg.setPeerClassLoadingEnabled(true);

              CacheConfiguration<String, StockStatus> config = new CacheConfiguration<>();
              config.setExpiryPolicyFactory(FactoryBuilder.factoryOf(
                      new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 5))));
              config.setName(OHLC_CACHE);
              config.setIndexedTypes(String.class, StockStatus.class);
              cfg.setCacheConfiguration(config);

              try (Ignite ignite = Ignition.start(cfg)) {
                  IgniteCache<String, StockStatus> marketDataCache =
                          ignite.getOrCreateCache(config);
                  try (IgniteDataStreamer<String, StockStatus> mktStmr =
                          ignite.dataStreamer(OHLC_CACHE)) {
                      mktStmr.allowOverwrite(true);
                      mktStmr.perNodeParallelOperations(2);

                      while (true) {
                          try (Stream<String> stream = Files.lines(Paths.get(uri))) {
                              stream.forEach(str -> {
                                  StringTokenizer st = new StringTokenizer(str, ",");
                                  StockStatus ohlc = new StockStatus();
                                  try {
                                      ohlc.setTimestamp(format.parse(st.nextToken()));
                                  } catch (ParseException e) {
                                      e.printStackTrace();
                                  }
                                  ohlc.setOpen(Double.parseDouble(st.nextToken()));
                                  ohlc.setHigh(Double.parseDouble(st.nextToken()));
                                  ohlc.setLow(Double.parseDouble(st.nextToken()));
                                  ohlc.setClose(Double.parseDouble(st.nextToken()));
                                  ohlc.setVolume(Long.parseLong(st.nextToken()));
                                  ohlc.setSymbol("AAPL");
                                  mktStmr.addData(ohlc.getSymbol(), ohlc);
                                  System.out.println(String.format("Adding for %s", ohlc));
                              });
                          }
                      }
                  }
              }
          }
      }
  • When we run the program, it continuously prints a log line for every streamed record.

  • Our OHLC cache keeps only the last five seconds of data, so let's query the cache to get the maximum value of the AAPL stock during that window. The following code snippet queries the OHLC cache for the maximum (high) value of the AAPL stock, waits for a second, and then queries again in an infinite loop:
      public class DataStreamerQuery {
          public static void main(String[] args) throws Exception {
              IgniteConfiguration cfg = new IgniteConfiguration();
              cfg.setPeerClassLoadingEnabled(true);
              cfg.setClientMode(true);

              try (Ignite ignite = Ignition.start(cfg)) {
                  IgniteCache<String, StockStatus> marketDataCache =
                          ignite.getOrCreateCache(DataStreamerTest.OHLC_CACHE);
                  System.out.println(marketDataCache.get("AAPL"));

                  SqlFieldsQuery query =
                          new SqlFieldsQuery("SELECT max(s.high) FROM StockStatus s");
                  while (true) {
                      List<List<?>> res = marketDataCache.query(query).getAll();
                      System.out.println("max for last 5 sec");
                      System.out.println(res);
                      Thread.sleep(1000);
                  }
              }
          }
      }
  • When we run the program, it continuously prints the maximum (high) value. We can send an alert to the responsible team when the value reaches a limit.
  • We can configure other types of streamers to stream events into an Ignite cluster, write distributed SQL queries to find a pattern, and finally take action, such as buying more AAPL stock or selling it all.
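The alerting idea above can be sketched with Ignite's ContinuousQuery API, which watches a cache and notifies a listener as new entries arrive. This is an illustrative sketch, not part of the original example: ALERT_LIMIT is an assumed threshold, and marketDataCache is the OHLC cache from the previous snippets:

```java
// Sketch: alert when a streamed tick's high crosses an assumed limit.
final double ALERT_LIMIT = 200.0; // illustrative threshold, not from the original text

ContinuousQuery<String, StockStatus> qry = new ContinuousQuery<>();

// Filter on the server side so only interesting updates travel to the client.
qry.setRemoteFilterFactory(() -> event -> event.getValue().getHigh() > ALERT_LIMIT);

// React locally, e.g. notify the responsible team.
qry.setLocalListener(events -> {
    for (CacheEntryEvent<? extends String, ? extends StockStatus> e : events)
        System.out.println("ALERT: high above limit: " + e.getValue());
});

try (QueryCursor<Cache.Entry<String, StockStatus>> cur = marketDataCache.query(qry)) {
    Thread.sleep(60_000); // keep the listener registered while ticks stream in
}
```

The remote filter runs on the server nodes holding the data, so only entries that actually exceed the limit cross the network to the local listener.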