Apache Nifi: Pulling From MySQL and Sending to Syslog

Apache Nifi: Pulling From MySQL and Sending to Syslog

My last post covered the bare-bones basics of using Apache Nifi to load-balance incoming syslog events. In this example we’re going to take it a step further: query a MySQL database containing entity events from Home Assistant, reformat the output to JSON, and then output that to a syslog server.

Let’s get started by downloading the necessary Java connector. This is also assuming that you’ve installed NiFi in /opt/nifi per the previous instructions.

mkdir -p /opt/nifi/drivers
cd /opt/nifi/drivers
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar

At this point go to the Properties tab. Here we’re going to configure the MySQL service. Next to Database Connection Pooling Service, click the blank space and select Create New Service. The only thing you’ll need to do is rename the Controller Service Name to something like MySQL Connection. Click Create when finished.

On the settings page, you’ll now see a “right arrow” to the right of the Database Connection Pooling Service in the processor settings. Click on the arrow and then click on the gear icon for the previous MySQL Connection service that you’d created. There are a few settings here that will need to be changed to reflect your environment. As far as the database URL structure, you’ll need to change the hostname at the start of the URL and the database name at the end of the URL.

Database Connection URL: jdbc:mysql://hostname:3306/databasename?serverTimezone=UTC
Database Driver Class Name: com.mysql.jdbc.Driver
Database Driver Location(s): /opt/nifi/drivers/mysql-connector-java-8.0.19.jar
Database User: A user with permissions to access the DB.
Database Password: Password for above. 

My particular example:

You’ll also want to ensure the service is enabled. In my case, it was disabled after I’d created it. Easy enough to correct:

Next get back into the settings for the QueryDatabaseTable processor. You’ll now plug in the settings for the database on the Properties tab.

Database Connection Pooling Service: The one you'd created above.
Database Type: MySQL
Table Name: The table of the database you're querying. 
Maximum-value Columns: The incrementing value column.

It’s important to specify the Maximum-value Columns, otherwise the connection will pull the same data over and over. It acts as a placeholder so the processor knows what data had previously been collected. The column should be something that auto-increments as rows are added to the table. For Home Assistant, there’s a state_id column that increments by 1 every time a row is added.

My particular example:

…and that was the complicated part. The rest is fairly straight forward. Now that we’ve got the database connection configured, we’ll need to modify the output. The data comes out of the processor in Avro format, but we want to convert this to JSON. Create a ConvertAvroToJSON processor and connect the previous QueryDatabaseTable to it. Select success for the value although that’s probably the only option. You don’t need to change anything with the ConvertAvroToJSON processor but these are the settings:

The output will be in JSON. Unmodified, the flow will include a huge number events from the database in one giant blob. We’ll need to split these apart into individual events using the SplitJSON processor. There is only a single value to change:

JsonPath Expression: $.*

You’ll also want to select failure and original under Automatically Terminate Relationships in the processor settings. We only want it to pass the split JSON data onto the next processor. My example:

Connect the ConvertAvroToJSON processor to the SplitJSON processor for both success and failure. Almost done! The last step is to output the data in pseudo-syslog format, i.e. we’re just going to send it somewhere on 514/UDP and leave it up to the syslog receiver to timestamp it.

At this point we’ll create a PutUDP processor. You’ll only need to plug in the hostname and the port. You’ll also want to go to the processor settings and check success and failure under Automatically Terminate Relationships. This is my particular example:

Last step: connect the SplitJSON processor to the PutUDP processor. Select split as the relationship since we only want to send over the split JSON data. You’ll have something like this when everything is done:

At the point you’d need to right-click and start everything up. Hope this helps! This can likely be adapted to just about any database that is supported by JDBC.

Show Comments