Aggregating many different types of sensors into a single data source (e.g. syslog) and ingesting that aggregate sensor into Metron is a common pattern. It is not obvious precisely how to manage these types of aggregate sensors, as they require two-pass parsing. This document will walk through an example of supporting this kind of multi-pass ingest.
Multi-pass parsing involves two passes: a first pass that parses the envelope common to every message and routes each message, and a second pass that parses the sensor-specific data carried inside the envelope.
At a high level, we continue to maintain the architectural invariant of a 1-1 relationship between logical sensors and storm topologies. Eventually this relationship may become more complex, but at the moment the approach is to construct a routing parser which will have two responsibilities:

* parse the envelope that is common to every message
* route the enveloped data to the Kafka topic of the appropriate downstream parser
Because the routing parser emits a JSON blob just like any other parser, we will need to adjust the downstream parsers to extract the enveloped data from that JSON blob and treat it as the data to parse.
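The two-pass flow can be sketched in miniature. The following is a toy Python illustration, not Metron's implementation; the envelope regex, routing table, and sample log line are simplified stand-ins for what the grok parser and field transformations below actually do:

```python
import json
import re

# Pass 1 (router): parse the envelope common to every line and decide the
# downstream Kafka topic from the message type.
ENVELOPE = re.compile(r"^(?P<timestamp>.*?): %PIX-(?P<pix_type>\S+): (?P<data>.*)$")
ROUTES = {"cisco-6-302": r"^6-302.*", "cisco-5-304": r"^5-304.*"}

def route(line):
    envelope = ENVELOPE.match(line).groupdict()
    for topic, pattern in ROUTES.items():
        if re.match(pattern, envelope["pix_type"]):
            # The router emits a JSON blob; the original payload rides along
            # in the "data" field for the second pass.
            return topic, json.dumps(envelope)
    return None, None  # unmatched types are simply not routed

topic, blob = route("Mar 29 2004 09:54:18: %PIX-6-302020: Built inbound ICMP connection ...")

# Pass 2 (downstream parser): pull the enveloped data back out and parse it.
payload = json.loads(blob)["data"]
```

The rest of this walkthrough builds exactly this shape out of real Metron configs: a grok parser plus a `REGEX_SELECT` transformation for pass 1, and `ENVELOPE`-mode parsers for pass 2.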
We assume that the following environment variables are set:

* `METRON_HOME` - the Metron installation directory
* `ZOOKEEPER` - the ZooKeeper quorum (comma-separated `host:port`)
* `BROKERLIST` - the Kafka broker list (comma-separated `host:port`)
Before editing configurations, be sure to pull the configs from zookeeper locally via
```
$METRON_HOME/bin/zk_load_configs.sh --mode PULL -z $ZOOKEEPER -o $METRON_HOME/config/zookeeper/ -f
```
Consider the following situation: we have some logs from a Cisco PIX device that we would like to ingest. The format is syslog, but multiple message types exist in the same log file. Specifically, let’s consider the sample logs here.
The log lines generally have the following components:

* a timestamp
* the PIX message type (e.g. `6-302021`)
* the message payload itself
Let’s consider two types of messages that we’d like to parse:

* `6-302*` messages, which record connections being built and torn down
* `5-304*` messages, which record URLs being accessed
A couple of things are apparent from this:

* every message shares a common envelope (the timestamp and PIX message type), which can be parsed once
* the payloads differ substantially by message type, so each type needs its own parser
We will proceed to create 3 separate parsers:

* `pix_syslog_router` - parses the envelope and routes each message to the topic of the appropriate downstream parser
* `cisco-6-302` - parses the connection creation and teardown messages
* `cisco-5-304` - parses the URL access messages
To assist these parsers, we will accumulate some grok expressions that handle the various message formats. These live in the file referenced by `grokPath` in the parser configs below (`/tmp/cisco_patterns`).
```
CISCO_ACTION Built|Teardown|Deny|Denied|denied|requested|permitted|denied by ACL|discarded|est-allowed|Dropping|created|deleted
CISCO_REASON Duplicate TCP SYN|Failed to locate egress interface|Invalid transport field|No matching connection|DNS Response|DNS Query|(?:%{WORD}\s*)*
CISCO_DIRECTION Inbound|inbound|Outbound|outbound
CISCOFW302020_302021 %{CISCO_ACTION:action}(?:%{CISCO_DIRECTION:direction})? %{WORD:protocol} connection %{GREEDYDATA:ignore} faddr %{IP:ip_dst_addr}/%{INT:icmp_seq_num}(?:\(%{DATA:fwuser}\))? gaddr %{IP:ip_src_xlated}/%{INT:icmp_code_xlated} laddr %{IP:ip_src_addr}/%{INT:icmp_code}( \(%{DATA:user}\))?
ACCESSED %{URIHOST:ip_src_addr} Accessed URL %{IP:ip_dst_addr}:%{URIPATHPARAM:uri_path}
CISCO_PIX %{GREEDYDATA:timestamp}: %PIX-%{NOTSPACE:pix_type}: %{GREEDYDATA:data}
```
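Grok patterns are essentially named regular expressions: each `%{PATTERN:field}` reference expands into a named capture group. A minimal sketch of that expansion in Python shows how the `ACCESSED` pattern resolves; the base patterns here are simplified stand-ins for the real grok primitives, and the sample log line is hypothetical:

```python
import re

# Simplified stand-ins for the grok base patterns used by ACCESSED.
BASE = {
    "IP": r"\d{1,3}(?:\.\d{1,3}){3}",
    "URIHOST": r"\S+",
    "URIPATHPARAM": r"\S+",
}

def expand(grok):
    # Replace every %{PATTERN:field} with a named capture group.
    return re.sub(
        r"%\{(\w+):(\w+)\}",
        lambda m: "(?P<{}>{})".format(m.group(2), BASE[m.group(1)]),
        grok,
    )

accessed = expand("%{URIHOST:ip_src_addr} Accessed URL %{IP:ip_dst_addr}:%{URIPATHPARAM:uri_path}")
m = re.match(accessed, "10.22.8.12 Accessed URL 66.102.9.99:/index.html")
```

The real grok library handles nested pattern references (e.g. `CISCOFW302020_302021` referencing `CISCO_ACTION`), which this sketch omits.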
First, create the Kafka topic the aggregate data will be sent to:

```
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create --topic pix_syslog_router --partitions 1 --replication-factor 1
```
Create the router parser config at `$METRON_HOME/config/zookeeper/parsers/pix_syslog_router.json`:

```
{
  "parserClassName" : "org.apache.metron.parsers.GrokParser",
  "sensorTopic" : "pix_syslog_router",
  "parserConfig": {
    "grokPath": "/tmp/cisco_patterns",
    "batchSize" : 1,
    "patternLabel": "CISCO_PIX",
    "timestampField": "timestamp",
    "timeFields" : [ "timestamp" ],
    "dateFormat" : "MMM dd yyyy HH:mm:ss",
    "kafka.topicField" : "logical_source_type"
  },
  "fieldTransformations" : [
    {
      "transformation" : "REGEX_SELECT",
      "input" : "pix_type",
      "output" : "logical_source_type",
      "config" : {
        "cisco-6-302" : "^6-302.*",
        "cisco-5-304" : "^5-304.*"
      }
    }
  ]
}
```
A couple of things to note about this config:

* `kafka.topicField` instructs the parser to write each message to the Kafka topic named in that message's `logical_source_type` field rather than to a fixed topic; messages without that field are not forwarded
* the `REGEX_SELECT` field transformation populates `logical_source_type` by matching the `pix_type` field against the configured regexes, so `6-302*` messages are routed to the `cisco-6-302` topic and `5-304*` messages to `cisco-5-304`
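On the timestamp handling: `timeFields` names the grok-extracted fields to convert, and `dateFormat` is a Java `SimpleDateFormat` pattern used to turn the string into epoch milliseconds. A rough Python equivalent of that conversion, assuming an English locale and UTC (the actual deployment's timezone handling may differ):

```python
from datetime import datetime, timezone

# Java's "MMM dd yyyy HH:mm:ss" roughly corresponds to this strptime format.
def to_epoch_millis(ts):
    dt = datetime.strptime(ts, "%b %d %Y %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

millis = to_epoch_millis("Mar 29 2004 09:54:18")
```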
Create the Kafka topic for the `cisco-6-302` parser:

```
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create --topic cisco-6-302 --partitions 1 --replication-factor 1
```
Create the parser config at `$METRON_HOME/config/zookeeper/parsers/cisco-6-302.json`:

```
{
  "parserClassName" : "org.apache.metron.parsers.GrokParser",
  "sensorTopic" : "cisco-6-302",
  "rawMessageStrategy" : "ENVELOPE",
  "rawMessageStrategyConfig" : {
    "messageField" : "data",
    "metadataPrefix" : ""
  },
  "parserConfig": {
    "grokPath": "/tmp/cisco_patterns",
    "batchSize" : 1,
    "patternLabel": "CISCOFW302020_302021"
  }
}
```
Note a couple of things:

* a `rawMessageStrategy` of `ENVELOPE` indicates that the data to parse is wrapped inside an upstream JSON message, with `messageField` naming the field (`data`) that holds it
* a `metadataPrefix` of `""` merges the remaining envelope fields (e.g. the timestamp and `pix_type`) into the parsed message without any prefix
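What `ENVELOPE` mode does to each incoming message can be sketched as follows (toy Python, not Metron internals; the routed message values are hypothetical):

```python
import json

# A message as the pix_syslog_router parser might emit it.
routed = json.dumps({
    "timestamp": "Mar 29 2004 09:54:18",
    "pix_type": "6-302021",
    "logical_source_type": "cisco-6-302",
    "data": "Teardown TCP connection ...",
})

def unwrap(blob, message_field="data", metadata_prefix=""):
    """Sketch of rawMessageStrategy ENVELOPE: pull messageField out of the
    upstream JSON; the remaining fields ride along as metadata."""
    envelope = json.loads(blob)
    raw = envelope.pop(message_field)  # what the downstream grok parser sees
    metadata = {metadata_prefix + k: v for k, v in envelope.items()}
    return raw, metadata

raw, metadata = unwrap(routed)
```

With the default non-empty `metadataPrefix`, the envelope fields would instead arrive prefixed; setting it to `""` here keeps the router's field names intact in the final message.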
Create the Kafka topic for the `cisco-5-304` parser:

```
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create --topic cisco-5-304 --partitions 1 --replication-factor 1
```
Create the parser config at `$METRON_HOME/config/zookeeper/parsers/cisco-5-304.json`:

```
{
  "parserClassName" : "org.apache.metron.parsers.GrokParser",
  "sensorTopic" : "cisco-5-304",
  "rawMessageStrategy" : "ENVELOPE",
  "rawMessageStrategyConfig" : {
    "messageField" : "data",
    "metadataPrefix" : ""
  },
  "parserConfig": {
    "grokPath": "/tmp/cisco_patterns",
    "batchSize" : 1,
    "patternLabel": "ACCESSED"
  }
}
```
Mostly the same comments from the previous parser apply here; we are just using a different pattern label.
Now we should start the parsers. Push the configs to ZooKeeper, then start the three parser topologies:
```
$METRON_HOME/bin/zk_load_configs.sh --mode PUSH -z $ZOOKEEPER -i $METRON_HOME/config/zookeeper/
$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s cisco-6-302
$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s cisco-5-304
$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s pix_syslog_router
```
With the topologies running, send the sample data into the router topic:

```
cat ~/data.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic pix_syslog_router
```
You should see indices created for the cisco-5-304 and cisco-6-302 data with appropriate fields created for each type.
Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor-specific Kafka topics, and do not do internal routing to the appropriate sensor.
Instead of creating a topology per sensor, all 3 (pix_syslog_router, cisco-5-304, and cisco-6-302) can be run in a single aggregated parser. It’s also possible to aggregate a subset of these parsers (e.g. run cisco-6-302 as its own topology, and aggregate the other 2).
The step to start the parsers then becomes:
```
$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s cisco-6-302,cisco-5-304,pix_syslog_router
```
The flow through the Storm topology and Kafka topics is unchanged: raw log lines land on the `pix_syslog_router` topic; the router parses the envelope and writes each message to the `cisco-6-302` or `cisco-5-304` topic according to its type; each downstream parser then unwraps and parses its messages and forwards them on to enrichment. In the aggregated case the same topic-to-topic flow occurs, just within a single Storm topology.