Apache NiFi: SFTP/CSV to Syslog/JSON

Apache NiFi: SFTP/CSV to Syslog/JSON

A common problem in log management is the ability to fetch remote files, get them into some type of manageable structure, and output them into another platform. The remote file could encompass anything: data from HR, a dump from a database, or a CSV of application information. In this case, we’re going to use SFTP to import a remote CSV file, reformat everything to JSON, and then output that to a syslog server. This is simple with NiFi.

Let’s assume we have a CSV being dumped to a remote host every 15 minutes. We’d like the processor to check for a new file every 60 seconds, fetch the CSV, process it, and then delete the file. For this example, we’ll use the following file and directory structure on the remote server:

File name: APPLICATION_OUTPUT-1586265224.CSV
Directory: /tmp/APP_LOGS
Log Sample.

The file format of the CSV looks like this, including a header:

id,first_name,last_name,email,ip_address
1,Vincent,Pimblott,vpimblott0@plala.or.jp,30.239.57.185
2,Kimmy,Bragge,kbragge1@fastcompany.com,218.159.125.3
3,Ursula,Mosconi,umosconi2@360.cn,106.131.117.125
4,Joline,Cogman,jcogman3@alexa.com,44.57.107.49
CSV.

The first step is to create a GetSFTP processor. The processor will fetch a remote file on a specified interval, ingest it, and pass it along to the next step. Drag the processor onto the canvas and plug in the following settings:

GetSFTP Processor.
GetSFTP Properties.

You can see we’re using regex to grab the file. The assumption is that the file name starts with APPLICATION_OUTPUT and includes an epoch timestamp. These are the second group of settings from above:

Remote Path: /tmp/APP_LOGS
File Filter Regex: ^APPLICATION_OUTPUT-\d*?\.CSV
RegEx for Files.

Next we’ll want to create a ConvertRecord processor. This allows us to convert the data between formats, specifically CSV to JSON. We’ll be creating new services for the different formats that we’ll be using. Double-click on the processor, go to Settings, and check Failure under Automatically Terminate Relationships. This tells NiFi to drop anything that it can’t process.

Next, go to Properties, and select Create New Service for the Record Reader. You’ll then choose CSVReader for the service. Next do the same for the Record Writer, but choose JSONRecordSetWriter for the service. Click Apply when finished to save the settings.

We’ll now need to configure the services. Double-click on the same ConvertRecord processor and go back to Properties. You’ll notice two “right arrows” for the services you’d created. Click on either one of them to go into the service settings. You’ll see a screen similar to this:

Screenshot.

Please note that the new services are disabled by default. We’ll first configure them before enabling them. First click on the gear icon for the CSVReader service. There are many settings that can be tweaked for this. However, we’ll only need to change one of them from False to True:

Treat First Line as Header: True
CSVReader

This tells the service to use the header line at the beginning of the CSV to identify the values. Hit Apply when done. There are no settings that need to be changed for the JSONRecordSetWriter service: you can keep everything as the defaults. The last step is to click the “lightning bolt” icons to enable the services. Click Enable for each and then close the settings page so you’re back on the main canvas.

Next, we’ll create a SplitJSON processor. This breaks up the JSON object into individual lines instead of one giant blob of data. There is only a single value to change:

JsonPath Expression: $.*
JsonPath Expression.

You’ll also want to select failure and original under Automatically Terminate Relationships in the processor settings. We only want it to pass the split JSON data onto the next processor.

The last set is creating a PutUDP processor to output the JSON records to a syslog server. Create the processor, go to Settings, and check Success and Failure under Automatically Terminate Relationships. Next go to Properties and plug in the values for your syslog server:

PutUDP Example.

The last step is to connect the processors. First connect the GetSFTP and ConvertRecord processor with Success selected under For Relationships. Next connect the ConvertRecord processor to the SplitJSON processor, also with Success selected under For Relationships. Finally, connect the SplitJSON processor to the PutUDP processor with Split for the relationship. You should have the following if everything is correct:

Completed Flow.

The last step is to right-click on the canvas and select Start. Assuming everything is configured correctly, the processors should turn green, and we should start seeing data on the syslog server. In my instance I’ve got it sending to Elasticsearch.

Elastic Output.
Show Comments