import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.io.Resources; import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.name.Named; import org.apache.avro.io.DatumReader; import org.apache.avro.specific.SpecificRecordBase; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.internals.RecordHeaders; import org.jsmart.zerocode.core.di.provider.GsonSerDeProvider; import org.jsmart.zerocode.core.di.provider.ObjectMapperProvider; import org.jsmart.zerocode.core.engine.preprocessor.ScenarioExecutionState; import org.jsmart.zerocode.core.kafka.client.BasicKafkaClient; import org.jsmart.zerocode.core.kafka.delivery.DeliveryDetails; import org.jsmart.zerocode.core.kafka.receive.ConsumerCommonConfigs; import org.jsmart.zerocode.core.kafka.send.message.ProducerJsonRecord; import org.jsmart.zerocode.core.kafka.send.message.ProducerJsonRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Date; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import static org.jsmart.zerocode.core.constants.ZerocodeConstants.FAILED; import static org.jsmart.zerocode.core.constants.ZerocodeConstants.OK; import static org.jsmart.zerocode.core.kafka.common.CommonConfigs.BOOTSTRAP_SERVERS; import static org.jsmart.zerocode.core.kafka.common.KafkaCommonUtils.resolveValuePlaceHolders; import static org.jsmart.zerocode.core.kafka.helper.KafkaProducerHelper.validateProduceRecord; /** * Provides an abstraction to allow using ZeroCode with AVRO format * which is partially supported by ZeroCode * (see Zerocode issue) */ public abstract class AbstractAvroMessagesSupport extends BasicKafkaClient { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAvroMessagesSupport.class); // unfortunately not accessible from BasicKafkaClient protected final ObjectMapper objectMapper = new ObjectMapperProvider().get(); // unfortunately not accessible from BasicKafkaClient protected final Gson gson = new GsonSerDeProvider().get(); // unfortunately not accessible from BasicKafkaClient @Inject(optional = true) @Named("kafka.producer.properties") protected String producerPropertyFile; // unfortunately not accessible from BasicKafkaClient @Inject(optional = true) @Named("kafka.consumer.properties") protected String consumerProperyFile; @Inject protected ConsumerCommonConfigs consumerCommonConfigs; private AvroHelper avroUtils = new AvroHelper(); /** * @return return a new instance of the {@link K} object in order to allow the {@link DatumReader} deserialization */ public abstract K getRecordKeyInstance(); /** * @return return a new instance of the {@link V} object in order to allow the {@link DatumReader} deserialization */ public abstract V getRecordValueInstance(); @Override public String execute(String brokers, String topicName, String operation, String requestJson, ScenarioExecutionState scenarioExecutionState) { LOGGER.info("brokers:{}, topicName:{}, operation:{}, requestJson:{}", brokers, topicName, operation, requestJson); try { switch (operation.toLowerCase()) { case "send": case "load": case "publish": case "produce": // produce Avro message based on JSON description in scenario files return produceCustomAvroMessage(brokers, topicName, requestJson); default: return super.execute(brokers, topicName, operation, requestJson, scenarioExecutionState); } } catch (Throwable exx) { LOGGER.error("Exception during operation:{}, topicName:{}, error:{}", operation, topicName, exx.getMessage()); throw new RuntimeException(exx); } } /** * Zerocode only support producing Avro messages thanks to the REST API (which not support Messages Headers). * This method build an Avro messages thanks to the JSON descriptor. * * @param brokers * @param topicName * @param requestJson * @return */ public String produceCustomAvroMessage(String brokers, String topicName, String requestJson) { // just create an avro producer, returning your avro java pojo Producer producer = createAvroProducer(brokers, producerPropertyFile); AtomicReference error = new AtomicReference<>(""); try { // read the actual zerocode configuration in the json scenario file ProducerJsonRecords jsonRecords = objectMapper.readValue(requestJson, ProducerJsonRecords.class); List records = jsonRecords.getRecords(); validateProduceRecord(records); AtomicReference recordMetadata = new AtomicReference<>(); records.forEach(producerJsonRecord -> { RecordHeaders headers = buildHeaders(producerJsonRecord); try { // construct of the avro record ProducerRecord producerRecord = buildProducerRecord(topicName, producerJsonRecord, headers); // and send it RecordMetadata recordMetadataToReturn = sendRecord(producer, producerRecord).get(); // zero code need this metadata recordMetadata.set(recordMetadataToReturn); } catch (InterruptedException | ExecutionException | IOException e) { LOGGER.error("An error occurred during send !", e); error.set(e.getMessage()); } }); // dont forget to return a status for tests assertions return gson.toJson(new DeliveryDetails(OK, recordMetadata.get())); } catch (IOException e) { LOGGER.error("Unable to process json.", e); error.set(e.getMessage()); } return gson.toJson(new DeliveryDetails(FAILED, StringUtils.isNotBlank(error.get()) ? error.get() : "")); } private Future sendRecord(Producer producer, ProducerRecord producerRecord) { return producer.send(producerRecord, (metadata, e) -> { if (metadata != null) { // Record sent successfully. Exception == null and metadata != null LOGGER.debug("Message {}-{} successfully sent to topic={} part={} off={} at time={}", null, producerRecord, metadata.topic(), metadata.partition(), metadata.offset(), new Date(metadata.timestamp())); } else { // An error occurred. Exception != null and metadata == null // Correctly handle the exception according to your needs // /!\ If you don't process the exception, it is "fire-and-forget" like. Send or not or maybe :-) LOGGER.error("An error occurred during send !", e); } }); } private ProducerRecord buildProducerRecord(String topicName, ProducerJsonRecord producerJsonRecord, RecordHeaders headers) throws IOException { ProducerRecord producerRecord = new ProducerRecord<>( topicName, null, null, avroUtils.buildRecordKey(getRecordKeyInstance(), producerJsonRecord), avroUtils.buildRecordValue(getRecordValueInstance(), producerJsonRecord), headers); return producerRecord; } private RecordHeaders buildHeaders(ProducerJsonRecord producerJsonRecord) { RecordHeaders headers = new RecordHeaders(); producerJsonRecord.getHeaders().forEach((o, o2) -> headers.add(o.toString(), o2.toString().getBytes(StandardCharsets.UTF_8))); return headers; } private Producer createAvroProducer(String bootStrapServers, String producerPropertyFile) { return new KafkaProducer<>(defineKafkaProperties(bootStrapServers, producerPropertyFile)); } protected Properties defineKafkaProperties(String bootStrapServers, String producerPropertyFile) { try (InputStream propsIs = Resources.getResource(producerPropertyFile).openStream()) { Properties properties = new Properties(); properties.load(propsIs); properties.put(BOOTSTRAP_SERVERS, bootStrapServers); resolveValuePlaceHolders(properties); return properties; } catch (IOException e) { throw new RuntimeException("Exception while reading kafka producer properties - " + e); } } }