Parsers are pluggable components which are used to transform raw data (textual or raw bytes) into JSON messages suitable for downstream enrichment and indexing.
There are two general types of parsers:
```
ipSrcAddr -> ip_src_addr
ipDstAddr -> ip_dst_addr
ipSrcPort -> ip_src_port
```
Note: this property may be necessary because Java does not support underscores in named group names. If your naming conventions require underscores in field names, use this property.
fields : A JSON list of maps containing a record type to regular expression mapping.
A complete configuration example would look like:
"convertCamelCaseToUnderScore": true, "recordTypeRegex": "kernel|syslog", "messageHeaderRegex": "(<syslogPriority>(<=^<)\\d{1,4}(?=>)).*?(<timestamp>(<=>)[A-Za-z] {3}\\s{1,2}\\d{1,2}\\s\\d{1,2}:\\d{1,2}:\\d{1,2}(?=\\s)).*?(<syslogHost>(<=\\s).*?(?=\\s))", "fields": [ { "recordType": "kernel", "regex": ".*(<eventInfo>(<=\\]|\\w\\:).*?(?=$))" }, { "recordType": "syslog", "regex": ".*(<processid>(<=PID\\s=\\s).*?(?=\\sLine)).*(<filePath>(<=64\\s)\/([A-Za-z0-9_-]+\/)+(?=\\w)) (<fileName>.*?(?=\")).*(<eventInfo>(<=\").*?(?=$))" } ]
Note: messageHeaderRegex and regex (within fields) can also be specified as lists, e.g.
"messageHeaderRegex": [ "regular expression 1", "regular expression 2" ]
Here regular expression 1 and regular expression 2 are valid regular expressions and may have named groups, which will be extracted into fields. The list is evaluated in order until a matching regular expression is found.
messageHeaderRegex is run against all messages. That is, all messages are expected to contain the fields extracted by messageHeaderRegex; it is a sort of HCF (highest common factor) across all messages.
recordTypeRegex can be a more advanced regular expression containing named groups. For example:
"recordTypeRegex": "(<process>(?<=\s)\b(kernel|syslog)\b(?=\[|:))"
Here all the named groups (process in the above example) will be extracted as fields.
Though having a named group in recordTypeRegex is completely optional, one may still want to extract named groups there for the following reasons:
regex within a field may also be a list of regular expressions. In that case the regular expressions in the list are attempted in order until a match is found. Once a full match is found, the remaining regular expressions are ignored.
"regex": [ "record type specific regular expression 1", "record type specific regular expression 2"]
timestamp
Since this parser is a general purpose parser, it populates the timestamp field with the current UTC timestamp. The actual timestamp value can be overridden later using Stellar. For example, in the case of syslog timestamps, one could use the following Stellar construct to override the timestamp value. Say you parsed the actual timestamp from the raw log:
<38>Jun 20 15:01:17 hostName sshd[11672]: Accepted publickey for prod from 55.55.55.55 port 66666 ssh2
syslogTimestamp="Jun 20 15:01:17"
Then something like below could be used to override the timestamp.
"timestamp_str": "FORMAT('%s%s%s', YEAR(),' ',syslogTimestamp)", "timestamp":"TO_EPOCH_TIMESTAMP(timestamp_str, 'yyyy MMM dd HH:mm:ss' )"
OR, if you want to factor in the timezone
"timestamp":"TO_EPOCH_TIMESTAMP(timestamp_str, timestamp_format, timezone_name )"
Messages are routed to the Kafka enrichments topic by default. The output topic can be changed with the output_topic option when Starting the Parser Topology or with the outputTopic Parser Configuration setting. The order of precedence from highest to lowest is as follows:
A message can also be routed to other locations besides Kafka with the writerClassName Parser Configuration setting. Messages can be routed independently for each sensor type when configured with Parser Configuration settings.
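For example, a minimal sensor configuration that overrides the output topic might look like the following sketch (the parser class and topic name are illustrative):

```
{
  "parserClassName" : "org.apache.metron.parsers.bro.BasicBroParser",
  "sensorTopic" : "bro",
  "outputTopic" : "custom_enrichments_topic"
}
```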
Currently, we have a few mechanisms for either deferring processing of messages or marking messages as invalid.
There are two reasons a message will be marked as invalid:
Messages marked as invalid are sent to the error queue with an indication in the error message that they are invalid. The messages will contain "error_type":"parser_invalid". Note, you will not see additional exceptions in the logs for this type of failure; rather, the error messages are written directly to the configured error topic. See Topology Errors for more.
One can also filter a message by specifying a filterClassName in the parser config. Filtered messages are just dropped rather than passed through.
Data flows into the parser via kafka and out to the enrichments topology via kafka. Errors are collected along with the context of the error (e.g. stacktrace) and the original message causing the error, and are sent to an error queue. Invalid messages, as determined by global validation functions, are also treated as errors and sent to an error queue.
All Metron messages must follow a specific format in order to be ingested. If a message does not conform to this format it will be dropped and put onto an error queue for further examination. The message must be in JSON format and must have a JSON tag message like so:
{"message" : message content}
Where appropriate there is also a standardization around the 5-tuple JSON fields. This is done so the topology correlation engine further downstream can correlate messages from different topologies by these fields. We are currently working on expanding the message standardization beyond these fields, but this feature is not yet available. The standard field names are as follows:
The timestamp and original_string fields are mandatory. The remaining standard fields are optional. If any of the optional fields are not applicable then the field should be left out of the JSON.
So putting it all together a typical Metron message with all 5-tuple fields present would look like the following:
{ "message": { "ip_src_addr": xxxx, "ip_dst_addr": xxxx, "ip_src_port": xxxx, "ip_dst_port": xxxx, "protocol": xxxx, "original_string": xxx, "additional-field 1": xxx } }
There are a few properties which can be managed in the global configuration that pertain to parsers and parsing in general.
The topic where messages that could not be parsed due to an error are sent. Error messages will be indexed under a sensor type of error and will have the following fields:
When aggregating multiple sensors, all sensors must be using the same error topic.
The configuration for the various parser topologies is defined by JSON documents stored in zookeeper.
The document is structured in the following way:
Example Stellar Filter which includes only messages that contain the field1 field:
{ "filterClassName" : "STELLAR", "parserConfig" : { "filter.query" : "exists(field1)" } }
writerClassName : The class used to write messages after they have been parsed. Defaults to org.apache.metron.writer.kafka.KafkaWriter.
Example of a cache config to hold at most 20000 Stellar expressions for at most 20 minutes:
{ "cacheConfig" : { "stellar.cache.maxSize" : 20000, "stellar.cache.maxTimeRetain" : 20 } }
The fieldTransformations is a complex object which defines a transformation which can be done to a message. This transformation can modify existing fields, add new fields, or remove existing fields.
For platform specific configs, see the README of the appropriate project. This would include settings such as parallelism of individual components and general configuration.
Metadata is a useful thing to send to Metron and use during enrichment or threat intelligence.
Consider the following scenarios:
As such, there are two types of metadata that we seek to support in Metron:
Metadata is controlled by the following parser configs:
In order to avoid collisions from metadata fields, metadata fields will be prefixed (the default is metron.metadata., but this is configurable in the rawMessageStrategyConfig). So, for instance the kafka topic would be in the field metron.metadata.topic.
Custom metadata is specified by sending a JSON Map in the key. If no key is sent, then, obviously, no metadata will be parsed. For instance, sending a metadata field called customer_id could be done by sending
{ "customer_id" : "my_customer_id" }
in the kafka key. This would be exposed as the field metron.metadata.customer_id to Stellar field transformations and, if mergeMetadata is true, would also be available as a field in its own right.
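As a sketch of how such a keyed message might be sent for testing with the standard Kafka console producer (the script path, broker address, topic, and key separator are illustrative):

```
echo '{ "customer_id" : "my_customer_id" }|<raw telemetry message here>' | \
  kafka-console-producer.sh --broker-list node1:6667 --topic bro \
    --property "parse.key=true" --property "key.separator=|"
```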
When a telemetry message fails to parse correctly, a separate error message is produced and sent to the error topic. This error message will contain detailed information to reflect the error that occurred.
If the telemetry message that failed contains metadata, this metadata is included in the error message. For example, here is an error message that contains two metadata fields; metron.metadata.topic and metron.metadata.customer.
{ "exception": "java.lang.IllegalStateException: Unable to parse Message: \"this is an invalid synthetic message\" }", "stack": "java.lang.IllegalStateException: Unable to parse Message: \"this is an invalid synthetic message\" ...\n", "raw_message": "\"this is an invalid synthetic message\" }", "error_hash": "3d498968e8df7f28d05db3037d4ad2a3a0095c22c14d881be45fac3f184dbcc3", "message": "Unable to parse Message: \"this is an invalid synthetic message\" }", "source.type": "error", "failed_sensor_type": "bro", "hostname": "node1", "error_type": "parser_error", "guid": "563d8d2a-1493-4758-be2f-5613bfd2d615", "timestamp": 1548366516634, "metron.metadata.topic": "bro", "metron.metadata.customer": "acme-inc" }
By default, error messages are sent to the indexing topic. This will cause the errors to be indexed in whichever endpoints you have configured, namely Solr, Elasticsearch, and HDFS. You may need to update your configuration of these endpoints to accurately reflect the metadata fields contained in the error message. For example, you may need to update the schema definition of your Solr Collection for the metadata fields to be accurately indexed in the Error collection.
The format of a fieldTransformation is as follows:
The currently implemented fieldTransformations are:
REMOVE : This transformation removes the specified input fields. If you want a conditional removal, you can pass a Metron Query Language statement to define the conditions under which you want to remove the fields.
Consider the following simple configuration which will remove field1 unconditionally:
{ ... "fieldTransformations" : [ { "input" : "field1" , "transformation" : "REMOVE" } ] }
Consider the following simple sensor parser configuration which will remove field1 whenever field2 exists and its value is equal to 'foo':
{ ... "fieldTransformations" : [ { "input" : "field1" , "transformation" : "REMOVE" , "config" : { "condition" : "exists(field2) and field2 == 'foo'" } } ] }
SELECT: This transformation filters the fields in the message to include only the configured output fields, and drops any not explicitly included.
For example:
{ ... "fieldTransformations" : [ { "output" : ["field1", "field2" ] , "transformation" : "SELECT" } ] }
when applied to a message containing the keys field1, field2 and field3, will only output the first two. It is also worth noting that two standard fields, timestamp and original_string, will always be passed along whether they are listed in output or not, since they are considered core required fields.
IP_PROTOCOL : This transformation maps IANA protocol numbers to consistent string representations.
Consider the following sensor parser config to map the protocol field to a textual representation of the protocol:
{ ... "fieldTransformations" : [ { "input" : "protocol" , "transformation" : "IP_PROTOCOL" } ] }
This transformation would transform { "protocol" : 6, "source.type" : "bro", ... } into { "protocol" : "TCP", "source.type" : "bro", ...}
STELLAR : This transformation executes a set of transformations expressed as Stellar Language statements.
RENAME : This transformation allows users to rename a set of fields. Specifically, the config is presumed to be the mapping: the keys of the config are the existing field names and the values are the associated new field names.
The following config will rename the fields old_field and different_old_field to new_field and different_new_field respectively:
{ ... "fieldTransformations" : [ { "transformation" : "RENAME", , "config" : { "old_field" : "new_field", "different_old_field" : "different_new_field" } } ] }
REGEX_SELECT : This transformation lets users set an output field to one of a set of possibilities based on matching regexes. This transformation is useful when the number of conditions is large enough to make a Stellar language match statement unwieldy.
The following config will set the field logical_source_type to one of the following, dependent upon the value of the pix_type field:
{ ... "fieldTransformations" : [ { "transformation" : "REGEX_ROUTING" ,"input" : "pix_type" ,"output" : "logical_source_type" ,"config" : { "cisco-6-302" : [ "^6-302.*", "^06-302.*"] "cisco-5-304" : "^5-304.*" } } ] ... }
If, in your field transformation, you assign a field to null, the field will be removed. You can use this capability to rename variables. It is preferred, however, that the RENAME field transformation is used in this situation as it is less awkward.
Consider this example:
"fieldTransformations" : [ { "transformation" : "STELLAR" ,"output" : [ "new_field", "old_field"] ,"config" : { "new_field" : "old_field" ,"old_field" : "null" } } ]
This would set new_field to the value of old_field and remove old_field.
Currently, the Stellar expressions are expressed in the form of a map where the keys define the fields and the values define the Stellar expressions. The order of expression evaluation is determined by the order of the fields listed in output. A consequence of storing the assignments as a map is that the same field cannot appear in the map as a key twice.
For instance, the following will not function as expected:
"fieldTransformations" : [ { "transformation" : "STELLAR" ,"output" : [ "new_field"] ,"config" : { "new_field" : "TO_UPPER(field1)" ,"new_field" : "TO_LOWER(new_field)" } } ]
In the above example, the last instance of new_field will win and TO_LOWER(new_field) will be evaluated while TO_UPPER(field1) will be skipped.
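If the intent is to apply both operations, one sketch that stays within this constraint is to use an intermediate field (the name upper_field is illustrative; it could then be removed by assigning it to null, as described above):

```
"fieldTransformations" : [
  {
    "transformation" : "STELLAR",
    "output" : [ "upper_field", "new_field"],
    "config" : {
      "upper_field" : "TO_UPPER(field1)",
      "new_field" : "TO_LOWER(upper_field)"
    }
  }
]
```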
Consider the following sensor parser config to add three new fields to a message:
{ ... "fieldTransformations" : [ { "transformation" : "STELLAR" ,"output" : [ "utc_timestamp", "url_host", "url_protocol" ] ,"config" : { "utc_timestamp" : "TO_EPOCH_TIMESTAMP(timestamp, 'yyyy-MM-dd HH:mm:ss', MAP_GET(dc, dc2tz, 'UTC') )" ,"url_host" : "URL_TO_HOST(url)" ,"url_protocol" : "URL_TO_PROTOCOL(url)" } } ] ,"parserConfig" : { "dc2tz" : { "nyc" : "EST" ,"la" : "PST" ,"london" : "UTC" } } }
Note that the dc2tz map is in the parser config, so it is accessible in the functions.
Consider the following example configuration for the yaf sensor:
{ "parserClassName":"org.apache.metron.parsers.GrokParser", "sensorTopic":"yaf", "fieldTransformations" : [ { "input" : "protocol" ,"transformation": "IP_PROTOCOL" } ], "parserConfig": { "grokPath":"/patterns/yaf", "patternLabel":"YAF_DELIMITED", "timestampField":"start_time", "timeFields": ["start_time", "end_time"], "dateFormat":"yyyy-MM-dd HH:mm:ss.S" } }
Parser adapters are loaded dynamically in each Metron topology. They are defined in the Parser Config (defined above) JSON file in Zookeeper.
Java parser adapters are intended for higher-velocity topologies and are not easily changed or extended. As the adoption of Metron continues we plan on extending our library of Java adapters to process more log formats. As of this moment the Java adapters included with Metron are:
Grok parser adapters are designed primarily for users who are not Java coders, for quickly standing up a parser adapter for lower velocity topologies. Grok relies on Regex for message parsing, which is much slower than purpose-built Java parsers, but is more extensible. Grok parsers are defined via a config file and the topology does not need to be recompiled in order to make changes to them. Examples of Grok parsers are:
Parsers that derive from GrokParser typically allow the GrokParser to parse the messages and then override the postParse methods to do further parsing. When this is the case, and the parser has not overridden parse(byte[]) or parseResultOptional(byte[]), these parsers gain support for treating byte[] input as multiple lines, with each line parsed as a separate message (and returned as such). This is enabled by using the "multiline":"true" parser configuration option.
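A minimal sketch of enabling this for a Grok-based sensor (the topic, pattern path, and pattern label are illustrative):

```
{
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "sensorTopic": "squid",
  "parserConfig": {
    "grokPath": "/patterns/squid",
    "patternLabel": "SQUID_DELIMITED",
    "multiline": "true"
  }
}
```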
For more information on the Grok project please refer to the following link:
https://github.com/thekrakken/java-grok
Starting a particular parser on a running Metron deployment is dependent on the platform being run on. Please see the appropriate platform-specific README.
For all platforms, you will need to provide
Default installed Metron is untuned for production deployment. There are a few knobs to tune to get the most out of your system.
When using aggregated parsers, it’s highly recommended to aggregate parsers with similar velocity and parser complexity together.
Platform specific notes can be found in the appropriate README
In order to allow meta alerts to be queried alongside regular alerts in Elasticsearch 2.x, it is necessary to add an additional field to the templates and mapping for existing sensors.
Please see a description of the steps necessary to make this change in the metron-elasticsearch README, under Using Metron with Elasticsearch 2.x.
If Solr is selected as the real-time store, it is also necessary to add additional fields. See the Solr section in metron-indexing for more details.
The kafka queue associated with your parser is a collection point for all of the data sent to your parser. As such, make sure that the number of partitions in the kafka topic is sufficient to handle the throughput that you expect from your parser topology.
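For example, a sketch using the standard Kafka CLI to create a topic with more partitions (topic name, partition count, and ZooKeeper quorum are illustrative; newer Kafka versions use --bootstrap-server instead of --zookeeper):

```
kafka-topics.sh --create \
  --zookeeper node1:2181 \
  --topic bro \
  --partitions 10 \
  --replication-factor 1
```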