import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.jsmart.zerocode.core.di.provider.GsonSerDeProvider;
import org.jsmart.zerocode.core.di.provider.ObjectMapperProvider;
import org.jsmart.zerocode.core.engine.preprocessor.ScenarioExecutionState;
import org.jsmart.zerocode.core.kafka.client.BasicKafkaClient;
import org.jsmart.zerocode.core.kafka.delivery.DeliveryDetails;
import org.jsmart.zerocode.core.kafka.receive.ConsumerCommonConfigs;
import org.jsmart.zerocode.core.kafka.send.message.ProducerJsonRecord;
import org.jsmart.zerocode.core.kafka.send.message.ProducerJsonRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import static org.jsmart.zerocode.core.constants.ZerocodeConstants.FAILED;
import static org.jsmart.zerocode.core.constants.ZerocodeConstants.OK;
import static org.jsmart.zerocode.core.kafka.common.CommonConfigs.BOOTSTRAP_SERVERS;
import static org.jsmart.zerocode.core.kafka.common.KafkaCommonUtils.resolveValuePlaceHolders;
import static org.jsmart.zerocode.core.kafka.helper.KafkaProducerHelper.validateProduceRecord;
/**
* Provides an abstraction for using ZeroCode with the Avro format,
* which is only partially supported by ZeroCode
* (see Zerocode issue).
*
* @param <K> Avro-generated type of the record key
* @param <V> Avro-generated type of the record value
*/
public abstract class AbstractAvroMessagesSupport<K extends SpecificRecordBase, V extends SpecificRecordBase> extends BasicKafkaClient {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAvroMessagesSupport.class);
// unfortunately not accessible from BasicKafkaClient
protected final ObjectMapper objectMapper = new ObjectMapperProvider().get();
// unfortunately not accessible from BasicKafkaClient
protected final Gson gson = new GsonSerDeProvider().get();
// unfortunately not accessible from BasicKafkaClient
@Inject(optional = true)
@Named("kafka.producer.properties")
protected String producerPropertyFile;
// unfortunately not accessible from BasicKafkaClient
@Inject(optional = true)
@Named("kafka.consumer.properties")
protected String consumerPropertyFile;
@Inject
protected ConsumerCommonConfigs consumerCommonConfigs;
private final AvroHelper avroUtils = new AvroHelper();
/**
* @return a new instance of the {@link K} record key, so that the {@link DatumReader} can deserialize into it
*/
public abstract K getRecordKeyInstance();
/**
* @return a new instance of the {@link V} record value, so that the {@link DatumReader} can deserialize into it
*/
public abstract V getRecordValueInstance();
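// For illustration, a hypothetical concrete subclass could look like this
// (OrderKey and OrderValue are assumed Avro-generated SpecificRecordBase
// classes, not part of this code):
//
// public class OrderAvroMessagesSupport extends AbstractAvroMessagesSupport<OrderKey, OrderValue> {
//     @Override
//     public OrderKey getRecordKeyInstance() { return new OrderKey(); }
//     @Override
//     public OrderValue getRecordValueInstance() { return new OrderValue(); }
// }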
@Override
public String execute(String brokers, String topicName, String operation, String requestJson, ScenarioExecutionState scenarioExecutionState) {
LOGGER.info("brokers:{}, topicName:{}, operation:{}, requestJson:{}", brokers, topicName, operation, requestJson);
try {
switch (operation.toLowerCase()) {
case "send":
case "load":
case "publish":
case "produce":
// produce Avro message based on JSON description in scenario files
return produceCustomAvroMessage(brokers, topicName, requestJson);
default:
return super.execute(brokers, topicName, operation, requestJson, scenarioExecutionState);
}
} catch (Throwable e) {
LOGGER.error("Exception during operation:{}, topicName:{}", operation, topicName, e);
throw new RuntimeException(e);
}
}
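// A minimal sketch of a scenario step that reaches the produce branch above,
// assuming the usual ZeroCode Kafka step layout (step name and topic are illustrative):
//
// {
//     "name": "produce_avro_message",
//     "url": "kafka-topic:demo.topic",
//     "operation": "produce",
//     "request": { "records": [ ... ] }
// }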
/**
* Zerocode only supports producing Avro messages via the REST API (which does not
* support message headers). This method instead builds Avro messages from the JSON descriptor.
*
* @param brokers comma-separated Kafka bootstrap servers
* @param topicName target Kafka topic
* @param requestJson JSON descriptor of the records to produce
* @return a JSON-serialized {@link DeliveryDetails} status for test assertions
*/
public String produceCustomAvroMessage(String brokers, String topicName, String requestJson) {
// create an Avro producer, producing the generated Avro Java POJOs
Producer<K, V> producer = createAvroProducer(brokers, producerPropertyFile);
AtomicReference<String> error = new AtomicReference<>("");
try {
// read the actual zerocode configuration from the json scenario file
ProducerJsonRecords jsonRecords = objectMapper.readValue(requestJson, ProducerJsonRecords.class);
List<ProducerJsonRecord> records = jsonRecords.getRecords();
validateProduceRecord(records);
AtomicReference<RecordMetadata> recordMetadata = new AtomicReference<>();
records.forEach(producerJsonRecord -> {
RecordHeaders headers = buildHeaders(producerJsonRecord);
try {
// construct the Avro record
ProducerRecord<K, V> producerRecord = buildProducerRecord(topicName, producerJsonRecord, headers);
// and send it, blocking until the broker acknowledges
RecordMetadata recordMetadataToReturn = sendRecord(producer, producerRecord).get();
// ZeroCode needs this metadata for assertions
recordMetadata.set(recordMetadataToReturn);
} catch (InterruptedException | ExecutionException | IOException e) {
LOGGER.error("An error occurred during send!", e);
error.set(e.getMessage());
}
});
if (StringUtils.isBlank(error.get())) {
// don't forget to return a status for test assertions
return gson.toJson(new DeliveryDetails(OK, recordMetadata.get()));
}
} catch (IOException e) {
LOGGER.error("Unable to process json.", e);
error.set(e.getMessage());
} finally {
// release producer resources
producer.close();
}
return gson.toJson(new DeliveryDetails(FAILED, StringUtils.isNotBlank(error.get()) ? error.get() : ""));
}
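// A minimal sketch of the requestJson consumed above, assuming the standard
// ProducerJsonRecords shape (the key/value fields depend on your Avro schemas
// and are illustrative):
//
// {
//     "records": [
//         {
//             "key": { "id": "k1" },
//             "value": { "amount": 42 },
//             "headers": { "traceId": "abc-123" }
//         }
//     ]
// }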
private Future<RecordMetadata> sendRecord(Producer<K, V> producer, ProducerRecord<K, V> producerRecord) {
// the caller blocks on the returned Future, so the callback below only logs the outcome
return producer.send(producerRecord, (metadata, e) -> {
if (e == null) {
// Record sent successfully
LOGGER.debug("Message {}-{} successfully sent to topic={} part={} off={} at time={}",
producerRecord.key(),
producerRecord.value(),
metadata.topic(),
metadata.partition(),
metadata.offset(),
new Date(metadata.timestamp()));
} else {
// An error occurred: handle the exception according to your needs.
// If you don't process it, the send is effectively fire-and-forget.
LOGGER.error("An error occurred during send!", e);
}
});
}
private ProducerRecord<K, V> buildProducerRecord(String topicName, ProducerJsonRecord producerJsonRecord, RecordHeaders headers) throws IOException {
// partition and timestamp are passed as null so Kafka assigns them
return new ProducerRecord<>(
topicName,
null,
null,
avroUtils.buildRecordKey(getRecordKeyInstance(), producerJsonRecord),
avroUtils.buildRecordValue(getRecordValueInstance(), producerJsonRecord),
headers);
}
private RecordHeaders buildHeaders(ProducerJsonRecord producerJsonRecord) {
// e.g. a JSON entry "headers": {"traceId": "abc"} becomes the Kafka header traceId=abc
RecordHeaders headers = new RecordHeaders();
if (producerJsonRecord.getHeaders() != null) {
producerJsonRecord.getHeaders().forEach((name, value) -> headers.add(name.toString(), value.toString().getBytes(StandardCharsets.UTF_8)));
}
return headers;
}
private Producer<K, V> createAvroProducer(String bootStrapServers, String producerPropertyFile) {
return new KafkaProducer<>(defineKafkaProperties(bootStrapServers, producerPropertyFile));
}
protected Properties defineKafkaProperties(String bootStrapServers, String producerPropertyFile) {
try (InputStream propsIs = Resources.getResource(producerPropertyFile).openStream()) {
Properties properties = new Properties();
properties.load(propsIs);
properties.put(BOOTSTRAP_SERVERS, bootStrapServers);
resolveValuePlaceHolders(properties);
return properties;
} catch (IOException e) {
throw new RuntimeException("Exception while reading kafka producer properties", e);
}
}
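// A minimal sketch of a matching producer property file, assuming the Confluent
// Avro serializers and a Schema Registry (the URL is illustrative; bootstrap.servers
// is overridden programmatically above):
//
// key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
// value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
// schema.registry.url=http://localhost:8081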
}