In this part we look at turning a component into a service.

In Part 1, we looked at how we can easily create and test components which expect asynchronous messages[1] in and produce asynchronous messages out. However, how do we turn this into a service?

Turning our components into a service

The key thing which is missing from our components is a transport. A lack of a transport simplifies testing, profiling and debugging, but we need to distribute our components, and for this we need a transport.

There is a wide range of possible transports available:

It is Chronicle Queue we will be looking at in this post.

Using queue in a unit test

Chronicle Queue is persisted. In unit tests, however, you usually want to start fresh and remove the queue afterwards. The idiom you can use is as follows:

Creating a temporary Queue
File queuePath = new File(OS.TARGET, "testName-" + System.nanoTime());
try {
    try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(queuePath).build()) {
        // use the queue
    }

} finally {
    IOTools.shallowDeleteDirWithFiles(queuePath);
}

This creates a queue which is stored in a single file. The file is rolled daily by default and includes the current date in the path.
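
To make the daily roll concrete, here is a minimal sketch of how a timestamp could map to a dated name. It assumes the roll settings shown in the queue header later in this post (a length of 86400000 ms, the format yyyyMMdd and an epoch of 0) and that dates are taken in UTC. RollCycleSketch and its methods are hypothetical names used for illustration only; they are not part of Chronicle Queue's API, and the library's own naming may differ in detail.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative only: not Chronicle Queue code.
final class RollCycleSketch {
    // Values copied from the "roll" section of the dumped header (assumed to apply here).
    static final long LENGTH_MS = 86_400_000L; // one day
    static final long EPOCH_MS = 0L;
    // Assumption: the date is formatted in UTC.
    static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    /** Number of whole roll periods between the epoch and the given time. */
    static long cycleOf(long timeMs) {
        return Math.floorDiv(timeMs - EPOCH_MS, LENGTH_MS);
    }

    /** Dated name for the start of the given cycle. */
    static String nameOf(long cycle) {
        return FORMAT.format(Instant.ofEpochMilli(EPOCH_MS + cycle * LENGTH_MS));
    }

    public static void main(String[] args) {
        long cycle = cycleOf(123456789000L); // timestamp used in the test events
        System.out.println(nameOf(cycle));   // prints 19731129
    }
}
```

Two timestamps in the same UTC day land in the same cycle, so they would share one dated name; the next day starts a new one.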

Writing events

If we repeat our test as before, instead of using a mock listener, we can use a listener which writes each method called to a queue:

Write methods called to a queue for either interface
OrderIdeaListener orderManager = queue.createAppender()
                                      .methodWriter(OrderIdeaListener.class, MarketDataListener.class);
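
To illustrate the idea behind a method writer, here is a minimal sketch using only the JDK: a dynamic proxy that implements two interfaces and records each call as a line of text in a list. This is not how Chronicle Queue implements methodWriter; the MethodWriterSketch class, the simplified String-based interfaces and the in-memory list standing in for the queue are all assumptions made for the example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: an in-memory stand-in, not Chronicle Queue code.
final class MethodWriterSketch {
    // Hypothetical, simplified versions of the listener interfaces.
    interface OrderIdeaListener { void onOrderIdea(String idea); }
    interface MarketDataListener { void onTopOfBookPrice(String price); }

    /** Returns a proxy implementing both interfaces; each call is appended to the log. */
    static OrderIdeaListener recordingWriter(List<String> log) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods inherited from Object are not events, so answer them directly.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "hashCode": return System.identityHashCode(proxy);
                    case "equals":   return proxy == args[0];
                    default:         return "recordingWriter";
                }
            }
            // Record the event as "methodName: [arguments]".
            log.add(method.getName() + ": " + Arrays.toString(args));
            return null; // listener methods return void
        };
        return (OrderIdeaListener) Proxy.newProxyInstance(
                MethodWriterSketch.class.getClassLoader(),
                new Class<?>[] {OrderIdeaListener.class, MarketDataListener.class},
                handler);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        OrderIdeaListener writer = recordingWriter(log);
        writer.onOrderIdea("Buy EURUSD");
        // The same proxy can be cast to the second interface.
        ((MarketDataListener) writer).onTopOfBookPrice("EURUSD 1.1167/1.1172");
        log.forEach(System.out::println);
    }
}
```

Because the proxy implements both interfaces, the same object can be passed wherever either listener is expected, which is why a cast to the second interface works.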

Our combiner writes to this queue, as does our test:

The SidedPrice combiner
SidedMarketDataCombiner combiner = new SidedMarketDataCombiner((MarketDataListener) orderManager);

We can also repeat the inbound events. Putting all this together, we get:

try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(queuePath).build()) {
    OrderIdeaListener orderManager = queue.createAppender().methodWriter(OrderIdeaListener.class, MarketDataListener.class);
    SidedMarketDataCombiner combiner = new SidedMarketDataCombiner((MarketDataListener) orderManager);

    // events in
    orderManager.onOrderIdea(new OrderIdea("EURUSD", Side.Buy, 1.1180, 2e6)); // not expected to trigger

    combiner.onSidedPrice(new SidedPrice("EURUSD", 123456789000L, Side.Sell, 1.1172, 2e6));
    combiner.onSidedPrice(new SidedPrice("EURUSD", 123456789100L, Side.Buy, 1.1160, 2e6));

    combiner.onSidedPrice(new SidedPrice("EURUSD", 123456789100L, Side.Buy, 1.1167, 2e6));

    orderManager.onOrderIdea(new OrderIdea("EURUSD", Side.Buy, 1.1165, 1e6)); // expected to trigger
}

Once we have written all the events to our queue, we can process the queue in our test. A more realistic example would run these two components in different threads, in different processes or on different machines. However, this just complicates the tests, and the result should be the same provided the transport does its job.

Reading events

When we read the events we need a component which implements the methods called above and a mock listener to ensure it triggers the right events.

Read all the events and check the right output
// what we expect to happen
OrderListener listener = createMock(OrderListener.class);
listener.onOrder(new Order("EURUSD", Side.Buy, 1.1167, 1_000_000));
replay(listener);

try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(queuePath).build()) {
    // build our scenario
    OrderManager orderManager = new OrderManager(listener); // (1)
    MethodReader reader = queue.createTailer().methodReader(orderManager); // (2)
    for (int i = 0; i < 5; i++)
        assertTrue(reader.readOne()); // (3)

    assertFalse(reader.readOne()); // (4)
    System.out.println(queue.dump()); // (5)
}

verify(listener);
1 our component to test
2 our queue reader which will call the methods
3 loop to read/process one method at a time.
4 we have no more messages
5 dump the queue contents so we can see what the input was.
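
The reading loop above relies on readOne() returning true while there is a message to process and false once there are none left. The following is a minimal JDK-only sketch of that contract: it takes recorded messages from an in-memory queue and calls the method of the same name on a component via reflection. It is not Chronicle Queue's MethodReader; MethodReaderSketch, Message and RecordingComponent are hypothetical names, and silently skipping unknown method names is a choice made for this example.

```java
import java.lang.reflect.Method;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustrative only: an in-memory stand-in, not Chronicle Queue code.
final class MethodReaderSketch {
    /** One recorded call: the method name and its single argument. */
    static final class Message {
        final String methodName;
        final Object arg;
        Message(String methodName, Object arg) {
            this.methodName = methodName;
            this.arg = arg;
        }
    }

    /** Hypothetical component that remembers what it was asked to do. */
    public static final class RecordingComponent {
        final List<String> seen = new ArrayList<>();
        public void onOrderIdea(String idea) { seen.add("idea:" + idea); }
        public void onTopOfBookPrice(String price) { seen.add("price:" + price); }
    }

    private final Queue<Message> messages;
    private final Object component;

    MethodReaderSketch(Queue<Message> messages, Object component) {
        this.messages = messages;
        this.component = component;
    }

    /** Dispatches the next message, if any; returns false when none is left. */
    boolean readOne() {
        Message m = messages.poll();
        if (m == null)
            return false;
        for (Method method : component.getClass().getMethods()) {
            if (method.getName().equals(m.methodName) && method.getParameterCount() == 1) {
                try {
                    method.invoke(component, m.arg);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Failed to dispatch " + m.methodName, e);
                }
                break;
            }
        }
        // A message with no matching method is skipped but still counts as read.
        return true;
    }

    public static void main(String[] args) {
        Queue<Message> queue = new ArrayDeque<>();
        queue.add(new Message("onOrderIdea", "Buy EURUSD"));
        RecordingComponent component = new RecordingComponent();
        MethodReaderSketch reader = new MethodReaderSketch(queue, component);
        while (reader.readOne()) {
            // keep reading until the queue is exhausted
        }
        System.out.println(component.seen);
    }
}
```

Reading one message per call keeps the caller in control of the loop, which is what lets the test assert on exactly five messages followed by an empty read.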

Finally, the test dumps the raw contents of the queue. This reads the data stored in the file that the queue uses. The dump is only useful for smaller queues with a few MB of data; if you have a few GB, it cannot be stored in a String. You can use DumpQueueMain f

The output of dump()
--- !!meta-data #binary
header: !SCQStore {
  wireType: !WireType BINARY,
  writePosition: 777,
  roll: !SCQSRoll {
    length: 86400000,
    format: yyyyMMdd,
    epoch: 0
    },
  indexing: !SCQSIndexing {
    indexCount: !int 8192,
    indexSpacing: 64,
    index2Index: 0,
    lastIndex: 0
    }
}
# position: 227
--- !!data #binary
onOrderIdea: {
  symbol: EURUSD,
  side: Buy,
  limitPrice: 1.118,
  quantity: 2000000.0
}
# position: 306
--- !!data #binary
onTopOfBookPrice: {
  symbol: EURUSD,
  timestamp: 123456789000,
  buyPrice: NaN,
  buyQuantity: 0,
  sellPrice: 1.1172,
  sellQuantity: 2000000.0
}
# position: 434
--- !!data #binary
onTopOfBookPrice: {
  symbol: EURUSD,
  timestamp: 123456789100,
  buyPrice: 1.116,
  buyQuantity: 2000000.0,
  sellPrice: 1.1172,
  sellQuantity: 2000000.0
}
# position: 566
--- !!data #binary
onTopOfBookPrice: {
  symbol: EURUSD,
  timestamp: 123456789100,
  buyPrice: 1.1167,
  buyQuantity: 2000000.0,
  sellPrice: 1.1172,
  sellQuantity: 2000000.0
}
# position: 698
--- !!data #binary
onOrderIdea: {
  symbol: EURUSD,
  side: Buy,
  limitPrice: 1.1165,
  quantity: 1000000.0
}
...
# 83885299 bytes remaining

Running the test and dumping the queue in my IDE took 233 ms.

Conclusion

We can test components standalone with a queue, or in a chain by using more queues. More importantly, we can test our components without the infrastructure complicating the debugging process. When our components work without a transport, we can show they do the same thing with a transport.

In our next part

In Part 3, we will look at benchmarking and profiling with Queue. While Queue is designed to be simple and transparent, it is also designed to be faster than other persisted transports, even with no tuning.

Glossary

Idiom - "A means of expressing a recurring construct in one or more programming languages."[9]

Mock listener - Method calls or messages can be sent to a mock listener. It acts as a pretend object for the purposes of testing, in order to see what would happen to a concrete object.

Service - A program that is available to other programs to run and make use of.

Transport - A program or hardware that takes data from one process to another, e.g. middleware[10].


1. Beal, V. (2016). What is asynchronous messaging? Webopedia Definition. Online. Webopedia.com. Available at: http://www.webopedia.com/TERM/A/asynchronous_messaging.html. Accessed Jul. 2016
2. Netty.io. (2016). Netty: Home. Online. Available at: http://netty.io/. Accessed Mar. 2016
3. Oracle (2013). The JMS API Programming Model - The Java EE 6 Tutorial. Online. Available at: https://docs.oracle.com/javaee/6/tutorial/doc/bnceh.html. Accessed Mar. 2016
4. Wikipedia. (2016). Java API for RESTful Web Services. Online. Available at: https://en.wikipedia.org/wiki/Java_API_for_RESTful_Web_Services. Accessed Mar. 2016
5. Docs.oracle.com. (2016). Java JDBC API. Online. Available at: https://docs.oracle.com/javase/8/docs/technotes/guides/jdbc/. Accessed Mar. 2016
6. Oracle (2016). WatchService (Java Platform SE 8). Online. Available at: https://docs.oracle.com/javase/8/docs/api/java/nio/file/WatchService.html. Accessed 23 Mar. 2016
7. Oracle (2016). BlockingQueue (Java Platform SE 8). Online. Available at: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html. Accessed 23 Mar. 2016
8. Apache Aries. (2016). Apache Aries - Index. Online. Available at: http://aries.apache.org/. Accessed 23 Mar. 2016
9. Wikipedia. (2016). Programming idiom. Online. Available at: https://en.wikipedia.org/wiki/Programming_idiom. Accessed Mar. 2016
10. Beal, V. (2016). What is Middleware? Webopedia Definition. Online. Webopedia.com. Available at: http://www.webopedia.com/TERM/M/middleware.html. Accessed Jul. 2016