Deduplicate Stage
Overview
Deduplicate is an Akka Streams GraphStage that drops identical (consecutive or non-consecutive) elements in a stream.
Dependency
Add the following dependency to your build.sbt or Scala build file:
"org.squbs" %% "squbs-ext" % squbsVersion
Usage
The usage is very similar to standard Akka Stream stages:
val result = Source("a" :: "b" :: "b" :: "c" :: "a" :: "a" :: "a" :: "c" :: Nil).
via(Deduplicate()).
runWith(Sink.seq)
// Output: ("a" :: "b" :: "c" :: Nil)
Deduplicate keeps a registry of already seen elements. To prevent the registry from growing without bound, it lets you specify the expected number of occurrences of each element as duplicateCount. Once an element has been seen duplicateCount times, it is removed from the registry. In the following example, duplicateCount is 2, so "a" is not dropped when it is seen for the third time:
val result = Source("a" :: "b" :: "b" :: "c" :: "a" :: "a" :: "a" :: "c" :: Nil).
via(Deduplicate(2)).
runWith(Sink.seq)
// Output: ("a" :: "b" :: "c" :: "a" :: Nil)
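The counting behaviour described above can be sketched without Akka as a plain function over a sequence. This is only a model under stated assumptions, not the squbs implementation: the object name DeduplicateModel, its deduplicate method, and the default of Long.MaxValue for duplicateCount are all hypothetical and chosen for illustration.

```scala
import scala.collection.mutable

// Hypothetical model of the registry logic described above; not the squbs source.
object DeduplicateModel {
  def deduplicate[T](elements: Seq[T], duplicateCount: Long = Long.MaxValue): Seq[T] = {
    // Registry: element -> number of times it has been seen so far.
    val registry = mutable.Map.empty[T, Long]
    val emitted = Seq.newBuilder[T]
    elements.foreach { elem =>
      val seen = registry.getOrElse(elem, 0L) + 1
      // Only the first sighting of a registered element passes through.
      if (seen == 1) emitted += elem
      if (seen >= duplicateCount) {
        // Expected number of occurrences reached: forget the element.
        registry.remove(elem)
      } else {
        registry.update(elem, seen)
      }
    }
    emitted.result()
  }
}
```

Under this model, the second "a" removes the entry from the registry, so the third "a" is treated as a new element and emitted again.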
Note that duplicateCount keeps the registry from growing indefinitely only when the number of duplicates is known. There is still potential for memory leaks. For instance, if duplicateCount is set to 2, an element stays in the registry until its duplicate is seen; however, the duplicate might never show up, e.g., because a filter or drop removes it. So, be aware of the consequences of Deduplicate in your use case.
You can also provide Deduplicate with a different registry implementation that cleans itself periodically. However, do this only if you are certain that a duplicate will not be seen after a given time frame; otherwise, the deduplication logic might be corrupted.
Configuring registry key and registry implementation
Deduplicate uses the element itself as the key to the registry by default. However, it also accepts a function that maps an element to a key. For instance, if the elements in the stream are tuples of type (Int, String) and you would like to identify duplicates based only on the first field of the tuple, you can pass a function as follows:
val deduplicate = Deduplicate((element: (Int, String)) => element._1, 2)
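To illustrate what keying on the first tuple field means for the output, here is a plain Scala model of keyed deduplication. It is a sketch, not the squbs stage: KeyedDeduplicateModel and deduplicateBy are hypothetical names, and the counting rule is assumed to match the duplicateCount behaviour described earlier.

```scala
import scala.collection.mutable

// Hypothetical model of key-based deduplication; not the squbs source.
object KeyedDeduplicateModel {
  def deduplicateBy[T, K](elements: Seq[T])(key: T => K,
                                            duplicateCount: Long = Long.MaxValue): Seq[T] = {
    // Registry is indexed by the derived key, not by the whole element.
    val registry = mutable.Map.empty[K, Long]
    val emitted = Seq.newBuilder[T]
    elements.foreach { elem =>
      val k = key(elem)
      val seen = registry.getOrElse(k, 0L) + 1
      // Emit only the first element observed for a registered key.
      if (seen == 1) emitted += elem
      if (seen >= duplicateCount) registry.remove(k) else registry.update(k, seen)
    }
    emitted.result()
  }
}
```

In this model, two tuples with the same Int but different String values count as duplicates of each other, and the first one seen is the one that is kept.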
Deduplicate also allows the registry to be replaced with a different implementation of type java.util.Map[Key, MutableLong].
import java.util
val deduplicate = Deduplicate(2, new util.TreeMap[String, MutableLong]())