One way to find anomalous behavior in a network is by inspecting user login behavior. In particular, if a user is logging in via vastly differing geographic locations in a short period of time, this may be evidence of malicious behavior.
More formally, we can encode this potentially malicious event in terms of how far a login is from the geographic centroid of the user’s historic logins, as compared to the same distance for all users. For instance, suppose we track all users, and the median distance from the central geographic location of their logins over the last 2 hours is 3 km with a standard deviation of 1 km. If we then see a user logging in 1700 km from the central geographic location of their logins for the last 2 hours, they MAY be exhibiting a deviation that we want to monitor, since it would be hard to travel that distance in 2 hours. On the other hand, the user may have just used a VPN or proxy. Ultimately, this sort of analytic must be considered only one piece of evidence among many others before we want to indicate an alert.
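To make the arithmetic in this example concrete, here is a minimal sketch in Python. The function names are hypothetical and chosen only for illustration; the numbers are the ones from the example (a 3 km median, a 1 km standard deviation, and a login 1700 km from the centroid within a 2 hour window).

```python
def deviation_in_sds(distance_km, median_km, sd_km):
    """How many standard deviations a login's distance lies from the median."""
    return abs(distance_km - median_km) / sd_km


def implied_speed_kmh(distance_km, window_hours):
    """Average speed needed to cover distance_km within window_hours."""
    return distance_km / window_hours


# Values from the example in the text.
sds = deviation_in_sds(1700, 3, 1)
speed = implied_speed_kmh(1700, 2)
print(sds)    # 1697.0
print(speed)  # 850.0
```

A login that sits 1697 standard deviations from the median is clearly unusual, but as noted, a VPN or proxy produces the same numbers, so the result is a signal to weigh rather than a verdict.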
For the purposes of demonstration, we will construct synthetic data in which 2 users log into a system rather quickly (once per second) from various hosts. Each user’s IP addresses share the same first 2 octets, with the last 2 chosen randomly. We will then inject a data point indicating that user1 is logging in via a Russian IP address.
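We assume that the following environment variables, which the commands below reference, are set: METRON_HOME (the Metron installation directory), ZOOKEEPER (the ZooKeeper quorum), BROKERLIST (the Kafka broker list), and ES_HOST (the Elasticsearch host used in the search URL).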
Also, this walkthrough assumes that you are not using a kerberized cluster. If you are, then the parser start command will change slightly to include the security protocol.
Before editing configurations, be sure to pull the current configs from ZooKeeper to the local filesystem via:
$METRON_HOME/bin/zk_load_configs.sh --mode PULL -z $ZOOKEEPER -o $METRON_HOME/config/zookeeper/ -f
First, we’ll configure the profiler to emit a profile every 1 minute:
"profiler.client.period.duration" : "1",
"profiler.client.period.duration.units" : "MINUTES"
We want to create a new sensor for our synthetic data called auth. To feed it, we need a synthetic data generator. In particular, we want a process that emits one authentication event per second for a user chosen at random from a fixed set, where the IPs are randomly chosen but each user’s login IP addresses share the same first 2 octets.
Edit ~/gen_data.py and paste the following into it:
#!/usr/bin/python
import random
import sys
import time

# Each user's logins share the same first 2 octets.
domains = {
    'user1': '173.90',
    'user2': '156.33'
}

def get_ip(base):
    # Choose the last 2 octets at random.
    return base + '.' + str(random.randint(1, 255)) + '.' + str(random.randint(1, 255))

def main():
    freq_s = 1  # seconds between events
    while True:
        user = 'user' + str(random.randint(1, len(domains)))
        epoch_time = int(time.time())
        ip = get_ip(domains[user])
        # Emit one CSV record: user,ip,timestamp
        # (parenthesized so it runs under both Python 2 and Python 3)
        print(user + ',' + ip + ',' + str(epoch_time))
        sys.stdout.flush()
        time.sleep(freq_s)

if __name__ == '__main__':
    main()
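The message format for our simple synthetic data is a CSV with 3 columns: the user (e.g. user1), the login IP address, and the timestamp of the login in epoch seconds, as produced by int(time.time()) in the generator.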
We will need to parse this via our CSVParser and add the geohash of the login IP address. The following parser configuration does both:
{
  "parserClassName" : "org.apache.metron.parsers.csv.CSVParser",
  "sensorTopic" : "auth",
  "parserConfig" : {
    "columns" : {
      "user" : 0,
      "ip" : 1,
      "timestamp" : 2
    }
  },
  "fieldTransformations" : [
    {
      "transformation" : "STELLAR",
      "output" : [ "hash" ],
      "config" : {
        "hash" : "GEOHASH_FROM_LOC(GEO_GET(ip))"
      }
    }
  ]
}
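For intuition about what the hash field contains, the following is a minimal Python sketch of the standard geohash encoding, which interleaves longitude and latitude bits and writes them out in base 32. It is not Metron’s implementation: GEO_GET resolves the IP address to a location using a GeoIP database, which this sketch skips by taking a latitude and longitude directly, and the function name geohash_encode is hypothetical.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # geohash alphabet (no a, i, l, o)


def geohash_encode(lat, lon, precision=12):
    """Encode a latitude/longitude pair (degrees) as a geohash string."""
    lat_range = [-90.0, 90.0]
    lon_range = [-180.0, 180.0]
    chars = []
    value = 0          # bits accumulated for the current character
    bit_count = 0
    use_lon = True     # bits alternate, starting with longitude
    while len(chars) < precision:
        rng, coord = (lon_range, lon) if use_lon else (lat_range, lat)
        mid = (rng[0] + rng[1]) / 2.0
        if coord >= mid:
            value = (value << 1) | 1
            rng[0] = mid   # keep the upper half
        else:
            value = value << 1
            rng[1] = mid   # keep the lower half
        use_lon = not use_lon
        bit_count += 1
        if bit_count == 5:  # 5 bits per base-32 character
            chars.append(BASE32[value])
            value = 0
            bit_count = 0
    return "".join(chars)


# A commonly cited sample coordinate, not one taken from this walkthrough.
print(geohash_encode(57.64911, 10.40744, precision=5))  # u4pru
```

Nearby locations share a common prefix, which is why a multiset of geohashes is a compact way to summarize where a user has logged in from.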
Next, create the auth Kafka topic:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create --topic auth --partitions 1 --replication-factor 1
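We will need to track 2 profiles to accomplish this task: geo_distribution_from_centroid, which tracks the statistical distribution, across all users, of the distance between each login and the geographic centroid of that user’s recent logins; and locations_by_user, which tracks the geohashes of the locations each user has logged in from.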
We can represent these in the $METRON_HOME/config/zookeeper/profiler.json via the following:
{
  "profiles": [
    {
      "profile": "geo_distribution_from_centroid",
      "foreach": "'global'",
      "onlyif": "geo_distance != null",
      "init" : {
        "s": "STATS_INIT()"
      },
      "update": {
        "s": "STATS_ADD(s, geo_distance)"
      },
      "result": "s"
    },
    {
      "profile": "locations_by_user",
      "foreach": "user",
      "onlyif": "hash != null && LENGTH(hash) > 0",
      "init" : {
        "s": "MULTISET_INIT()"
      },
      "update": {
        "s": "MULTISET_ADD(s, hash)"
      },
      "result": "s"
    }
  ]
}
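As a rough mental model of what these two profiles accumulate, here is a minimal Python sketch. It is a simplification under stated assumptions: a Counter stands in for the multiset, a plain list stands in for whatever summary STATS_INIT maintains, profile periods and flushing are ignored, and the sample messages and the update_profiles name are hypothetical.

```python
from collections import Counter, defaultdict
import statistics

# locations_by_user: one multiset of geohashes per user (foreach: user).
locations_by_user = defaultdict(Counter)
# geo_distribution_from_centroid: a single shared collection (foreach: 'global').
global_distances = []


def update_profiles(message):
    """Apply both profile definitions to one enriched message."""
    geohash = message.get("hash")
    if geohash:  # onlyif: hash != null && LENGTH(hash) > 0
        locations_by_user[message["user"]][geohash] += 1
    distance = message.get("geo_distance")
    if distance is not None:  # onlyif: geo_distance != null
        global_distances.append(distance)


# Hypothetical messages, for illustration only.
for msg in [
    {"user": "user1", "hash": "dpmg", "geo_distance": 3},
    {"user": "user1", "hash": "dpmg", "geo_distance": 5},
    {"user": "user2", "hash": "dr5r"},  # no geo_distance yet
]:
    update_profiles(msg)

print(dict(locations_by_user["user1"]))
print(statistics.median(global_distances))
```

The per-user profile is keyed by the user field, while the distance profile deliberately uses a single constant key so that every user contributes to one shared distribution.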
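We will need to enrich the authentication records in a couple of ways to use in the threat triage section as well as the profiles: geo_locations, the merged multiset of the user’s login geohashes from the last 2 minutes of the locations_by_user profile; geo_centroid, the geographic centroid of those locations; and geo_distance, the distance from that centroid to the current login. geo_locations is only a temporary value and is set to null once geo_distance has been computed.
Beyond that, we will need to determine if the authentication event is a geographic outlier by computing the following fields: geo_distance_distr, the merged distribution of geo_distance across all users over the last 2 minutes; dist_median, the median of that distribution; dist_sd, its standard deviation; and geo_outlier, which is true when geo_distance is at least 5 standard deviations away from the median.
We also want to set up a triage rule that assigns a score and sets an alert if geo_outlier is true. In reality, this would be more complex, as this metric is at best circumstantial and would need supporting evidence, but for simplicity we’ll tolerate the false positives. The enrichment configuration for the auth sensor is as follows: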
{
  "enrichment": {
    "fieldMap": {
      "stellar" : {
        "config" : [
          "geo_locations := MULTISET_MERGE( PROFILE_GET( 'locations_by_user', user, PROFILE_FIXED( 2, 'MINUTES')))",
          "geo_centroid := GEOHASH_CENTROID(geo_locations)",
          "geo_distance := TO_INTEGER(GEOHASH_DIST(geo_centroid, hash))",
          "geo_locations := null"
        ]
      }
    },
    "fieldToTypeMap": { }
  },
  "threatIntel": {
    "fieldMap": {
      "stellar" : {
        "config" : [
          "geo_distance_distr:= STATS_MERGE( PROFILE_GET( 'geo_distribution_from_centroid', 'global', PROFILE_FIXED( 2, 'MINUTES')))",
          "dist_median := STATS_PERCENTILE(geo_distance_distr, 50.0)",
          "dist_sd := STATS_SD(geo_distance_distr)",
          "geo_outlier := ABS(dist_median - geo_distance) >= 5*dist_sd",
          "is_alert := is_alert || (geo_outlier != null && geo_outlier == true)",
          "geo_distance_distr := null"
        ]
      }
    },
    "fieldToTypeMap": { },
    "triageConfig" : {
      "riskLevelRules" : [
        {
          "name" : "Geographic Outlier",
          "comment" : "Determine if the user's geographic distance from the centroid of the historic logins is an outlier as compared to all users.",
          "rule" : "geo_outlier != null && geo_outlier",
          "score" : 10,
          "reason" : "FORMAT('user %s has a distance (%d) from the centroid of their last login is 5 std deviations (%f) from the median (%f)', user, geo_distance, dist_sd, dist_median)"
        }
      ],
      "aggregator" : "MAX"
    }
  }
}
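To illustrate the geometry behind geo_centroid and geo_distance, here is a minimal Python sketch. It is an assumption-laden stand-in, not Metron’s code: GEOHASH_CENTROID and GEOHASH_DIST operate on geohashes and may use different formulas, whereas this sketch works on already decoded (latitude, longitude) pairs, averages 3D unit vectors for the centroid, and uses the haversine formula with a mean Earth radius of 6371 km. The function names and sample coordinates are hypothetical.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; an assumption of this sketch


def centroid(points):
    """Centroid of (lat, lon) points in degrees, via averaged 3D unit vectors."""
    x = y = z = 0.0
    for lat, lon in points:
        lat_r, lon_r = math.radians(lat), math.radians(lon)
        x += math.cos(lat_r) * math.cos(lon_r)
        y += math.cos(lat_r) * math.sin(lon_r)
        z += math.sin(lat_r)
    n = len(points)
    x, y, z = x / n, y / n, z / n
    lat = math.atan2(z, math.hypot(x, y))
    lon = math.atan2(y, x)
    return math.degrees(lat), math.degrees(lon)


def haversine_km(a, b):
    """Great-circle distance in km between two (lat, lon) points in degrees."""
    lat1, lon1 = (math.radians(v) for v in a)
    lat2, lon2 = (math.radians(v) for v in b)
    h = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(h))


# Hypothetical recent logins and a new login to compare against them.
recent_logins = [(0.0, 0.0), (0.0, 2.0)]
center = centroid(recent_logins)
# Truncate to an integer, as TO_INTEGER does in the enrichment config.
geo_distance = int(haversine_km(center, (0.0, 10.0)))
print(center, geo_distance)
```

Averaging unit vectors rather than raw latitudes and longitudes avoids distortions near the poles and the 180 degree meridian, which is why it is used here.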
From here, we’ve set up our configuration and can push the configs:
$METRON_HOME/bin/zk_load_configs.sh --mode PUSH -z $ZOOKEEPER -i $METRON_HOME/config/zookeeper/
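Start the parser topology for the auth sensor:
$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s auth
Push the synthetic data into the auth topic:
python ~/gen_data.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic auth
Let the generator run for a few minutes so that the profiles have data to draw on, then stop it. Next, push a synthetic record indicating that user1 has logged in from a Russian IP address (109.252.227.173):
python -c "import time; print('user1,109.252.227.173,' + str(int(time.time())))" | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic auth
Finally, search Elasticsearch for our geographic login outliers:
curl -XPOST "http://$ES_HOST/auth*/_search?pretty" -d '
{
  "_source" : [ "is_alert", "threat:triage:rules:0:reason", "user", "ip", "geo_distance" ],
  "query": { "exists" : { "field" : "threat:triage:rules:0:reason" } }
}
'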
You should see, among a few other false positive results, something like the following:
{
  "_index" : "auth_index_2017.09.07.20",
  "_type" : "auth_doc",
  "_id" : "f5bdbf76-9d78-48cc-b21d-bc434c96e62e",
  "_score" : 1.0,
  "_source" : {
    "geo_distance" : 7879,
    "threat:triage:rules:0:reason" : "user user1 has a distance (7879) from the centroid of their last login is 5 std deviations (334.814719) from the median (128.000000)",
    "ip" : "109.252.227.173",
    "is_alert" : "true",
    "user" : "user1"
  }
}
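The numbers in this hit can be checked by hand against the triage rule. The following Python sketch re-evaluates the geo_outlier comparison and rebuilds the reason string using the values from the sample result; Python’s printf-style formatting is used here only as a stand-in for Stellar’s FORMAT function.

```python
# Values taken from the sample hit above.
user = "user1"
geo_distance = 7879
dist_sd = 334.814719
dist_median = 128.0

# Same comparison as the geo_outlier expression in the enrichment config.
geo_outlier = abs(dist_median - geo_distance) >= 5 * dist_sd

# printf-style formatting, mirroring the FORMAT call in the triage rule.
reason = (
    "user %s has a distance (%d) from the centroid of their last login "
    "is 5 std deviations (%f) from the median (%f)"
    % (user, geo_distance, dist_sd, dist_median)
)

print(geo_outlier)  # True
print(reason)
```

Here the login is 7751 away from the median, well above the threshold of 5 × 334.814719 ≈ 1674, so the rule fires and is_alert is set.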