Aggregating many different types of sensors into a single data source (e.g. syslog) and ingesting that aggregate sensor into Metron is a common pattern. It is not obvious how to manage these aggregate sensors, because they require two-pass parsing. This document walks through an example of supporting this kind of multi-pass ingest.
Multi-pass parsing involves the following requirements:
At a high level, we continue to maintain the architectural invariant of a 1-1 relationship between logical sensors and Storm topologies. Eventually this relationship may become more complex, but at the moment the approach is to construct a routing parser which will have two responsibilities:
Because the data emitted from the routing parser is a JSON blob, just like the data emitted from any other parser, we will need to adjust the downstream parsers to extract the enveloped data from the JSON blob and treat it as the data to parse.
Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor specific Kafka topics, and do not do internal routing to the appropriate sensor.
Say there are three sensors (bro, snort and yaf). Instead of creating a topology per sensor, all three can be run in a single aggregated parser. It is also possible to aggregate a subset of these parsers (e.g. run bro as its own topology, and aggregate the other two).
The step to start an aggregated parser then becomes
$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s bro,snort,yaf
which will result in a single Storm topology named bro__snort__yaf being run.
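The naming in this example can be sketched in a few lines of Python. This is only an illustration inferred from the example above, where the sensor names passed to -s are joined with a double underscore; aggregated_topology_name is a hypothetical helper, not a Metron function, and the real naming logic may handle cases this sketch does not.

```python
def aggregated_topology_name(sensors_arg: str) -> str:
    """Derive a topology name from the comma-separated -s argument.

    Assumption: names are joined with a double underscore, as in the
    bro__snort__yaf example. This is not Metron's actual implementation.
    """
    sensors = [s.strip() for s in sensors_arg.split(",") if s.strip()]
    return "__".join(sensors)


print(aggregated_topology_name("bro,snort,yaf"))  # bro__snort__yaf
```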
Aggregated parsers can also be specified using the Ambari Metron config, under Services -> Metron -> Configs -> ‘Parsers’ tab -> ‘Metron Parsers’ field. The grouping is configured by enclosing the desired parsers in double quotes.
Some examples of specifying aggregated parsers are as follows:
Currently the approach to fulfill this requirement involves a couple of knobs in the Parser infrastructure for Metron.
Consider the case, for instance, where we have many different TYPES of messages wrapped inside of syslog. As an architectural abstraction, we would want to have the following properties:
Parsers allow users to configure the topic which the Kafka producer uses in a couple of ways (from the parser config in an individual parser):
The kafka.topicField parameter allows for data-dependent topic selection, and this inherently enables the routing capabilities necessary for handling enveloped data.
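To make the idea of data-dependent topic selection concrete, here is a minimal Python sketch of the concept rather than Metron's implementation. The helper select_topic, the field name "sensor" and the sample message are all hypothetical; the sketch returns None when the field is missing or empty and leaves the handling of such unroutable messages to the caller.

```python
from typing import Optional


def select_topic(message: dict, topic_field: str) -> Optional[str]:
    """Return the Kafka topic named by `topic_field` in a parsed message.

    Hypothetical helper: returns None when the field is absent or empty,
    so the caller decides what to do with messages that cannot be routed.
    """
    topic = message.get(topic_field)
    return str(topic) if topic else None


# Hypothetical routing-parser output: the embedded sensor type is in "sensor",
# and the still-unparsed embedded message is in "data".
parsed = {"sensor": "app_a", "data": "raw embedded message"}

print(select_topic(parsed, "sensor"))                 # app_a
print(select_topic({"data": "no sensor"}, "sensor"))  # None
```

Each downstream parser would then read from the topic named after its own sensor type, which is what lets a single routing parser fan out to many second-pass parsers.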
Before we continue, let’s briefly talk about metadata. We have exposed the ability to pass along metadata and interact with metadata in a decoupled way from the actual parser logic (i.e. the GrokParser should not have to consider how to interpret metadata).
There are three choices about manipulating metadata in Metron:
This enables users to specify metadata independent of the data that is persisted downstream and can inform the operations of enrichment and the profiler.
Now that we have an approach which enables the routing of the data, the remaining question is how to decouple parsing data from interpreting data and metadata. By default, Metron operates like so:
Beyond that, this default strategy assumes default settings for handling metadata. In particular, by default we do not merge metadata, and we use a metron.metadata prefix for all metadata.
In order to enable chained parsers WITH metadata, we allow the following to be specified via a strategy in the parser config:
The available strategies, specified by the rawMessageStrategy configuration, are ENVELOPE and DEFAULT.
Specifically, to enable parsing enveloped data (i.e. data in a field of a JSON blob with the other fields being metadata), one can specify the strategy and configuration of that strategy in the parser config. One must specify the rawMessageStrategy as ENVELOPE in the parser and the rawMessageStrategyConfig to indicate the field which contains the data.
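The behaviour described above can be sketched in Python as follows. This is an illustration under stated assumptions, not Metron's code: unwrap_envelope is a hypothetical helper, its parameter names only mirror the messageField and metadataPrefix settings, and the rule that a non-empty prefix is joined to the key with a dot is an assumption of this sketch.

```python
from typing import Dict, Tuple


def unwrap_envelope(
    envelope: dict,
    message_field: str,
    metadata_prefix: str = "metron.metadata",
) -> Tuple[str, Dict[str, object]]:
    """Split an enveloped JSON map into (raw message, metadata).

    The value of `message_field` is the data to hand to the parser; every
    other field is treated as metadata. Assumption: a non-empty prefix is
    joined to the key with a dot, and an empty prefix leaves keys unchanged.
    """
    raw_message = envelope[message_field]
    metadata: Dict[str, object] = {}
    for key, value in envelope.items():
        if key == message_field:
            continue  # the payload itself is not metadata
        name = f"{metadata_prefix}.{key}" if metadata_prefix else key
        metadata[name] = value
    return raw_message, metadata


envelope = {"meta_f1": "val1", "payload": "foo,bar,grok"}

raw, meta = unwrap_envelope(envelope, "payload", metadata_prefix="")
print(raw)   # foo,bar,grok
print(meta)  # {'meta_f1': 'val1'}
```

The raw message would then go to the configured parser (CSV, Grok and so on) exactly as if it had arrived unwrapped, which keeps the parser logic unaware of the envelope.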
Together with routing, we have the complete solution to chain parsers which can:
Together this enables a directed acyclic graph of parsers to handle single or multi-layer parsing.
For a complete example, look at the parser chaining use-case; for a simple example, however, the following should suffice.
If I want to configure a CSV parser to parse data which has 3 columns f1, f2 and f3 and is held in a field called payload inside of a JSON Map, I can do so like this:
{ "parserClassName" : "org.apache.metron.parsers.csv.CSVParser", "sensorTopic" : "my_topic", "rawMessageStrategy" : "ENVELOPE", "rawMessageStrategyConfig" : { "messageField" : "payload", "metadataPrefix" : "" }, "parserConfig" : { "columns" : { "f1" : 0, "f2" : 1, "f3" : 2 } } }
This would parse the following message:
{ "meta_f1" : "val1", "payload" : "foo,bar,grok", "original_string" : "2019 Jul, 01: val1 foo,bar,grok", "timestamp" : 10000 }
into
{ "meta_f1" : "val1", "f1" : "foo", "f2" : "bar", "f3" : "grok", "original_string" : "2019 Jul, 01: val1 foo,bar,grok", "timestamp" : 10002 }
Note a couple of things here: