Skip to content

Instantly share code, notes, and snippets.

@ericchaves
Created November 18, 2017 19:51
Show Gist options
  • Select an option

  • Save ericchaves/557b4283518c0567907a9351e6f4989d to your computer and use it in GitHub Desktop.

Select an option

Save ericchaves/557b4283518c0567907a9351e6f4989d to your computer and use it in GitHub Desktop.
Simple NiFi InvokeScriptedProcessor to enhance JSON data by doing SQL lookups
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import groovy.sql.Sql
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.annotation.behavior.EventDriven
import org.apache.nifi.annotation.documentation.CapabilityDescription
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.processor.util.StandardValidators
/**
 * Scripted processor that enriches JSON records with the results of JDBC lookups.
 *
 * For every record in the incoming FlowFile, the value stored under the configured
 * lookup attribute is passed as the single positional parameter to each SQL statement
 * configured as a dynamic property. When a statement returns rows, they are stored in
 * the record under the dynamic property's name: a single row is stored as an object,
 * several rows as an array.
 *
 * References:
 * - http://funnifi.blogspot.com.br/2016/02/invokescriptedprocessor-hello-world.html
 * - https://static.javadoc.io/org.apache.nifi/nifi-api/1.4.0/org/apache/nifi/processor/Processor.html
 */
@EventDriven
@CapabilityDescription("Execute a series of JDBC queries adding the results to each JSON presented in the FlowFile")
class GroovyProcessor implements Processor {

    /*
     * Define Relationships (https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#documenting-relationships)
     */
    final static Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description('FlowFiles that were successfully processed and had any data enriched are routed here')
            .build()

    final static Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description('FlowFiles that were not successfully processed are routed here')
            .build()

    /** @return the relationships a FlowFile can be routed to: success and failure */
    @Override
    Set<Relationship> getRelationships() { [REL_FAILURE, REL_SUCCESS] as Set }

    /*
     * Define processor properties
     * https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#exposing-processor-properties
     */
    final static PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
            .name("dbcp-connection-pool-services")
            .displayName("Database Connection Pool Services")
            .description("The Controller Service that is used to obtain a connection to the database.")
            .required(true)
            .identifiesControllerService(DBCPService)
            .build()

    final static PropertyDescriptor LOOKUP_ATTR = new PropertyDescriptor.Builder()
            .name("lookup-attribute")
            .displayName("JSON Lookup attribute")
            .description("Value to be used in each lookup queries.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(true)
            .expressionLanguageSupported(true)
            .build()

    /** @return the statically declared properties, as an unmodifiable list */
    @Override
    List<PropertyDescriptor> getPropertyDescriptors() {
        Collections.unmodifiableList([DBCP_SERVICE, LOOKUP_ATTR]) as List<PropertyDescriptor>
    }

    /*
     * Processor initialization
     */
    def log

    /** Keeps the logger supplied by the initialization context for later error reporting. */
    @Override
    void initialize(ProcessorInitializationContext context) { log = context.logger }

    /*
     * Processor execution
     * https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#performing-the-work
     */

    /**
     * Takes one FlowFile, enriches its JSON content with the lookup results and routes it
     * to success; any error routes the FlowFile to failure instead.
     *
     * @param context        gives access to the configured properties
     * @param sessionFactory used to create the session this invocation works in
     */
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        def flowFile = session.get()
        if (!flowFile) return

        def conn = null
        try {
            // Everything that can fail happens inside the try so that a failure always
            // ends with the FlowFile transferred and the session committed.
            def incoming = flowFile
            def lookupField = context.getProperty(LOOKUP_ATTR).evaluateAttributeExpressions(incoming).value

            // Dynamic properties are the lookups: property name -> SQL text. The expression
            // only depends on the FlowFile, so it is evaluated once rather than per record.
            def queries = [:]
            context.getProperties().findAll { k, v -> k.isDynamic() }.each { k, v ->
                queries[k.name] = context.getProperty(k).evaluateAttributeExpressions(incoming).value?.toString()
            }

            conn = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService).getConnection()
            def sql = new Sql(conn)

            flowFile = session.write(flowFile, { inputStream, outputStream ->
                def records = new JsonSlurper().parse(inputStream)
                // A lone JSON object is handled as a single record; the output keeps its shape.
                def items = records instanceof Map ? [records] : records
                items.each { record ->
                    def lookupValue = record[lookupField]
                    queries.each { target, query ->
                        def rows = sql.rows(query, [lookupValue])
                        if (rows.size()) {
                            record.put(target, rows.size() == 1 ? rows.get(0) : rows.toArray())
                        }
                    }
                }
                def builder = new JsonBuilder(records)
                outputStream.write(builder.toString().getBytes(StandardCharsets.UTF_8))
            } as StreamCallback)
            session.transfer(flowFile, REL_SUCCESS)
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}', [this, t] as Object[])
            session.transfer(flowFile, REL_FAILURE)
        } finally {
            // Release the connection first and shield the commit from a failing close(),
            // so neither step can prevent the other from running.
            try {
                conn?.close()
            } catch (final Throwable closeError) {
                log.warn('{} failed to close the database connection due to {}', [this, closeError] as Object[])
            }
            session.commit()
        }
    }

    /** No script-level validation is performed; an empty result means "valid". */
    @Override
    Collection<ValidationResult> validate(ValidationContext context) { Collections.emptyList() }

    /**
     * Looks up one of the statically declared properties.
     *
     * @param name the descriptor's name; its display name is accepted as well
     * @return the matching descriptor, or null when there is none
     */
    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        getPropertyDescriptors().find { it.name == name || it.displayName == name }
    }

    /** Property changes need no handling: every property is read again on each trigger. */
    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    /** This scripted processor does not supply an identifier of its own. */
    @Override
    String getIdentifier() { null }
}
// Expose the processor instance to the hosting script engine. The variable is deliberately
// left undeclared (no `def`) so that it lands in the script binding; per the
// InvokeScriptedProcessor article linked above, the host looks it up by the name `processor`.
processor = new GroovyProcessor()
@behrouz-s
Copy link
Copy Markdown

Thanks, it was very useful :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment