@ericchaves
Created November 18, 2017 19:51
Simple NiFi InvokeScriptedProcessor to enhance JSON data by doing SQL lookups
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import groovy.sql.Sql
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.annotation.behavior.EventDriven
import org.apache.nifi.annotation.documentation.CapabilityDescription
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.processor.util.StandardValidators
/*
 * Implements a Scripted Processor
 * - http://funnifi.blogspot.com.br/2016/02/invokescriptedprocessor-hello-world.html
 * - https://static.javadoc.io/org.apache.nifi/nifi-api/1.4.0/org/apache/nifi/processor/Processor.html
 */
@EventDriven
@CapabilityDescription("Execute a series of JDBC queries adding the results to each JSON presented in the FlowFile")
class GroovyProcessor implements Processor {
    /*
     * Define Relationships (https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#documenting-relationships)
     */
    final static Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description('FlowFiles that were successfully processed and had any data enriched are routed here')
        .build()
    final static Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description('FlowFiles that were not successfully processed are routed here')
        .build()
    Set<Relationship> getRelationships() { [REL_FAILURE, REL_SUCCESS] as Set }
    /*
     * Define processor properties
     * https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#exposing-processor-properties
     */
    final static PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
        .name("dbcp-connection-pool-services")
        .displayName("Database Connection Pool Services")
        .description("The Controller Service that is used to obtain a connection to the database.")
        .required(true)
        .identifiesControllerService(DBCPService)
        .build()
    final static PropertyDescriptor LOOKUP_ATTR = new PropertyDescriptor.Builder()
        .name("lookup-attribute")
        .displayName("JSON Lookup attribute")
        .description("Value to be used in each lookup queries.")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .required(true)
        .expressionLanguageSupported(true)
        .build()
    @Override
    List<PropertyDescriptor> getPropertyDescriptors() {
        Collections.unmodifiableList([DBCP_SERVICE, LOOKUP_ATTR]) as List<PropertyDescriptor>
    }
    /*
     * Processor initialization
     */
    def log
    void initialize(ProcessorInitializationContext context) { log = context.logger }
    /*
     * Processor execution
     * https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#performing-the-work
     */
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        def flowFile = session.get()
        if (!flowFile) return
        // Name of the JSON field whose value is bound to the '?' in each lookup query
        def lookupField = context.getProperty(LOOKUP_ATTR).evaluateAttributeExpressions(flowFile).value
        // Each user-added (dynamic) property is one lookup: name = target JSON field, value = SQL query
        def queries = context.getProperties().findAll { k, v -> k.isDynamic() }
        def dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService)
        def conn = dbcpService.getConnection()
        try {
            def sql = new Sql(conn)
            flowFile = session.write(flowFile, { inputStream, outputStream ->
                def records = new JsonSlurper().parse(inputStream)
                records.each { record ->
                    def lookupValue = record[lookupField]
                    queries.each { k, v ->
                        def target = k.name
                        def query = context.getProperty(k).evaluateAttributeExpressions(flowFile).value?.toString()
                        def rows = sql.rows(query, [lookupValue])
                        // One row is stored as an object, several rows as an array; no rows leaves the record untouched
                        if (rows.size()) {
                            record.put(target, rows.size() == 1 ? rows.get(0) : rows.toArray())
                        }
                    }
                }
                def builder = new JsonBuilder(records)
                outputStream.write(builder.toString().getBytes(StandardCharsets.UTF_8))
            } as StreamCallback)
            session.transfer(flowFile, REL_SUCCESS)
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}', [this, t] as Object[])
            session.transfer(flowFile, REL_FAILURE)
        } finally {
            try {
                session.commit()
            } finally {
                // Return the connection to the pool even if the commit throws
                conn.close()
            }
        }
    }
    @Override
    Collection<ValidationResult> validate(ValidationContext context) { null }
    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        // Look descriptors up by their name, not by their display name
        switch (name) {
            case LOOKUP_ATTR.name: return LOOKUP_ATTR
            case DBCP_SERVICE.name: return DBCP_SERVICE
            default: return null
        }
    }
    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    @Override
    String getIdentifier() { null }
}
processor = new GroovyProcessor()

ericchaves commented Nov 18, 2017

General

This scripted processor expects each FlowFile to contain a JSON array of objects. It iterates over the array and runs a set of lookup queries for each object, adding each query's result to that object as a new field.
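
To make the enrichment rule concrete, here is a minimal sketch of the same loop outside NiFi. It is not the processor itself: `fakeRows` is a hypothetical stand-in for `sql.rows(query, [lookupValue])`, and the `addresses` table, `customer_id` column and `address` property name are made up for illustration.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical stand-in for sql.rows(query, [lookupValue]): canned rows per lookup value.
def fakeRows = { String query, Object lookupValue ->
    def table = [
        '1': [[city: 'Lisbon']],
        '2': [[city: 'Porto'], [city: 'Faro']]
    ]
    table[lookupValue.toString()] ?: []
}

// Mirrors the processor's loop: one lookup per record and per configured query.
def enrich(String json, String lookupField, Map<String, String> queries, Closure rowsFor) {
    def records = new JsonSlurper().parseText(json)
    records.each { record ->
        def lookupValue = record[lookupField]
        queries.each { target, query ->
            def rows = rowsFor(query, lookupValue)
            if (rows) {
                // One row becomes an object, several rows become an array
                record[target] = rows.size() == 1 ? rows[0] : rows
            }
        }
    }
    JsonOutput.toJson(records)
}

def input = '[{"id": 1}, {"id": 2}, {"id": 3}]'
def queries = [address: 'SELECT city FROM addresses WHERE customer_id = ?']
def output = enrich(input, 'id', queries, fakeRows)
println output
```

In this sketch the first record gets a single `address` object, the second gets an array of two, and the third is left unchanged because its lookup returns no rows.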

How to use this scripted processor

  1. Add 3 processors: QueryDatabaseTable, ConvertAvroToJSON and InvokeScriptedProcessor
  2. Configure the QueryDatabaseTable processor and route its success relationship to the ConvertAvroToJSON processor
  3. Configure the ConvertAvroToJSON processor and route its success relationship to the InvokeScriptedProcessor
  4. Save the script in a folder that NiFi can access, then configure the InvokeScriptedProcessor:
    1. Set Script Engine to Groovy
    2. Set Script File to the path where you saved this Groovy script
  5. Enable the InvokeScriptedProcessor and then disable it again. This makes the InvokeScriptedProcessor load the script and update its properties to the ones the script defines.
  6. Configure the InvokeScriptedProcessor's properties:
    1. Set Database Connection Pool Services to the controller service that provides the database connection
    2. Set the JSON Lookup attribute. This is the name of the attribute whose value will be used in each lookup query; it MUST be present in each JSON object of the incoming FlowFile.
    3. Add one or more properties (using the + button at the top-right corner of the processor configuration dialog) whose values each contain a lookup query. The property's name is used as the name of the JSON attribute that receives the query results. Use a single '?' in the query; it is replaced by the value of the JSON lookup attribute.
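
Because the script calls `sql.rows(query, [lookupValue])` with exactly one bound value, a lookup query with any other number of `?` placeholders would be expected to fail at run time. The sketch below is one way to sanity-check queries before pasting them into the processor. The property names and table names are hypothetical, and the check is deliberately naive: it would miscount a `?` that appears inside a quoted SQL string.

```groovy
// Hypothetical dynamic properties: name = JSON field to add, value = lookup query.
def dynamicProperties = [
    address: 'SELECT street, city FROM addresses WHERE customer_id = ?',
    orders : 'SELECT order_id, total FROM orders WHERE customer_id = ?',
    broken : 'SELECT * FROM orders WHERE customer_id = ? AND status = ?'
]

// The script binds a single value per query, so flag anything without exactly one placeholder.
def invalid = dynamicProperties.findAll { name, query -> query.count('?') != 1 }.keySet()
println "Properties with the wrong number of placeholders: ${invalid}"
```

Here only the `broken` entry is flagged, since it has two placeholders but the script supplies one value.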

@behrouz-s

Thanks, it was very useful :)
