Recombining Multiline Logs into JSON With Apache NiFi
Nathan Labadie
Background
This guide will walk you through an issue that every security professional has encountered at some point in their career: trying to make sense of multiline logs. By multiline I mean a single "event" composed of multiple discrete events. There might be one overarching activity, e.g. an email being processed by the email gateway, but upwards of 30-40 syslog events can be generated while that email is processed. These include events such as the email being checked against known blacklists, the attachment being scanned for malware, the HTML content being inspected, etc.
Here's a mocked-up set of logs from an email gateway:
Wouldn't it be a lot easier to create a "meta event" that contains just the necessary details, like the example below? That's what we'll walk through.
Understanding the Data
Let's take a look at the data before building the flow. As mentioned above, there are lots of unique events associated with that single email activity. Unfortunately, 30-45 seconds can elapse between the first and last event being received from the email gateway. Factor in the thousands of other logs received during that timeframe, and stitching everything back together can quickly become a nightmare.
Fortunately, each event in the sample above contains a session identifier that flags each event as being part of a single activity pipeline, i.e. s=26bgea8e42. That's what we're going to focus on to stitch these back together.
Creating the NiFi Flow
ListenSyslog
I've traditionally used the ListenUDP NiFi processor for simplicity. In this example we'll take it a step further and use the ListenSyslog processor instead, mainly because we're going to be using attributes to increase flexibility. Drag the ListenSyslog processor onto the NiFi canvas. You only need to make two changes here: check invalid under Automatically Terminate Relationships in Settings, and set the Port to 514 in Properties.
ExtractText
Next we'll add an ExtractText processor. This allows us to extract a value from the flow and assign it to an attribute that we'll use later. Drag an ExtractText processor onto the canvas. We'll only need to change a few values. First check unmatched under Automatically Terminate Relationships in Settings. Next go to Properties, click the + sign, add sessionid as the Property Name and \ss=(.*?)\s as the property value. Also change Include Capture Group 0 to False since we won't need it.
That regex is actually pretty simple: \s matches a space, followed by the literal characters s=, and everything after that is assigned to a capture group until another space is encountered.
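To see the pattern in action outside NiFi, here's a quick Python check. The log line itself is a made-up sample in the style of the gateway logs above, not a real event:

```python
import re

# Hypothetical syslog line in the style of the email-gateway sample.
line = "Oct 12 10:14:02 gateway mailgw[2310]: s=26bgea8e42 connect from mail.example.com"

# Same pattern as the ExtractText property: a space, the literal "s=",
# then a lazy capture of everything up to the next space.
match = re.search(r"\ss=(.*?)\s", line)
if match:
    session_id = match.group(1)  # group 1, since capture group 0 is excluded in NiFi
    print(session_id)  # 26bgea8e42
```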
What happens with that sessionid value? NiFi extracts it from the events and assigns it to an "attribute" in the flow. We'll use that as the key value to recombine the multitude of events that are generated.
MergeContent
Next we'll add a MergeContent processor to stitch those events into a single event. This is actually a really neat processor when you're dealing with security logs. It essentially creates "bins" that it drops data into based on a specified attribute, e.g. the sessionid value that we created above. The data in a bin is then released when the bin either (1) hits its maximum number of records or (2) has existed for a specified amount of time.
Why is this useful? Imagine a stream of data from different log sources passing through NiFi. NiFi is analyzing them, flagging logs based on key fields, stitching them back together based on a specified attribute, and then releasing the combined log as a single event. Setup is incredibly simple too.
To start, drag a MergeContent processor onto the canvas. Under Settings, check Failure and Original under Automatically Terminate Relationships since we only care about the merged event. Next go to Properties and change a few values: set Correlation Attribute Name to sessionid, Maximum Number of Entries to 64, and Max Bin Age to 5 sec.
This says "dump everything with the same sessionid into a bin, and release the bin when it hits 64 events or after 5 seconds have passed". In short: recombine those email events into a single large event.
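The binning behavior can be sketched in plain Python. This is a simplified, hypothetical model, not NiFi's internals; the 64-entry and 5-second thresholds mirror the settings above:

```python
import time

MAX_ENTRIES = 64   # release when a bin reaches this many events
MAX_BIN_AGE = 5.0  # ...or when the bin is this many seconds old

bins = {}  # sessionid -> (created_at, [events])

def add_event(session_id, event, now=None):
    """Drop an event into its session's bin; return the merged event if the bin fills."""
    now = time.monotonic() if now is None else now
    created, events = bins.setdefault(session_id, (now, []))
    events.append(event)
    if len(events) >= MAX_ENTRIES:
        del bins[session_id]
        return "\n".join(events)
    return None

def flush_expired(now=None):
    """Release any bins older than MAX_BIN_AGE seconds."""
    now = time.monotonic() if now is None else now
    merged = []
    for sid in [s for s, (t, _) in bins.items() if now - t >= MAX_BIN_AGE]:
        _, events = bins.pop(sid)
        merged.append("\n".join(events))
    return merged
```

A real deployment would also need thread safety and back-pressure, which MergeContent handles for you.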
ExtractText
Now that we've created our mega-events, we're going to extract the key fields we'd like and assign them as "attributes" to the flow. Drag another ExtractText processor onto the canvas. Check unmatched under Automatically Terminate Relationships in Settings. Next go to Properties. There are only two values you'll need to change, which are listed below. The rest are created by clicking the + sign and adding them, just like in the previous ExtractText processor.
UpdateAttribute
Rather than having a mess of recipient fields, e.g. an email sent to 10 different people, we're going to recombine them into a single field called all_recipients that includes all of the destination email addresses. For example:
Add an UpdateAttribute processor to the canvas. There is only one value that you'll need to create under Properties by clicking the + button:
This says "grab the value from any attribute named recipient.X and add it to the attribute all_recipients".
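As a rough illustration, here is what that recombination looks like in Python. The attribute names and sample addresses are made up, following the recipient.X convention from the article:

```python
import re

# Hypothetical flowfile attributes after the second ExtractText processor.
attributes = {
    "sessionid": "26bgea8e42",
    "recipient.1": "alice@example.com",
    "recipient.2": "bob@example.com",
    "recipient.3": "carol@example.com",
}

# Collect every recipient.X attribute in numeric order...
recipient_keys = sorted(
    (k for k in attributes if re.fullmatch(r"recipient\.\d+", k)),
    key=lambda k: int(k.split(".", 1)[1]),
)
# ...and join their values into a single comma-separated field.
attributes["all_recipients"] = ",".join(attributes[k] for k in recipient_keys)
print(attributes["all_recipients"])  # alice@example.com,bob@example.com,carol@example.com
```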
AttributesToJSON
Next up is the AttributesToJSON processor. You can probably guess what it does: it turns the attributes from above into a JSON event. You can even pick which attributes you'd like to include, which is what we'll do below. First drag an AttributesToJSON processor to the canvas. Check Failure under Automatically Terminate Relationships in Settings. Next go to the Properties tab and add the following:
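The effect is easy to emulate in Python. The attribute names and values below are invented for illustration; the point is that only the attributes you select make it into the JSON output:

```python
import json

# Hypothetical attributes on the flowfile at this point.
attributes = {
    "sessionid": "26bgea8e42",
    "sender": "dave@example.org",
    "all_recipients": "alice@example.com,bob@example.com",
    "subject": "Quarterly report",
    "uuid": "internal-nifi-id",  # not selected below, so it won't appear
}

# AttributesToJSON lets you list which attributes to keep; emulate that here.
selected = ["sessionid", "sender", "all_recipients", "subject"]
json_attributes = json.dumps({k: attributes[k] for k in selected if k in attributes})
print(json_attributes)
```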
PutSyslog
Finally, we'll be using the PutSyslog processor to send the event back out as a properly formatted syslog event. I'd normally use PutUDP but figured we'd keep this as close to a "real" syslog event as possible. Drag the PutSyslog processor onto the canvas. Check Failure, Invalid, and Success under Automatically Terminate Relationships in Settings. This processor is the last stop, so we'll be dropping every event at the end.
Next we'll go to the Properties tab and put in the following values:
The ${JSONAttributes} expression plugs the value from the previous processor into the syslog message body.
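As a sketch of what ends up on the wire, here's a rough Python approximation. The hostname, priority value, and exact message layout are assumptions for illustration, not PutSyslog's precise output:

```python
import socket
from datetime import datetime, timezone

def build_syslog_message(json_body, hostname="nifi-host", priority=13, timestamp=None):
    """Build a minimal RFC 3164-style message with the JSON event as the body,
    roughly mirroring PutSyslog with ${JSONAttributes} as the message body."""
    ts = timestamp or datetime.now(timezone.utc).strftime("%b %d %H:%M:%S")
    return f"<{priority}>{ts} {hostname} {json_body}"

def send_syslog(json_body, host="127.0.0.1", port=514):
    """Ship the message over UDP, like PutSyslog with its protocol set to UDP."""
    message = build_syslog_message(json_body)
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(message.encode("utf-8"), (host, port))
```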
The Completed Flow
And done! You'll now want to connect the processors in the order below. NiFi should automatically select the right relationship for each connection, since we terminated the unneeded ones under Automatically Terminate Relationships in each processor. The final flow should look like this:
You'll just need to right-click on the canvas and select Start to fire everything up. Also, if you don't feel like creating the entire flow from start to finish, you can download the flow template from here.
Example Output
Going back to the original log sample, we now have this being sent to our syslog destination which is much easier to process: