Fluvio's deduplication feature removes duplicate records based on their keys.
To use deduplication, enable it on a topic. Duplicates are identified and dropped within a window defined by the bounds configuration.
The available bounds options are age and count, described in the Bounds section.
The deduplication process is deterministic and maintains its state across restarts. After a restart, the deduplication algorithm traverses the data stream and rebuilds its in-memory state.
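To illustrate why determinism makes this kind of recovery possible, here is a minimal Python sketch. It is a toy model, not Fluvio's implementation: the `observe` and `rebuild` helpers are hypothetical, the memory is a simple count-bounded set of keys, and it assumes the replay reads the records that were already stored in the topic.

```python
from collections import OrderedDict


def observe(memory, key, count):
    """Feed one key into the memory; return True if the record is kept."""
    if key in memory:
        return False  # duplicate inside the window: drop it
    memory[key] = None
    while len(memory) > count:
        memory.popitem(last=False)  # forget the oldest remembered key
    return True


def rebuild(stored_keys, count):
    """Reconstruct the memory by replaying the records stored in the topic."""
    memory = OrderedDict()
    for key in stored_keys:
        observe(memory, key, count)
    return memory


# Live processing: only records that pass the filter are stored.
incoming = ["a", "b", "a", "c", "b", "d", "a"]
live = OrderedDict()
stored = [key for key in incoming if observe(live, key, count=3)]

# Simulated restart: the in-memory state is lost and rebuilt from the stored stream.
rebuilt = rebuild(stored, count=3)
print(list(live) == list(rebuilt))
```

Because the filter's decisions depend only on the records it has seen, replaying the same records in the same order yields the same state.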
Example topic config
Example configuration for a topic:
# topic.yaml
version: 0.1.0
meta:
  name: topic-with-dedup
deduplication:
  bounds:
    count: 5 # remember at least 5 last records
    age: 5s # remember records for at least 5 seconds
  filter:
    transform:
      uses: infinyon-labs/[email protected]
A topic can be created using this config file like so:
$ fluvio topic create -c topic.yaml
After creating the topic, it can be tested like so:
$ fluvio produce topic-with-dedup --key-separator :
$ fluvio consume -B topic-with-dedup
Bounds
bounds.count: The base number of records the filter keeps in memory. It does not guarantee that a record from exactly count records ago is still remembered, but it sets a starting point.
bounds.age: The minimum length of time the filter remembers a record. Specify it as a duration string such as 15days 2min 2s, 2min 5s, or 15ms.
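As a rough illustration of how such duration strings break down, the following Python sketch converts them to a `timedelta`. The `parse_age` helper is hypothetical and handles only the units that appear in the examples above (days, min, s, ms); the parser Fluvio actually uses may accept more units and spellings.

```python
import re
from datetime import timedelta

# Only the units used in the examples above; this table is an assumption.
_UNITS = {
    "days": timedelta(days=1),
    "min": timedelta(minutes=1),
    "s": timedelta(seconds=1),
    "ms": timedelta(milliseconds=1),
}

_TOKEN = re.compile(r"(\d+)([a-z]+)")


def parse_age(text):
    """Sum whitespace-separated '<number><unit>' parts into one timedelta."""
    tokens = text.split()
    if not tokens:
        raise ValueError("empty duration")
    total = timedelta()
    for token in tokens:
        match = _TOKEN.fullmatch(token)
        if match is None or match.group(2) not in _UNITS:
            raise ValueError(f"unrecognized duration part: {token!r}")
        total += int(match.group(1)) * _UNITS[match.group(2)]
    return total


print(parse_age("15days 2min 2s"))
print(parse_age("2min 5s").total_seconds())
print(parse_age("15ms"))
```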
Deduplication is performed by a SmartModule. Currently, dedup-filter is the designated SmartModule for this task.
The dedup-filter takes the data and divides it into smaller chunks, holding these chunks in memory. Each chunk is tagged with an age, indicating how old it is.
The total number of records held in memory is limited by bounds.count. When this count is reached, dedup-filter looks at the oldest chunk and compares its age against the bounds.age setting. If the chunk is old enough, the whole chunk is removed. This allows old data to be dropped quickly with a minimal amount of tracking.
The approach of breaking down data into chunks does use a bit more memory, but it ensures that the filter operates smoothly, without any sudden increases in the time or memory needed.
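The chunking scheme described in this section can be sketched in Python as follows. This is a simplified model under stated assumptions, not the dedup-filter source: the `ChunkedDedupFilter` class, its `chunk_size` parameter, and the explicit `now` timestamps are all hypothetical, and each chunk's age is measured from its newest record.

```python
from collections import deque


class ChunkedDedupFilter:
    """Toy model of a chunked, key-based deduplication window."""

    def __init__(self, count, age, chunk_size=2):
        self.count = count            # models bounds.count
        self.age = age                # models bounds.age, in seconds
        self.chunk_size = chunk_size  # hypothetical; not a Fluvio setting
        self.chunks = deque()         # oldest chunk on the left
        self.total = 0                # number of remembered keys

    def _evict(self, now):
        # Once the count is reached, drop whole chunks from the old end,
        # but only if the oldest chunk has outlived the age bound.
        while self.chunks and self.total >= self.count:
            oldest = self.chunks[0]
            if now - oldest["newest"] < self.age:
                break
            self.total -= len(oldest["keys"])
            self.chunks.popleft()

    def accept(self, key, now):
        """Return True if the record is kept, False if it is a duplicate."""
        self._evict(now)
        if any(key in chunk["keys"] for chunk in self.chunks):
            return False
        if not self.chunks or len(self.chunks[-1]["keys"]) >= self.chunk_size:
            self.chunks.append({"newest": now, "keys": set()})
        chunk = self.chunks[-1]
        chunk["keys"].add(key)
        chunk["newest"] = now
        self.total += 1
        return True


dedup = ChunkedDedupFilter(count=4, age=5.0, chunk_size=2)
events = [("a", 0.0), ("a", 1.0), ("b", 1.0), ("c", 2.0),
          ("d", 3.0), ("a", 4.0), ("e", 4.5), ("a", 7.0)]
results = [dedup.accept(key, now) for key, now in events]
print(results)
```

In this model the second and third "a" are dropped because the chunk holding "a" is still younger than the age bound, even after the count has been reached. The last "a" is accepted because that chunk has aged out and was removed in one step. Eviction only ever inspects the oldest chunk, which is what keeps the per-record cost flat.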