A common task in log management is fetching remote files, getting them into a manageable structure, and sending them on to another platform. The remote file could be anything: data from HR, a dump from a database, or a CSV of application information. In this case, we’re going to use SFTP to import a remote CSV file, reformat everything to JSON, and then output that to a syslog server. This is simple with NiFi.
Let’s assume we have a CSV being dumped to a remote host every 15 minutes. We’d like the processor to check for a new file every 60 seconds, fetch the CSV, process it, and then delete the file. For this example, we’ll use the following file and directory structure on the remote server:
The file format of the CSV looks like this, including a header:
The first step is to create a GetSFTP processor. The processor will fetch a remote file on a specified interval, ingest it, and pass it along to the next step. Drag the processor onto the canvas and plug in the following settings:
You can see we’re using a regex to match the file. The assumption is that the file name starts with APPLICATION_OUTPUT and includes an epoch timestamp. These are the second group of settings from above:
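As an aside, a file-name filter like this can be sanity-checked outside NiFi before it goes into the processor. The sketch below is only an illustration: the exact pattern is hypothetical, since all we know is that the name starts with APPLICATION_OUTPUT and includes an epoch timestamp. The underscore separator, the 10-digit timestamp, and the .csv suffix are assumptions, as is the whole-name match.

```python
import re

# Hypothetical pattern. Assumed details (not from the NiFi settings):
# an underscore separator, a 10-digit epoch-seconds timestamp, and a
# .csv extension.
FILE_FILTER = re.compile(r"APPLICATION_OUTPUT_\d{10}\.csv")


def matches_filter(file_name: str) -> bool:
    """Return True when the entire file name matches the filter."""
    return FILE_FILTER.fullmatch(file_name) is not None


if __name__ == "__main__":
    candidates = [
        "APPLICATION_OUTPUT_1546300800.csv",  # made-up example name
        "APPLICATION_OUTPUT.csv",             # no timestamp
        "notes.txt",                          # unrelated file
    ]
    for name in candidates:
        print(name, matches_filter(name))
```

Testing the pattern against a few real file names first is cheaper than waiting a polling cycle to find out that nothing was picked up.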
Next we’ll want to create a ConvertRecord processor. This allows us to convert the data between formats, specifically CSV to JSON. We’ll be creating new services for the different formats that we’ll be using. Double-click on the processor, go to Settings, and check failure under Automatically Terminate Relationships. This tells NiFi to drop anything that it can’t process.
Next, go to Properties and select Create New Service for the Record Reader. Choose CSVReader for the service. Then do the same for the Record Writer, but choose JsonRecordSetWriter for the service. Click Apply when finished to save the settings.
We’ll now need to configure the services. Double-click on the same ConvertRecord processor and go back to Properties. You’ll notice two “right arrows” for the services you created. Click on either one of them to go into the service settings. You’ll see a screen similar to this:
Please note that the new services are disabled by default. We’ll configure them before enabling them. First click on the gear icon for the CSVReader service. There are many settings that can be tweaked for this. However, we’ll only need to change one of them from False to True:
This tells the service to use the header line at the beginning of the CSV to name the fields. Hit Apply when done. There are no settings that need to be changed for the JsonRecordSetWriter service: you can keep everything as the defaults. The last step is to click the “lightning bolt” icons to enable the services. Click Enable for each and then close the settings page so you’re back on the main canvas.
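To make the conversion concrete, here is a rough Python sketch of what the CSVReader and JSON writer pair does conceptually: the header line supplies the field names, and each following row becomes one JSON object in an array. The column names and values are made up for illustration, and this is not how NiFi implements it internally.

```python
import csv
import io
import json


def csv_to_json(csv_text: str) -> str:
    """Convert CSV text with a header line into a JSON array of objects."""
    # DictReader takes the first line as the header and uses it for keys.
    reader = csv.DictReader(io.StringIO(csv_text))
    return json.dumps(list(reader))


# Hypothetical sample data; the real CSV columns will differ.
SAMPLE = "id,application,status\n1,billing,ok\n2,search,degraded\n"

if __name__ == "__main__":
    print(csv_to_json(SAMPLE))
```

Note that this sketch keeps every value as a string. NiFi may infer numeric types depending on how the reader’s schema is configured.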
Next, we’ll create a SplitJson processor. This breaks up the JSON array into individual records instead of one giant blob of data. There is only a single value to change:
You’ll also want to select failure and original under Automatically Terminate Relationships in the processor settings. We only want it to pass the split JSON data on to the next processor.
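The effect of the split step can be sketched in a few lines of Python. This assumes the converted data is a top-level JSON array and that each element should become its own record; the function name is hypothetical and the sketch only mirrors the idea, not the processor’s actual behavior.

```python
import json
from typing import List


def split_json_array(payload: str) -> List[str]:
    """Split a top-level JSON array into one JSON string per element."""
    records = json.loads(payload)
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    # Each element is serialized on its own, like one flowfile per record.
    return [json.dumps(record) for record in records]


if __name__ == "__main__":
    blob = '[{"id": "1", "status": "ok"}, {"id": "2", "status": "degraded"}]'
    for line in split_json_array(blob):
        print(line)
```

The original array corresponds to the original relationship, which is why it can be terminated: only the individual pieces are needed downstream.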
The last processor to create is PutUDP, which outputs the JSON records to a syslog server. Create the processor, go to Settings, and check success and failure under Automatically Terminate Relationships. Next go to Properties and plug in the values for your syslog server:
With all four processors created, connect them. First connect the GetSFTP processor to the ConvertRecord processor with success selected under For Relationships. Next connect the ConvertRecord processor to the SplitJson processor, also with success selected under For Relationships. Finally, connect the SplitJson processor to the PutUDP processor with split for the relationship. You should have the following if everything is correct:
Finally, right-click on the canvas and select Start. Assuming everything is configured correctly, the processors should turn green, and we should start seeing data on the syslog server. In my case, I’ve got it sending to Elasticsearch.
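If nothing shows up, it can help to reproduce the last hop outside NiFi. The sketch below approximates what the UDP output does for each split record: it sends the JSON text as a single datagram, with no syslog header added. The host, port, and record contents are placeholders; the demo binds a throwaway listener on localhost so it runs without a real syslog server.

```python
import json
import socket


def send_record(record: dict, host: str, port: int) -> int:
    """Send one JSON record as a UDP datagram; return the bytes sent."""
    payload = json.dumps(record).encode("utf-8")
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        return sock.sendto(payload, (host, port))


def demo() -> dict:
    """Send a made-up record to a local listener and return what arrived."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as listener:
        listener.bind(("127.0.0.1", 0))  # port 0: the OS picks a free port
        listener.settimeout(2.0)         # avoid hanging if nothing arrives
        port = listener.getsockname()[1]
        send_record({"application": "billing", "status": "ok"}, "127.0.0.1", port)
        data, _addr = listener.recvfrom(65535)
    return json.loads(data.decode("utf-8"))


if __name__ == "__main__":
    print(demo())
```

Pointing send_record at the real server’s host and port is a quick way to tell a NiFi configuration problem apart from a network or listener problem. Keep in mind that UDP gives no delivery guarantee, so a missing record does not produce an error on the sending side.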