Created
November 18, 2017 19:51
-
-
Save ericchaves/557b4283518c0567907a9351e6f4989d to your computer and use it in GitHub Desktop.
Simple NiFi InvokeScriptedProcessor to enhance JSON data by doing SQL lookups
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import groovy.sql.Sql
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.annotation.behavior.EventDriven
import org.apache.nifi.annotation.documentation.CapabilityDescription
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.processor.util.StandardValidators

/*
 * Implements a Scripted Processor
 * - http://funnifi.blogspot.com.br/2016/02/invokescriptedprocessor-hello-world.html
 * - https://static.javadoc.io/org.apache.nifi/nifi-api/1.4.0/org/apache/nifi/processor/Processor.html
 *
 * For every record of the incoming JSON content, each dynamic property of the
 * processor is run as a SQL query with the record's lookup value as its single
 * positional parameter. Non-empty results are stored on the record under a key
 * equal to the dynamic property's name.
 *
 * NOTE(review): Processor, ProcessContext, ProcessSessionFactory,
 * ProcessorInitializationContext, ValidationContext and ValidationResult are not
 * imported in this script; presumably the hosting script engine pre-imports
 * them -- confirm before running it elsewhere.
 */
@EventDriven
@CapabilityDescription("Execute a series of JDBC queries adding the results to each JSON presented in the FlowFile")
class GroovyProcessor implements Processor {

    /*
     * Define Relationships (https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#documenting-relationships)
     */
    final static Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description('FlowFiles that were successfully processed and had any data enriched are routed here')
            .build()

    final static Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description('FlowFiles that were not successfully processed are routed here')
            .build()

    /** @return the two relationships this processor can route to (failure and success). */
    @Override
    Set<Relationship> getRelationships() { [REL_FAILURE, REL_SUCCESS] as Set }

    /*
     * Define processor properties
     * https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#exposing-processor-properties
     */
    final static PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
            .name("dbcp-connection-pool-services")
            .displayName("Database Connection Pool Services")
            .description("The Controller Service that is used to obtain a connection to the database.")
            .required(true)
            .identifiesControllerService(DBCPService)
            .build()

    final static PropertyDescriptor LOOKUP_ATTR = new PropertyDescriptor.Builder()
            .name("lookup-attribute")
            .displayName("JSON Lookup attribute")
            .description("Value to be used in each lookup queries.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(true)
            .expressionLanguageSupported(true)
            .build()

    /** @return the statically declared properties, as an unmodifiable list. */
    @Override
    List<PropertyDescriptor> getPropertyDescriptors() {
        Collections.unmodifiableList([DBCP_SERVICE, LOOKUP_ATTR]) as List<PropertyDescriptor>
    }

    /*
     * Processor initialization
     */
    def log

    /** Keeps the logger handed over by the initialization context for later error reporting. */
    @Override
    void initialize(ProcessorInitializationContext context) { log = context.logger }

    /*
     * Processor execution
     * https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#performing-the-work
     */

    /**
     * Enriches the JSON content of one FlowFile with the results of the configured lookups.
     *
     * The content is parsed as UTF-8 JSON. A top-level array is processed element by
     * element; a single top-level object is processed as one record. For each record
     * and each dynamic property, the property value is executed as a query with the
     * record's lookup value bound as the only parameter. A single row is stored as-is,
     * several rows are stored as an array, and an empty result leaves the record
     * untouched. The rewritten FlowFile goes to REL_SUCCESS; on any error the FlowFile
     * goes to REL_FAILURE.
     *
     * @param context        gives access to the configured properties
     * @param sessionFactory used to create the session this invocation works in
     * @throws ProcessException declared by the Processor contract
     */
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        def flowFile = session.get()
        if (!flowFile) return

        // Acquired inside the try block so that a failure to obtain it still routes the
        // FlowFile to failure and commits the session.
        def conn = null
        try {
            def lookupField = context.getProperty(LOOKUP_ATTR).evaluateAttributeExpressions(flowFile).value

            // Dynamic property name -> query text. The expressions only depend on the
            // FlowFile, so they are resolved once here instead of once per record.
            def queries = context.getProperties()
                    .findAll { descriptor, value -> descriptor.isDynamic() }
                    .collectEntries { descriptor, value ->
                        [(descriptor.name): context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).value?.toString()]
                    }

            def dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService)
            conn = dbcpService.getConnection()
            def sql = new Sql(conn)

            flowFile = session.write(flowFile, { inputStream, outputStream ->
                // Read with the same charset that is used for writing below.
                def records = new JsonSlurper().parse(inputStream, StandardCharsets.UTF_8.name())
                // Iterating a Map would walk its entries, so a lone object is wrapped.
                def targets = records instanceof Map ? [records] : records
                targets.each { record ->
                    def lookupValue = record[lookupField]
                    queries.each { target, query ->
                        def rows = sql.rows(query, [lookupValue])
                        if (rows.size()) {
                            record.put(target, rows.size() == 1 ? rows.get(0) : rows.toArray())
                        }
                    }
                }
                // `records` (not `targets`) is serialized so the top-level shape is preserved.
                outputStream.write(new JsonBuilder(records).toString().getBytes(StandardCharsets.UTF_8))
            } as StreamCallback)

            session.transfer(flowFile, REL_SUCCESS)
        } catch (final Throwable t) {
            // The throwable is passed separately as well so the stack trace is logged.
            log.error('{} failed to process due to {}', [this, t] as Object[], t)
            session.transfer(flowFile, REL_FAILURE)
        } finally {
            try {
                session.commit()
            } finally {
                // Runs even when the commit fails, so the connection is always released.
                conn?.close()
            }
        }
    }

    /** @return an empty collection: this script performs no validation of its own. */
    @Override
    Collection<ValidationResult> validate(ValidationContext context) { Collections.emptyList() }

    /**
     * Finds one of the statically declared properties.
     *
     * @param name the property's name or its display name
     * @return the matching descriptor, or null when none matches
     */
    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        getPropertyDescriptors().find { it.name == name || it.displayName == name }
    }

    /** Property changes need no reaction; all values are read on each trigger. */
    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    /** @return null; this script does not supply an identifier. */
    @Override
    String getIdentifier() { null }
}

// Script-level variable holding the processor instance.
// NOTE(review): presumably the hosting InvokeScriptedProcessor looks this variable up by
// the name `processor` (see the "hello world" link in the header) -- confirm.
processor = new GroovyProcessor()
Author
Thanks, it was very useful :)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
General
This scripted processor expects each FlowFile to contain a JSON array of objects. It iterates over the array and, for each object, performs a set of lookups whose results are added to that object as new fields.
How to use this scripted processor