Parsers are pluggable components which are used to transform raw data (textual or raw bytes) into JSON messages suitable for downstream enrichment and indexing.
There are two general types of parsers:
Currently, we have a few mechanisms for either deferring processing of messages or marking messages as invalid.
There are two reasons a message will be marked as invalid:
Messages which are marked as invalid are sent to the error queue, with an indication in the error message that they are invalid.
One can also filter a message by specifying a filterClassName in the parser config. Filtered messages are just dropped rather than passed through.
Data flows into the parser bolt via Kafka and out to the enrichments topology via Kafka. Errors are collected along with the context of the error (e.g. the stack trace) and the original message causing the error, and are sent to an error queue. Invalid messages, as determined by the global validation functions, are also treated as errors and sent to the error queue.
Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct parser as needed. There are some constraints around this, in particular regarding some configuration. Additionally, all sensors must flow to the same error topic. The Kafka topic is retrieved from the input Tuple itself.
A worked example of this can be found in the Parser Chaining use case.
All Metron messages must follow a specific format in order to be ingested. If a message does not conform to this format, it will be dropped and put onto an error queue for further examination. The message must be valid JSON and must have a JSON tag called message, like so:
{"message" : message content}
Where appropriate, there is also a standardization around the 5-tuple JSON fields. This is done so the topology correlation engine further downstream can correlate messages from different topologies by these fields. We are currently working on expanding the message standardization beyond these fields, but this feature is not yet available. The standard field names are as follows:
The timestamp and original_string fields are mandatory. The remaining standard fields are optional. If any of the optional fields are not applicable then the field should be left out of the JSON.
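For instance, a minimal message containing only the mandatory fields would look like the following (using the same placeholder notation as the full example below):

{ "message" : { "timestamp" : xxxx, "original_string" : xxx } }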
So putting it all together a typical Metron message with all 5-tuple fields present would look like the following:
{ "message": {"ip_src_addr": xxxx, "ip_dst_addr": xxxx, "ip_src_port": xxxx, "ip_dst_port": xxxx, "protocol": xxxx, "original_string": xxx, "additional-field 1": xxx, } }
There are a few properties which can be managed in the global configuration that pertain to parsers and parsing in general.
The topic where messages which were unable to be parsed due to error are sent. Error messages will be indexed under a sensor type of error and the messages will have the following fields:
When aggregating multiple sensors, all sensors must be using the same error topic.
The configuration for the various parser topologies is defined by JSON documents stored in zookeeper.
The document is structured in the following way:
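As a rough sketch assembled from the elements shown in the examples throughout this document (the optional filterClassName described earlier can also appear at the top level), the overall shape of a sensor parser config document is:

{
  "parserClassName" : "org.apache.metron.parsers.GrokParser",
  "sensorTopic" : "yaf",
  "parserConfig" : { },
  "fieldTransformations" : [ ],
  "cacheConfig" : { }
}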
{ "filterClassName" : "STELLAR" ,"parserConfig" : { "filter.query" : "exists(field1)" } }
Example of a cache config to contain at most 20000 Stellar expressions for at most 20 minutes:
{ "cacheConfig" : { "stellar.cache.maxSize" : 20000, "stellar.cache.maxTimeRetain" : 20 } }
The fieldTransformations is a complex object which defines transformations that can be applied to a message.
Metadata is a useful thing to send to Metron and use during enrichment or threat intelligence.
Consider the following scenarios:
As such, there are two types of metadata that we seek to support in Metron:
Metadata is controlled by the following parser configs:
In order to avoid collisions from metadata fields, metadata fields will be prefixed (the default is metron.metadata., but this is configurable in the rawMessageStrategyConfig). So, for instance the kafka topic would be in the field metron.metadata.topic.
Custom metadata is specified by sending a JSON Map in the Kafka key. If no key is sent, then no metadata will be parsed. For instance, sending a metadata field called customer_id could be done by sending
{ "customer_id" : "my_customer_id" }
in the Kafka key. This would be exposed as the field metron.metadata.customer_id to Stellar field transformations and, if mergeMetadata is true, it would also be available as a field in its own right.
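Putting this together, a minimal sketch of a sensor config that merges Kafka-key metadata into the message might look like the following (mergeMetadata is described above; readMetadata is assumed here to be the companion flag that enables reading metadata from the key in the first place):

{ "parserClassName" : "org.apache.metron.parsers.GrokParser", "sensorTopic" : "yaf", "readMetadata" : true, "mergeMetadata" : true, "parserConfig" : { } }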
The format of a fieldTransformation is as follows:
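Judging from the examples below, each fieldTransformation entry has the following general shape (a sketch; not every transformation uses every key): input names the message field(s) consumed, output names the field(s) produced, transformation names the transformation type, and config holds transformation-specific options.

{ "input" : "field1", "output" : [ "field2" ], "transformation" : "STELLAR", "config" : { } }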
The currently implemented fieldTransformations are:
Consider the following simple configuration which will remove field1 unconditionally:
{ ... "fieldTransformations" : [ { "input" : "field1" , "transformation" : "REMOVE" } ] }
Consider the following simple sensor parser configuration which will remove field1 whenever field2 exists and its value is equal to 'foo':
{ ... "fieldTransformations" : [ { "input" : "field1" , "transformation" : "REMOVE" , "config" : { "condition" : "exists(field2) and field2 == 'foo'" } } ] }
SELECT : This transformation filters the message, emitting only the fields listed in the output list. For example:
{ ... "fieldTransformations" : [ { "output" : ["field1", "field2" ] , "transformation" : "SELECT" } ] }
when applied to a message containing keys field1, field2 and field3, will only output the first two. It is also worth noting that two standard fields - timestamp and original_string - will always be passed along whether they are listed in output or not, since they are considered core required fields.
Consider the following sensor parser config to map the protocol field to a textual representation of the protocol:
{ ... "fieldTransformations" : [ { "input" : "protocol" , "transformation" : "IP_PROTOCOL" } ] }
This transformation would transform { "protocol" : 6, "source.type" : "bro", ... } into { "protocol" : "TCP", "source.type" : "bro", ...}
STELLAR : This transformation executes a set of transformations expressed as Stellar Language statements.
RENAME : This transformation allows users to rename a set of fields. Specifically, the config is presumed to be the mapping. The keys to the config are the existing field names and the values for the config map are the associated new field name.
The following config will rename the fields old_field and different_old_field to new_field and different_new_field respectively:
{ ... "fieldTransformations" : [ { "transformation" : "RENAME", , "config" : { "old_field" : "new_field", "different_old_field" : "different_new_field" } } ] }
REGEX_ROUTING : This transformation sets the output field to the config key whose associated regular expression(s) match the value of the input field. The following config will set the field logical_source_type to one of the following, dependent upon the value of the pix_type field:
{ ... "fieldTransformations" : [ { "transformation" : "REGEX_ROUTING" ,"input" : "pix_type" ,"output" : "logical_source_type" ,"config" : { "cisco-6-302" : [ "^6-302.*", "^06-302.*"] "cisco-5-304" : "^5-304.*" } } ] ... }
If, in your field transformation, you assign a field to null, the field will be removed. You can use this capability to rename variables. It is preferred, however, that the RENAME field transformation be used in this situation, as it is less awkward.
Consider this example:
"fieldTransformations" : [ { "transformation" : "STELLAR" ,"output" : [ "new_field", "old_field"] ,"config" : { "new_field" : "old_field" ,"old_field" : "null" } } ]
This would set new_field to the value of old_field and remove old_field.
Currently, the Stellar expressions are expressed in the form of a map where the keys define the fields and the values define the Stellar expressions. The order of expression evaluation is determined by the order of the fields in the output list. A consequence of storing the assignments as a map is that the same field cannot appear in the map as a key twice.
For instance, the following will not function as expected:
"fieldTransformations" : [ { "transformation" : "STELLAR" ,"output" : [ "new_field"] ,"config" : { "new_field" : "TO_UPPER(field1)" ,"new_field" : "TO_LOWER(new_field)" } } ]
In the above example, the last instance of new_field will win and TO_LOWER(new_field) will be evaluated while TO_UPPER(field1) will be skipped.
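One way to get the presumably intended result is to compose the calls into a single expression, so that the field appears only once as a key (a sketch):

"fieldTransformations" : [ { "transformation" : "STELLAR" ,"output" : [ "new_field"] ,"config" : { "new_field" : "TO_LOWER(TO_UPPER(field1))" } } ]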
Consider the following sensor parser config to add three new fields to a message:
{ ... "fieldTransformations" : [ { "transformation" : "STELLAR" ,"output" : [ "utc_timestamp", "url_host", "url_protocol" ] ,"config" : { "utc_timestamp" : "TO_EPOCH_TIMESTAMP(timestamp, 'yyyy-MM-dd HH:mm:ss', MAP_GET(dc, dc2tz, 'UTC') )" ,"url_host" : "URL_TO_HOST(url)" ,"url_protocol" : "URL_TO_PROTOCOL(url)" } } ] ,"parserConfig" : { "dc2tz" : { "nyc" : "EST" ,"la" : "PST" ,"london" : "UTC" } } }
Note that the dc2tz map is in the parser config, so it is accessible in the functions.
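For illustration (a sketch that ignores the timestamp conversion), a message containing "url" : "https://www.example.com/index.html" would gain fields along the lines of:

{ "url_host" : "www.example.com", "url_protocol" : "https" }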
Consider the following example configuration for the yaf sensor:
{ "parserClassName":"org.apache.metron.parsers.GrokParser", "sensorTopic":"yaf", "fieldTransformations" : [ { "input" : "protocol" ,"transformation": "IP_PROTOCOL" } ], "parserConfig": { "grokPath":"/patterns/yaf", "patternLabel":"YAF_DELIMITED", "timestampField":"start_time", "timeFields": ["start_time", "end_time"], "dateFormat":"yyyy-MM-dd HH:mm:ss.S" } }
Parser adapters are loaded dynamically in each Metron topology. They are defined in the Parser Config (defined above) JSON file in Zookeeper.
Java parser adapters are intended for higher-velocity topologies and are not easily changed or extended. As the adoption of Metron continues we plan on extending our library of Java adapters to process more log formats. As of this moment the Java adapters included with Metron are:
Grok parser adapters are designed primarily for those who are not Java coders, so that a parser adapter for lower-velocity topologies can be stood up quickly. Grok relies on regular expressions for message parsing, which is much slower than purpose-built Java parsers, but is more extensible. Grok parsers are defined via a config file and the topology does not need to be recompiled in order to make changes to them. Examples of Grok parsers are:
Parsers that derive from GrokParser typically allow the GrokParser to parse the messages, and then override the postParse methods to do further parsing. When this is the case, and the parser has not overridden parse(byte[]) or parseResultOptional(byte[]), these parsers gain support for treating byte[] input as multiple lines, with each line parsed as a separate message (and returned as such). This is enabled by using the "multiline":"true" parser configuration option.
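For example (a sketch that reuses the yaf config shown above), enabling multi-line handling in the parserConfig might look like:

{ "parserClassName" : "org.apache.metron.parsers.GrokParser", "sensorTopic" : "yaf", "parserConfig" : { "grokPath" : "/patterns/yaf", "patternLabel" : "YAF_DELIMITED", "multiline" : "true" } }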
For more information on the Grok project please refer to the following link:
https://github.com/thekrakken/java-grok
Starting a particular parser topology on a running Metron deployment is as easy as running the start_parser_topology.sh script located in $METRON_HOME/bin. This utility will allow you to configure and start the parser topology, assuming that the sensor-specific parser configuration exists within zookeeper.
The usage for start_parser_topology.sh is as follows:
usage: start_parser_topology.sh
 -e,--extra_topology_options <JSON_FILE>              Extra options in the form of a JSON file with a map for content.
 -esc,--extra_kafka_spout_config <JSON_FILE>          Extra spout config options in the form of a JSON file with a map for content. Possible keys are: retryDelayMaxMs, retryDelayMultiplier, retryInitialDelayMs, stateUpdateIntervalMs, bufferSizeBytes, fetchMaxWait, fetchSizeBytes, maxOffsetBehind, metricsTimeBucketSizeInSecs, socketTimeoutMs
 -ewnt,--error_writer_num_tasks <NUM_TASKS>           Error Writer Num Tasks
 -ewp,--error_writer_p <PARALLELISM_HINT>             Error Writer Parallelism Hint
 -h,--help                                            This screen
 -iwnt,--invalid_writer_num_tasks <NUM_TASKS>         Invalid Writer Num Tasks
 -iwp,--invalid_writer_p <PARALLELISM_HINT>           Invalid Message Writer Parallelism Hint
 -k,--kafka <BROKER_URL>                              Kafka Broker URL
 -ksp,--kafka_security_protocol <SECURITY_PROTOCOL>   Kafka Security Protocol
 -mt,--message_timeout <TIMEOUT_IN_SECS>              Message Timeout in Seconds
 -mtp,--max_task_parallelism <MAX_TASK>               Max task parallelism
 -na,--num_ackers <NUM_ACKERS>                        Number of Ackers
 -nw,--num_workers <NUM_WORKERS>                      Number of Workers
 -ot,--output_topic <KAFKA_TOPIC>                     Output Kafka Topic
 -pnt,--parser_num_tasks <NUM_TASKS>                  Parser Num Tasks
 -pp,--parser_p <PARALLELISM_HINT>                    Parser Parallelism Hint
 -s,--sensor <SENSOR_TYPE>                            Sensor Type
 -snt,--spout_num_tasks <NUM_TASKS>                   Spout Num Tasks
 -sp,--spout_p <SPOUT_PARALLELISM_HINT>               Spout Parallelism Hint
 -t,--test <TEST>                                     Run in Test Mode
 -z,--zk <ZK_QUORUM>                                  Zookeeper Quorum URL (zk1:2181,zk2:2181,...)
These options are intended to configure the Storm Kafka spout more completely. They can be specified in a JSON file containing a map associating each Kafka spout configuration parameter with a value. The parameters which can be configured are:
For instance, creating a JSON file which will set the offsets to UNCOMMITTED_EARLIEST:
{ "spout.firstPollOffsetStrategy" : "UNCOMMITTED_EARLIEST" }
This would be loaded by passing the file as an argument to --extra_kafka_spout_config.
These options are intended to be Storm configuration options and will live in a JSON file which will be loaded into the Storm config. For instance, if you wanted to set a Storm config property called topology.ticks.tuple.freq.secs to 1000 and storm.local.dir to /opt/my/path, you could create a file called custom_config.json containing
{ "topology.ticks.tuple.freq.secs" : 1000, "storm.local.dir" : "/opt/my/path" }
and pass --extra_topology_options custom_config.json to start_parser_topology.sh.
A default Metron installation is untuned for production deployment. There are a few knobs to tune to get the most out of your system.
When using aggregated parsers, it’s highly recommended to aggregate parsers with similar velocity and parser complexity together.
In order to allow meta alerts to be queried alongside regular alerts in Elasticsearch 2.x, it is necessary to add an additional field to the templates and mapping for existing sensors.
Please see the metron-elasticsearch documentation, "Using Metron with Elasticsearch 2.x", for a description of the steps necessary to make this change.
If Solr is selected as the real-time store, it is also necessary to add additional fields. See the Solr section in metron-indexing for more details.
The kafka queue associated with your parser is a collection point for all of the data sent to your parser. As such, make sure that the number of partitions in the kafka topic is sufficient to handle the throughput that you expect from your parser topology.
The parser topology as started by the $METRON_HOME/bin/start_parser_topology.sh script uses a default of one executor per bolt. In a real production system, this should be customized by modifying the arguments sent to this utility.
Finally, if workers and executors are new to you, the following might be of use to you: