A Bro log writer that sends logging output to Kafka. This provides a convenient means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to process the data generated by Bro.
Install librdkafka, a native client library for Kafka. This plugin has been tested against the latest release of librdkafka, which at the time of this writing is v0.9.4.
In order to use this plugin within a kerberized Kafka environment, you will also need libsasl2 installed and will need to pass --enable-sasl to the configure script.
```
curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz
cd librdkafka-0.9.4/
./configure --enable-sasl
make
sudo make install
```
Build the plugin using the following commands.
```
./configure --bro-dist=$BRO_SRC
make
sudo make install
```
Run the following command to ensure that the plugin was installed successfully.
```
$ bro -N Bro::Kafka
Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1)
```
The following examples highlight different ways that the plugin can be used. To try an example, add the Bro script shown to your local.bro file (for example, /usr/share/bro/site/local.bro).
The goal in this example is to send all HTTP and DNS records to a Kafka topic named bro.
```
@load Bro/Kafka/logs-to-kafka.bro
redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
redef Kafka::topic_name = "bro";
redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "localhost:9092"
);
```
It is also possible to send each log stream to a uniquely named topic. The goal in this example is to send all HTTP records to a Kafka topic named http and all DNS records to a separate Kafka topic named dns.
```
@load Bro/Kafka/logs-to-kafka.bro
redef Kafka::topic_name = "";
redef Kafka::tag_json = T;

event bro_init()
{
    # handles HTTP
    local http_filter: Log::Filter = [
        $name = "kafka-http",
        $writer = Log::WRITER_KAFKAWRITER,
        $config = table(["metadata.broker.list"] = "localhost:9092"),
        $path = "http"
    ];
    Log::add_filter(HTTP::LOG, http_filter);

    # handles DNS
    local dns_filter: Log::Filter = [
        $name = "kafka-dns",
        $writer = Log::WRITER_KAFKAWRITER,
        $config = table(["metadata.broker.list"] = "localhost:9092"),
        $path = "dns"
    ];
    Log::add_filter(DNS::LOG, dns_filter);
}
```
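With Kafka::tag_json enabled, each JSON message is wrapped in a single top-level key naming its log stream (the filter's $path), which lets a downstream consumer route records by type. A minimal sketch of that consumer-side routing in Python — the payload field values are invented for illustration, not real Bro output:

```python
import json

# Illustrative payloads in the shape produced when Kafka::tag_json = T:
# the name of the log stream ("http", "dns", ...) wraps the record itself.
messages = [
    '{"http": {"ts": 1501502292.0, "method": "GET", "host": "example.com"}}',
    '{"dns": {"ts": 1501502293.0, "query": "example.com", "qtype_name": "A"}}',
]

def route(raw):
    """Return (log_type, record) for one tagged Bro log message."""
    obj = json.loads(raw)
    # a tagged message has exactly one top-level key: the log stream name
    log_type, record = next(iter(obj.items()))
    return log_type, record

for raw in messages:
    log_type, record = route(raw)
    print("%s -> ts=%s" % (log_type, record["ts"]))
```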
You may want to prevent log messages with certain characteristics from being sent to your Kafka topics. For instance, Metron currently does not support IPv6 source or destination IPs in its default enrichments, so it may be helpful to filter those log messages out before they reach Kafka (although there are multiple ways to approach this). This example does exactly that, assuming a fairly standard Bro Kafka plugin configuration:
```
@load Bro/Kafka/logs-to-kafka.bro
redef Kafka::topic_name = "bro";
redef Kafka::tag_json = T;

event bro_init() &priority=-5
{
    # handles HTTP
    Log::add_filter(HTTP::LOG, [
        $name = "kafka-http",
        $writer = Log::WRITER_KAFKAWRITER,
        $pred(rec: HTTP::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); },
        $config = table(["metadata.broker.list"] = "localhost:9092")
    ]);

    # handles DNS
    Log::add_filter(DNS::LOG, [
        $name = "kafka-dns",
        $writer = Log::WRITER_KAFKAWRITER,
        $pred(rec: DNS::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); },
        $config = table(["metadata.broker.list"] = "localhost:9092")
    ]);

    # handles Conn
    Log::add_filter(Conn::LOG, [
        $name = "kafka-conn",
        $writer = Log::WRITER_KAFKAWRITER,
        $pred(rec: Conn::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); },
        $config = table(["metadata.broker.list"] = "localhost:9092")
    ]);
}
```
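The predicates above rely on Bro's size operator: |addr| yields the width of an address in bits, 32 for IPv4 and 128 for IPv6, so |rec$id$orig_h| == 128 is true exactly when the originator address is IPv6. The same test, sketched in Python purely for intuition (not part of the plugin):

```python
import ipaddress

def keep_record(orig_h, resp_h):
    """Mirror the Bro predicate: keep a record only when neither
    endpoint address is IPv6 (i.e. neither is 128 bits wide)."""
    is_ipv6 = lambda a: ipaddress.ip_address(a).version == 6
    return not (is_ipv6(orig_h) or is_ipv6(resp_h))

print(keep_record("192.168.1.2", "10.0.0.1"))   # IPv4 flow: kept
print(keep_record("2001:db8::1", "10.0.0.1"))   # IPv6 originator: filtered
```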
The global configuration settings for Kafka. These values are passed through directly to librdkafka, so any valid librdkafka setting can be defined in this table; see the librdkafka configuration documentation for the full set of valid settings.
```
redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "localhost:9092",
    ["client.id"] = "bro"
);
```
The name of the Kafka topic to which all Bro logs will be sent.
```
redef Kafka::topic_name = "bro";
```
The maximum number of milliseconds that the plugin will wait for any backlog of queued messages to be sent to Kafka before forced shutdown.
```
redef Kafka::max_wait_on_shutdown = 3000;
```
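The effect of this setting can be pictured as a bounded wait on the outbound queue: keep sending until the backlog drains or the deadline passes, whichever comes first. A rough sketch of those semantics in Python (purely illustrative; not how the plugin is implemented):

```python
import time
from queue import Queue

def wait_for_drain(q, max_wait_ms):
    """Wait up to max_wait_ms for a queue of pending messages to drain.
    Returns True if everything was sent in time, False if the deadline
    passed with messages still queued (those would be dropped)."""
    deadline = time.monotonic() + max_wait_ms / 1000.0
    while not q.empty():
        if time.monotonic() >= deadline:
            return False
        time.sleep(0.005)
    return True

backlog = Queue()
print(wait_for_drain(backlog, 3000))  # empty queue drains immediately: True
```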
This plugin supports producing messages to a kerberized Kafka environment. There are a couple of prerequisites and a couple of settings to configure.
If you are using SASL as the security protocol for Kafka, then you must have libsasl or libsasl2 installed. You can tell whether SASL is enabled by running the following from the directory in which you built librdkafka:
```
$ examples/rdkafka_example -X builtin.features
builtin.features = gzip,snappy,ssl,sasl,regex
```
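If you want to script this check, the builtin.features line can be parsed into a set of feature names; a small helper, assuming the exact output format shown above:

```python
def builtin_features(output_line):
    """Parse a 'builtin.features = a,b,c' line into a set of feature names."""
    _, _, value = output_line.partition("=")
    return {feature.strip() for feature in value.split(",")}

line = "builtin.features = gzip,snappy,ssl,sasl,regex"
print("sasl" in builtin_features(line))  # True only if built with SASL support
```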
As stated above, you can configure the Kafka producer settings in ${BRO_HOME}/share/bro/site/local.bro. A few settings are necessary in a kerberized environment; they are described below for an environment where the following is true:
The Kafka topic bro has been given write permission for the metron user:
```
# login using the metron user
kinit -kt /etc/security/keytabs/metron.headless.keytab metron@EXAMPLE.COM

${KAFKA_HOME}/kafka-broker/bin/kafka-acls.sh \
  --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=node1:2181 \
  --add --allow-principal User:metron --topic bro
```
With that in place, ${BRO_HOME}/share/bro/site/local.bro looks like the following:
```
@load Bro/Kafka/logs-to-kafka.bro
redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
redef Kafka::topic_name = "bro";
redef Kafka::tag_json = T;
redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "node1:6667",
    ["security.protocol"] = "SASL_PLAINTEXT",
    ["sasl.kerberos.keytab"] = "/etc/security/keytabs/metron.headless.keytab",
    ["sasl.kerberos.principal"] = "metron@EXAMPLE.COM"
);
```