Framer Recipe

No such pattern exists in the wild, but it is based on the Windowing concept from Streaming Systems. Unfortunately, the Spring Integration Aggregator component can place a message into only a single group for correlation. The rest of the Aggregator functionality is powerful enough that it is easy to configure any possible windowing aspect for finite or streaming data. The input for this component could be financial market data used to calculate trading signals.

The idea behind this recipe is to distribute the same message into different groups - windows. Hence the name "framer": a worker who knows how to manage windows, or one who sets boundaries for a thing when we talk about time frames.

To put the same message into several aggregator groups, we use a custom splitter that produces as many messages, all with the same payload, as the windowing algorithm calculates. This is the crucial part of this recipe: the splitter emits several messages with unique correlation details but the same data, so that each copy is placed into a different window.
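The fan-out performed by such a splitter can be sketched in plain Java, without any Spring types. This is only an illustration under stated assumptions, not the recipe's actual splitter: the names `FramerFanOut`, `WindowSlot` and `slotsFor` are hypothetical, windows are identified by the zero-based index of their first event, and the sequence number is assumed to be the 1-based position of the event inside the window.

```java
import java.util.ArrayList;
import java.util.List;

class FramerFanOut {

    /** Hypothetical value type: one copy of an event addressed to a single window. */
    record WindowSlot(long correlationId, int sequenceNumber, int sequenceSize) { }

    /**
     * Lists every window that the event at zero-based position eventIndex belongs to.
     * Window w is assumed to cover events w .. w + windowSize - 1.
     */
    static List<WindowSlot> slotsFor(long eventIndex, int windowSize) {
        List<WindowSlot> slots = new ArrayList<>();
        // The earliest window still containing this event; clamp at 0 for the first events.
        long firstWindow = Math.max(0, eventIndex - windowSize + 1);
        for (long window = firstWindow; window <= eventIndex; window++) {
            // Assumed 1-based position of the event inside this window.
            int sequenceNumber = (int) (eventIndex - window) + 1;
            slots.add(new WindowSlot(window, sequenceNumber, windowSize));
        }
        return slots;
    }

    public static void main(String[] args) {
        // Show the fan-out for five events and windows of size 3.
        for (long i = 0; i < 5; i++) {
            System.out.println("P" + (i + 1) + " -> " + slotsFor(i, 3));
        }
    }
}
```

In a real flow, each slot would presumably become one outgoing message carrying the original payload plus correlation and sequence headers, which is what lets the aggregator treat the copies as members of different groups.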

For simplicity, this proof of concept groups incoming data into batches of 3: every new event starts a new window and also contributes to the two previous windows, with its respective sequence number in each group. So, if our streaming data looks like P1, P2, P3, P4, P5, ... (P means "price"), we want to get these windows on the output:

[P1, P2, P3]
[P2, P3, P4]
[P3, P4, P5]
...
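The windows listed above can be reproduced with a small plain-Java simulation of the grouping step. It is a sketch that only mimics what the aggregator would do with the fanned-out copies, assuming a group is released as soon as it holds 3 items; `SlidingWindows` and `completeWindows` are hypothetical names, and no Spring API is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SlidingWindows {

    /** Returns every window that reached windowSize, in the order it was completed. */
    static <T> List<List<T>> completeWindows(List<T> events, int windowSize) {
        Map<Integer, List<T>> groups = new TreeMap<>(); // open windows keyed by window index
        List<List<T>> released = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            // Event i belongs to the windows that start at i - windowSize + 1 .. i.
            int firstWindow = Math.max(0, i - windowSize + 1);
            for (int window = firstWindow; window <= i; window++) {
                List<T> group = groups.computeIfAbsent(window, k -> new ArrayList<>());
                group.add(events.get(i));
                if (group.size() == windowSize) {
                    // The window is full: release it and forget the group.
                    released.add(groups.remove(window));
                }
            }
        }
        return released;
    }

    public static void main(String[] args) {
        // prints [[P1, P2, P3], [P2, P3, P4], [P3, P4, P5]]
        System.out.println(completeWindows(List.of("P1", "P2", "P3", "P4", "P5"), 3));
    }
}
```

The two trailing windows that start at P4 and P5 stay incomplete in this sketch, just as they would wait for more data in a stream.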

The unit test for this application emits 5 random numbers, and the output may look like this:

GenericMessage [payload=[94, 107, 98], headers={sequenceNumber=2, correlationId=0, id=3f7549d0-2ad6-211e-d07d-935ee18d88d4, sequenceSize=3, timestamp=1670276902285}]
GenericMessage [payload=[107, 98, 94], headers={sequenceNumber=2, correlationId=1, id=92f1c34c-64dd-ece8-f4f2-514467af1ff9, sequenceSize=3, timestamp=1670276902286}]
GenericMessage [payload=[98, 94, 101], headers={sequenceNumber=2, correlationId=2, id=168aef5a-4a4e-15af-7d75-a0d321a5f5f0, sequenceSize=3, timestamp=1670276902286}]