Recombining Multiline Logs into JSON With Apache NiFi

Background

This guide walks you through an issue that every security professional has run into at some point in their career: trying to make sense of multiline logs. By multiline, I mean a single "event" made up of multiple discrete events. There might be a single overarching event, e.g. an email being processed by the email gateway, but upwards of 30-40 syslog events can be generated while that email is processed. These include events such as the email being checked against known blacklists, the attachment being checked for malware, the HTML content being scanned, etc.

Here's a mocked-up set of logs from an email gateway:

<141>Mar  16 03:22:35 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=6 x=26bgea8e42-6 mod=mail cmd=env_from value=77334-spam-4533211@host.adserver.com ofrom=77334-spam-4533211@231xadserver.com qid=26bgea8e42-6 tls=DCAEC-RSA-AES256-GCM-SHA256 routes= notroutes=av_protected_bypass,tls_fallback host=ubuntu.cheaphosting.com ip=10.92.45.50
<141>Mar  16 03:22:35 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=5 x=26bgea8e42-5 mod=session cmd=data rcpt=user1@company.corp suborg=
<141>Mar  16 03:22:33 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=4 x=26bgea8e42-4 mod=session cmd=data rcpt=user2@company.corp suborg=
<141>Mar  16 03:22:31 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=3 x=26bgea8e42-3 mod=session cmd=data rcpt=user3@company.corp suborg=
<141>Mar  16 03:22:35 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=5 x=26bgea8e42-5 mod=mail cmd=msg module=pdr rule=pass action=continue attachments=0 rcpts=1 routes=default_inbound size=32965 guid=JWEFc4dKXdshgdh8AGo6w3X hdr_mid=<MailClient.bulksender@231xadserver.com> qid=034bCJoc049463 hops-ip=10.92.45.50 subject="Could You Review?" spamscore=0 virusname= 
<141>Mar  16 03:22:34 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=4 x=26bgea8e42-4 mod=mail cmd=msg module=pdr rule=pass action=continue attachments=0 rcpts=1 routes=default_inbound size=32675 guid=IyAeyOPusoY4aKIycA9-qcthUeEL-9c0 hdr_mid=<581065005.596768250.3256412391815.MailClient.bulksender@231xadserver.com> qid=034bCJob049463 hops-ip=10.92.45.50 subject="Check This Out!" spamscore=0 virusname=
<141>Mar  16 03:22:35 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=5 x=26bgea8e42-5 mod=mail cmd=env_from value=MailClient.bulksender@relay.238xadserver.com ofrom=MailClient.bulksender@relay.238xadserver.com qid=26bgae8e42-5 tls=DCAEC-RSA-AES256-GCM-SHA256 routes= notroutes=av_protected_bypass,tls_fallback host=ubuntu.cheaphosting.com ip=10.92.45.50
<141>Mar  16 03:22:35 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=5 x=26bgea8e42-5 mod=mail cmd=attachment id=0 file=text.txt mime=text/plain type=txt omime=text/plain oext=txt corrupted=0 protected=0 size=3980 virtual=0 a=0
<141>Mar  16 03:22:35 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=6 x=26bgea8e42-6 mod=session cmd=data rcpt=user7@company.corp suborg=
<141>Mar  16 03:22:36 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=6 x=26bgea8e42-6 mod=mail cmd=msg module=pdr rule=pass action=continue attachments=0 rcpts=1 routes=default_inbound size=32974 guid=44U0xVLbLztJoxZVrirp3_K3mQfggbN6 hdr_mid=<MailClient.bulksender@231xadserver.com> qid=034bCJod019463 hops-ip=10.92.45.50 subject="Could You Review?" spamscore=0 virusname= duration=0.257 elapsed=0.412
<141>Mar  16 03:22:36 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=6 x=26bgea8e42-6 mod=spam cmd=run rule=primaryspamfilter_bulk policy=primaryspamfilter score=0 bulkscore=100 mlxscore=0 lowpriorityscore=100 suspectscore=30 priorityscore=515 malwarescore=0 mlxlogscore=972 phishscore=0 spamscore=0 clxscore=21 adultscore=0 impostorscore=0 classifier=spam adjust=0 reason=mlx scancount=1 engine=8.12.0-2003020000 definitions=main-2005080114 raw=0 tests=HTML_FONT_BIG,HTML_FONT_INVISIBLE,HTML_FONT_TINY,HTML_MESSAGE,HTML_TAG_EXIST_TBODY,NO_TO_REAL_NAME,174:XYZ_BODY_DIGIT_COUNT,XYZ_HAS_10_19_HTTP_URIS,XYZ_H
<141>Mar  16 03:22:35 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=5 x=26bgea8e42-5 mod=spam cmd=run rule=primaryspamfilter_bulk policy=primaryspamfilter score=0 bulkscore=100 mlxscore=0 lowpriorityscore=100 suspectscore=30 priorityscore=515 malwarescore=0 mlxlogscore=980 phishscore=0 spamscore=0 clxscore=21 adultscore=0 impostorscore=0 classifier=spam adjust=0 reason=mlx scancount=1 engine=8.12.0-2003020000 definitions=main-2005080114 raw=0 tests=HTML_FONT_BIG,HTML_FONT_INVISIBLE,HTML_FONT_TINY,HTML_MESSAGE,HTML_TAG_EXIST_TBODY,NO_TO_REAL_NAME,175:XYZ_BODY_DIGIT_COUNT,XYZ_CT_BOUNDARY_PART_00,XYZ_H
<141>Mar  16 03:22:34 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=4 x=26bgea8e42-4 mod=spam cmd=run rule=primaryspamfilter_bulk policy=primaryspamfilter score=0 bulkscore=100 mlxscore=0 lowpriorityscore=100 suspectscore=30 priorityscore=497 malwarescore=0 mlxlogscore=999 phishscore=0 spamscore=0 clxscore=57 adultscore=0 impostorscore=0 classifier=spam adjust=0 reason=mlx scancount=1 engine=8.12.0-2003020000 definitions=main-2005080114 raw=0 tests=HTML_FONT_BIG,HTML_FONT_INVISIBLE,HTML_FONT_TINY,HTML_MESSAGE,HTML_TAG_EXIST_TBODY,NO_TO_REAL_NAME,165:XYZ_BODY_DIGIT_COUNT,XYZ_HAS_10_19_HTTP_URIS,XYZ_H
<141>Mar  16 03:22:32 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=3 x=26bgea8e42-3 mod=mail cmd=msg module=pdr rule=pass action=continue attachments=0 rcpts=1 routes=default_inbound size=32970 guid=5VAul9750LkBjhoKXJ8qw8GJXC9e5mIv hdr_mid=<MailClient.bulksender@231xadserver.com> qid=034bCJoa019463 hops-ip=10.92.45.50 subject="Could You Review?" spamscore=0 virusname= duration=0.692 elapsed=0.860
<141>Mar  16 03:22:32 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=3 x=26bgea8e42-3 mod=spam cmd=run rule=primaryspamfilter_bulk policy=primaryspamfilter score=0 bulkscore=100 mlxscore=0 lowpriorityscore=100 suspectscore=30 priorityscore=419 malwarescore=0 mlxlogscore=966 phishscore=0 spamscore=0 clxscore=57 adultscore=0 impostorscore=0 classifier=spam adjust=0 reason=mlx scancount=1 engine=8.12.0-2003020000 definitions=main-2005080114 raw=0 tests=HTML_FONT_BIG,HTML_FONT_INVISIBLE,HTML_FONT_TINY,HTML_MESSAGE,HTML_TAG_EXIST_TBODY,NO_TO_REAL_NAME,179:XYZ_BODY_DIGIT_COUNT,XYZ_CT_BOUNDARY_PART_00,XYZ_CT_BOUNDARY_PART_03,XYZ_CT_MULTIPART_ALT_TOPLEVEL,P
<141>Mar  16 03:22:29 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=2 x=26bgea8e42-2 mod=mail cmd=msg module=pdr rule=pass action=continue attachments=0 rcpts=1 routes=default_inbound size=32969 guid=-ldb8m5V-KpSGIVYVzDkJINUac1rXZtq hdr_mid=<MailClient.bulksender@231xadserver.com> qid=034bCJoY019463 hops-ip=10.92.45.50 subject="Could You Review?" spamscore=0 virusname= duration=0.602 elapsed=0.792
<141>Mar  16 03:22:28 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=2 x=26bgea8e42-2 mod=session cmd=data rcpt=user18@company.corp suborg=
<141>Mar  16 03:22:24 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=1 x=26bgea8e42-1 mod=mail cmd=msg module=pdr rule=pass action=continue attachments=0 rcpts=1 routes=default_inbound size=32993 guid=FpoE82Kf7M2bmJUsn5vGdsPHqMEIwgLB hdr_mid=<MailClient.bulksender@231xadserver.com> qid=034bCJoV019463 hops-ip=10.92.45.50 subject="Could You Review?" spamscore=0 virusname= duration=0.257 elapsed=0.400
<141>Mar  16 03:22:24 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=1 x=26bgea8e42-1 mod=session cmd=data rcpt=user16@company.corp suborg=
<141>Mar  16 03:22:24 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=1 x=26bgea8e42-1 mod=spam cmd=run rule=primaryspamfilter_bulk policy=primaryspamfilter score=0 bulkscore=100 mlxscore=0 lowpriorityscore=100 suspectscore=30 priorityscore=510 malwarescore=0 mlxlogscore=999 phishscore=0 spamscore=0 clxscore=21 adultscore=0 impostorscore=0 classifier=spam adjust=0 reason=mlx scancount=1 engine=8.12.0-2003020000 definitions=main-2005080114 raw=0 tests=HTML_FONT_BIG,HTML_FONT_INVISIBLE,HTML_FONT_TINY,HTML_MESSAGE,HTML_TAG_EXIST_TBODY,NO_TO_REAL_NAME,165:XYZ_BODY_DIGIT_COUNT,XYZ_CT_BOUNDARY_PART_00,XYZ_H
Mail log sample.

Wouldn't it be a lot easier to create a "meta event" that contains just the necessary details, like the example below? That's what we'll walk through.

<141> Jun 14 17:05:21 mysysloghost1 {"file":"text.txt","all_recipients":"user18@company.corp,user2@company.corp,user3@company.corp,user1@company.corp,user16@company.corp,user7@company.corp","sender":"77334-spam-4533211@host.adserver.com","subject":"Could You Review?","module":"mail","classifier":"spam","rule":"pass","sessionid":"26bgea8e42","sender_host":"ubuntu.cheaphosting.com","policy":"primaryspamfilter"}
Recombined log sample.

Understanding the Data

Let's take a look at the data before building the flow. As mentioned above, there are lots of unique events associated with that single email. Unfortunately, 30-45 seconds can pass between the first and last event arriving from the email gateway. Factor in the thousands of logs that can be received during that window, and stitching everything back together quickly becomes a nightmare.

Fortunately, each event in the sample above contains a session identifier that marks it as part of a single activity pipeline, in this case s=26bgea8e42. That's the field we'll focus on to stitch these events back together.

Creating the NiFi Flow

ListenSyslog

I've traditionally used the ListenUDP NiFi processor for simplicity. In this example we'll take it a step further and use the ListenSyslog processor instead, mainly because we're going to rely on attributes for flexibility. Drag the ListenSyslog processor onto the NiFi canvas. You only need to make two changes here: check invalid under Automatically Terminate Relationships in Settings, and set Port to 514 in Properties.
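Part of what ListenSyslog buys us over ListenUDP is that it parses each message header into attributes such as syslog.priority and syslog.hostname, which the PutSyslog step at the end reads back. The Python below is a rough, hypothetical approximation of that parsing for an RFC 3164-style line like the samples above; the regex and the parse_syslog name are my own, not NiFi's implementation.

```python
import re

# Hypothetical approximation of RFC 3164 header parsing: <PRI>TIMESTAMP HOSTNAME BODY.
# This is an illustration, not ListenSyslog's actual parser.
RFC3164 = re.compile(
    r"^<(?P<priority>\d{1,3})>"                                      # PRI, e.g. <141>
    r"(?P<timestamp>[A-Z][a-z]{2}\s+\d{1,2} \d{2}:\d{2}:\d{2})\s+"  # e.g. "Mar  16 03:22:35"
    r"(?P<hostname>\S+)\s"                                           # e.g. mysysloghost1
    r"(?P<body>.*)$"                                                 # everything else
)


def parse_syslog(line: str) -> dict:
    """Return syslog.* style attributes for one message (hypothetical helper)."""
    m = RFC3164.match(line)
    if m is None:
        # ListenSyslog routes messages it can't parse to its "invalid" relationship.
        return {"syslog.valid": "false"}
    pri = int(m.group("priority"))
    return {
        "syslog.valid": "true",
        "syslog.priority": str(pri),
        "syslog.facility": str(pri // 8),  # 141 // 8 = 17 (local1)
        "syslog.severity": str(pri % 8),   # 141 % 8 = 5 (notice)
        "syslog.timestamp": m.group("timestamp"),
        "syslog.hostname": m.group("hostname"),
        "syslog.body": m.group("body"),
    }


sample = ("<141>Mar  16 03:22:35 mysysloghost1 filter_instance1[27659]: rprt "
          "s=26bgea8e42 m=5 x=26bgea8e42-5 mod=session cmd=data rcpt=user1@company.corp suborg=")
attrs = parse_syslog(sample)
print(attrs["syslog.priority"], attrs["syslog.hostname"])
```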

ExtractText

Next we'll add an ExtractText processor. This lets us extract a value from each event and assign it to an attribute that we'll use later. Drag an ExtractText processor onto the canvas. Only a few values need to change. First, check unmatched under Automatically Terminate Relationships in Settings. Next, go to Properties, click the + sign, and add sessionid as the Property Name and \ss=(.*?)\s as the property value. Also set Include Capture Group 0 to False, since we won't need it.

That regex is pretty simple: \s matches a whitespace character, followed by the literal characters s=, and the lazy capture group (.*?) then grabs everything after that up to the next whitespace character.
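To sanity-check the pattern outside NiFi, here's a minimal Python sketch that runs it against one of the sample lines. NiFi evaluates it as a Java regex and, as far as I can tell, ExtractText looks for the first match anywhere in the content, which Python's re.search mirrors for a pattern this simple.

```python
import re

# Same pattern as the ExtractText "sessionid" property.
SESSION_ID = re.compile(r"\ss=(.*?)\s")

line = ("<141>Mar  16 03:22:35 mysysloghost1 filter_instance1[27659]: rprt "
        "s=26bgea8e42 m=5 x=26bgea8e42-5 mod=session cmd=data rcpt=user1@company.corp suborg=")

match = SESSION_ID.search(line)               # first " s=..." anywhere in the line
session_id = match.group(1) if match else None
print(session_id)  # 26bgea8e42
```

One consequence of the trailing \s: a value sitting at the very end of a line, with no whitespace after it, won't match.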

What happens with that sessionid value? NiFi extracts it from the events and assigns it to an "attribute" in the flow. We'll use that as the key value to recombine the multitude of events that are generated.

Extracted Attributes.

MergeContent

Next we'll add a MergeContent processor to stitch those events into a single event. This is a really neat processor when you're dealing with security logs. It essentially sorts incoming data into "bins" based on a specified attribute, e.g. the sessionid value that we created above. The contents of a bin are then released as one merged event when either (1) the bin hits the maximum number of entries or (2) a specified amount of time has passed since the bin was created.
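The binning behavior is easier to picture in code. Below is a toy Python model that assumes only the two release conditions described above (bin full, or bin too old); CorrelationBinner is a hypothetical class, and the real MergeContent has more settings, such as Maximum Number of Bins. A fake clock keeps the example deterministic.

```python
import time
from dataclasses import dataclass, field


@dataclass
class Bin:
    created: float                               # when the first event arrived
    entries: list = field(default_factory=list)  # events collected so far


class CorrelationBinner:
    """Toy model of attribute-correlated binning (hypothetical, not NiFi code)."""

    def __init__(self, max_entries=64, max_age_sec=5.0, clock=time.monotonic):
        self.max_entries = max_entries
        self.max_age_sec = max_age_sec
        self.clock = clock
        self.bins = {}  # correlation key (e.g. sessionid) -> Bin

    def offer(self, key, event):
        """Add an event; return the bin's entries if this event filled it."""
        if key not in self.bins:
            self.bins[key] = Bin(created=self.clock())
        b = self.bins[key]
        b.entries.append(event)
        if len(b.entries) >= self.max_entries:
            return self.bins.pop(key).entries  # condition (1): bin is full
        return None

    def expire(self):
        """Release (key, entries) for every bin at least max_age_sec old."""
        now = self.clock()
        old = [k for k, b in self.bins.items() if now - b.created >= self.max_age_sec]
        return [(k, self.bins.pop(k).entries) for k in old]  # condition (2): bin aged out


t = [0.0]  # fake clock, in seconds
binner = CorrelationBinner(max_entries=3, max_age_sec=5.0, clock=lambda: t[0])
binner.offer("26bgea8e42", "a")
binner.offer("26bgea8e42", "b")
full = binner.offer("26bgea8e42", "c")  # third event fills the bin
binner.offer("other", "x")
t[0] = 6.0
expired = binner.expire()               # "other" bin is now 6 seconds old
print(full, expired)
```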

Why is this useful? Imagine a stream of data from different log sources passing through NiFi. NiFi is analyzing them, flagging logs based on key fields, stitching them back together based on a specified attribute, and then releasing the combined log as a single event. Setup is incredibly simple too.

To start, drag a MergeContent processor onto the canvas. Under Settings, check Failure and Original under Automatically Terminate Relationships since we only care about the merged event. Next go to Properties and change a few values:

Attribute Strategy: Keep All Unique Attributes
Correlation Attribute Name: sessionid
Minimum Number of Entries: 1 [Configurable]
Maximum Number of Entries: 64 [Configurable]
Maximum Number of Bins: 128 [Configurable]
Max Bin Age: 5 sec
Delimiter Strategy: Text
MergeContent Settings.

This says "put every event with the same sessionid into a bin, and release the bin once it holds 64 events or is 5 seconds old". In short, it recombines those email events into a single large event.
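The Attribute Strategy setting matters for the later steps. With Keep All Unique Attributes, my understanding is that an attribute survives the merge as long as the events in the bin don't disagree on its value. That would keep sessionid and syslog.priority (identical on every line) available downstream, while something like syslog.timestamp, which differs from line to line, gets dropped. The function below is a small hypothetical model of that rule, not NiFi's code.

```python
def keep_all_unique_attributes(attribute_maps):
    """Merge per-event attribute dicts, dropping any attribute whose value
    conflicts between events (rough model of 'Keep All Unique Attributes')."""
    merged, conflicted = {}, set()
    for attrs in attribute_maps:
        for name, value in attrs.items():
            if name in conflicted:
                continue  # already known to conflict; stays dropped
            if name in merged and merged[name] != value:
                del merged[name]  # two events disagree, so drop it
                conflicted.add(name)
            else:
                merged[name] = value
    return merged


events = [
    {"sessionid": "26bgea8e42", "syslog.priority": "141", "syslog.timestamp": "Mar  16 03:22:35"},
    {"sessionid": "26bgea8e42", "syslog.priority": "141", "syslog.timestamp": "Mar  16 03:22:33"},
]
merged = keep_all_unique_attributes(events)
print(merged)  # {'sessionid': '26bgea8e42', 'syslog.priority': '141'}
```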

ExtractText

Now that we've created our mega-events, we're going to extract the key fields we care about and assign them as "attributes" on the flow. Drag another ExtractText processor onto the canvas. Check unmatched under Automatically Terminate Relationships in Settings. Next go to Properties. There are only two existing values you'll need to change, listed first below. The rest are created by clicking the + sign and adding them, just like in the previous ExtractText processor.

Include Capture Group 0: False
Enable repeating capture group: True

classifier: \sclassifier=(.*?)\s
file: \sfile=(.*?)\s
module: \smod=(.*?)\s
policy: \spolicy=(.*?)\s
recipient: \srcpt=(.*?)\s
rule: \srule=(.*?)\s
sender: \smod=mail\scmd=env_from\svalue=(.*?)\s
sender_host: \shost=(.*?)\s
sessionid: \ss=(.*?)\s
subject: \ssubject="(.*?)"\s
ExtractText Properties.
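Here's a hypothetical Python approximation of what this ExtractText configuration produces from a merged event. It assumes my understanding of ExtractText's naming with repeating capture groups enabled: the first match lands in the attribute named after the property, and each match is also stored with a numeric suffix (recipient.1, recipient.2, and so on). Those numbered recipient attributes are what the UpdateAttribute step joins together.

```python
import re

# The ExtractText properties listed above; these simple patterns behave the
# same in Python's re module as in Java regex.
PATTERNS = {
    "classifier": r"\sclassifier=(.*?)\s",
    "file": r"\sfile=(.*?)\s",
    "module": r"\smod=(.*?)\s",
    "policy": r"\spolicy=(.*?)\s",
    "recipient": r"\srcpt=(.*?)\s",
    "rule": r"\srule=(.*?)\s",
    "sender": r"\smod=mail\scmd=env_from\svalue=(.*?)\s",
    "sender_host": r"\shost=(.*?)\s",
    "sessionid": r"\ss=(.*?)\s",
    "subject": r'\ssubject="(.*?)"\s',
}


def extract_attributes(merged_text: str) -> dict:
    """Approximate ExtractText with repeating capture groups on and capture
    group 0 off (assumed naming: <name> = first match, <name>.N = Nth match)."""
    attrs = {}
    for name, pattern in PATTERNS.items():
        values = [m.group(1) for m in re.finditer(pattern, merged_text)]
        if not values:
            continue  # no match: attribute is simply not created
        attrs[name] = values[0]
        for i, value in enumerate(values, start=1):
            attrs[f"{name}.{i}"] = value
    return attrs


merged = "\n".join([
    "<141>Mar  16 03:22:35 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=5 "
    "x=26bgea8e42-5 mod=session cmd=data rcpt=user1@company.corp suborg=",
    "<141>Mar  16 03:22:33 mysysloghost1 filter_instance1[27659]: rprt s=26bgea8e42 m=4 "
    "x=26bgea8e42-4 mod=session cmd=data rcpt=user2@company.corp suborg=",
])
attrs = extract_attributes(merged)
print(attrs["recipient.1"], attrs["recipient.2"])
```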

UpdateAttribute

Rather than having a mess of recipient fields, e.g. for an email sent to 10 different people, we're going to recombine them into a single field called all_recipients that includes all the destination email addresses, like the all_recipients field in the recombined log sample above.

Add an UpdateAttribute processor to the canvas. There is only one value that you'll need to create under Properties by clicking the + button:

all_recipients: ${allMatchingAttributes("recipient\.\d*"):join(",")}
UpdateAttribute Properties.

This says "grab the value of every attribute named recipient.N (recipient.1, recipient.2, ...) and join them, comma-separated, into the attribute all_recipients."
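A small Python equivalent of that expression, for illustration. It assumes allMatchingAttributes requires the regex to match the whole attribute name, so the unsuffixed recipient attribute isn't counted twice. The sort by numeric suffix is my own addition for deterministic output; I wouldn't rely on NiFi returning attributes in any particular order.

```python
import re


def all_recipients(attrs: dict) -> str:
    """Rough equivalent of ${allMatchingAttributes("recipient\\.\\d*"):join(",")}."""
    pattern = re.compile(r"recipient\.\d*")
    names = [k for k in attrs if pattern.fullmatch(k)]  # whole-name match only
    # Sort by numeric suffix (recipient.2 before recipient.10) for stable output.
    names.sort(key=lambda k: int(k.split(".", 1)[1] or 0))
    return ",".join(attrs[k] for k in names)


attrs = {
    "recipient": "user1@company.corp",    # unsuffixed copy of the first match
    "recipient.1": "user1@company.corp",
    "recipient.2": "user2@company.corp",
    "sessionid": "26bgea8e42",
}
print(all_recipients(attrs))  # user1@company.corp,user2@company.corp
```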

UpdateAttribute Processor.

AttributesToJSON

Next up is the AttributesToJSON processor. You can probably guess what it does: it turns the attributes from above into a JSON event. You can even pick which attributes you'd like to include, which is what we'll do below. First drag an AttributesToJSON processor to the canvas. Check Failure under Automatically Terminate Relationships in Settings. Next go to the Properties tab and add the following:

Attributes List: all_recipients,classifier,file,module,policy,rule,sender,sender_host,sessionid,subject
Destination: flowfile-attribute
Include Core Attributes: false
AttributesToJSON Properties. 
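With Destination set to flowfile-attribute, AttributesToJSON writes the JSON into an attribute named JSONAttributes rather than replacing the FlowFile content, and that's the attribute PutSyslog reads next. The Python sketch below approximates this step. It assumes the processor's default of writing an empty string for a listed attribute that doesn't exist, and the attributes_to_json name is hypothetical; key order in NiFi's real output may differ (the recombined sample above isn't in list order).

```python
import json

ATTRIBUTES_LIST = "all_recipients,classifier,file,module,policy,rule,sender,sender_host,sessionid,subject"


def attributes_to_json(attrs: dict, attributes_list: str = ATTRIBUTES_LIST) -> dict:
    """Serialize only the listed attributes and store the result in a new
    JSONAttributes attribute, leaving the existing attributes in place."""
    selected = {name: attrs.get(name, "") for name in attributes_list.split(",")}
    # Compact separators, matching the style of the recombined log sample.
    return {**attrs, "JSONAttributes": json.dumps(selected, separators=(",", ":"))}


out = attributes_to_json({
    "sessionid": "26bgea8e42",
    "subject": "Could You Review?",
    "syslog.priority": "141",  # not in the list, so it stays out of the JSON
})
print(out["JSONAttributes"])
```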

PutSyslog

Finally, we'll use the PutSyslog processor to send the event back out as a properly formatted syslog event. I'd normally use PutUDP, but figured we'd keep this as close to a "real" syslog event as possible. Drag the PutSyslog processor onto the canvas. Check Failure, Invalid, and Success under Automatically Terminate Relationships in Settings. This processor is the last stop, so every event is dropped once it has been sent.

Next, go to the Properties tab and enter the following values:

Hostname: elastic [Your log destination]
Protocol: UDP
Port: 1514 [Destination port, typically 514]
Message Priority: ${syslog.priority}
Message Version: ${syslog.version}
Message Timestamp: ${now():format('MMM d HH:mm:ss')}
Message Hostname: ${syslog.hostname}
Message Body: ${JSONAttributes}
PutSyslog Properties.

The ${JSONAttributes} expression inserts the JSON that AttributesToJSON wrote to the JSONAttributes attribute into the syslog message body.
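To show how those properties fit together, here's a hypothetical Python sketch that assembles the outgoing line from the priority, version, timestamp, hostname, and body. The field order and spacing are my assumptions, not PutSyslog's source; an empty syslog.version followed by a space would explain the "<141> Jun 14 ..." spacing in the recombined sample. The timestamp mirrors the Java pattern MMM d HH:mm:ss (abbreviated month, unpadded day, 24-hour time).

```python
from datetime import datetime


def build_syslog_message(attrs: dict, now: datetime) -> str:
    """Hypothetical assembly of <priority>[version] timestamp hostname body."""
    priority = attrs["syslog.priority"]        # ${syslog.priority}
    version = attrs.get("syslog.version", "")  # ${syslog.version}; empty for RFC 3164 input
    # ${now():format('MMM d HH:mm:ss')}; %b gives "Jun" in the default C locale.
    timestamp = f"{now.strftime('%b')} {now.day} {now.strftime('%H:%M:%S')}"
    hostname = attrs["syslog.hostname"]        # ${syslog.hostname}
    body = attrs["JSONAttributes"]             # ${JSONAttributes}
    # Assumption: the (possibly empty) version is always followed by a space.
    return f"<{priority}>{version} {timestamp} {hostname} {body}"


attrs = {
    "syslog.priority": "141",
    "syslog.hostname": "mysysloghost1",
    "JSONAttributes": '{"sessionid":"26bgea8e42"}',
}
msg = build_syslog_message(attrs, datetime(2020, 6, 14, 17, 5, 21))  # arbitrary date
print(msg)  # <141> Jun 14 17:05:21 mysysloghost1 {"sessionid":"26bgea8e42"}
```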

The Completed Flow

And done! You'll now want to connect the processors in the order below. It should automatically select the relationship since we've checked Automatically Terminate Relationships in each processor. The final flow should look like this:

Full Flow Diagram.

You'll just need to right-click on the canvas and select Start to fire everything up. Also, if you don't feel like creating the entire flow from start to finish, you can download the flow template from here.

Example Output

Going back to the original log sample, we now have this being sent to our syslog destination which is much easier to process:
