diff --git a/pom.xml b/pom.xml index f90ca2a..cb726e5 100644 --- a/pom.xml +++ b/pom.xml @@ -1,137 +1,144 @@ - - 4.0.0 - - org.springframework.boot - spring-boot-starter-parent - 2.2.7.RELEASE - - - com.uci - orchestrator - 0.0.1-SNAPSHOT - orchestrator - Demo project for Spring Boot + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.2.7.RELEASE + + + com.uci + orchestrator + 0.0.1-SNAPSHOT + orchestrator + Demo project for Spring Boot - - 11 - + + 11 + - - - org.springframework.boot - spring-boot-starter-web - - - org.springframework.boot - spring-boot-starter-web-services - - - org.projectlombok - lombok - 1.18.12 - - - org.springframework.boot - spring-boot-starter-test - test - - - org.junit.vintage - junit-vintage-engine - - - - - - io.projectreactor.kafka - reactor-kafka - - - org.kie - kie-ci - 7.4.1.Final - - - org.drools - drools-decisiontables - 7.4.1.Final - - - org.drools - drools-core - 7.4.1.Final - - - com.fasterxml.jackson.core - jackson-core - 2.11.0 - compile - - - com.fasterxml.jackson.dataformat - jackson-dataformat-xml - 2.10.4 - compile - - - junit - junit - test - - - com.uci - dao - 1.0 - - - org.springframework.boot - spring-boot - - - org.springframework.kafka - spring-kafka - - - com.uci - message-rosa - 0.0.1-SNAPSHOT - - - com.uci - utils - 0.0.1-SNAPSHOT - - - org.springframework.boot - spring-boot-starter-web - - - io.fusionauth - fusionauth-java-client - 1.7.0 - - - org.springframework.data - spring-data-cassandra - - + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-web-services + + + org.projectlombok + lombok + 1.18.12 + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + + io.projectreactor.kafka + reactor-kafka + + + org.kie + kie-ci + 7.4.1.Final + + + org.drools + drools-decisiontables + 7.4.1.Final + + + org.drools + drools-core + 7.4.1.Final + + + com.fasterxml.jackson.core + jackson-core + 2.11.0 + compile + + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + 2.10.4 + compile + + + junit + junit + test + + + com.uci + dao + 1.0 + + + org.springframework.boot + spring-boot + + + org.springframework.kafka + spring-kafka + + + com.uci + message-rosa + 0.0.1-SNAPSHOT + + + com.uci + utils + 0.0.1-SNAPSHOT + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-logging + + + + + io.fusionauth + fusionauth-java-client + 1.7.0 + + + org.springframework.data + spring-data-cassandra + + - - - - src/main/resources/ - - - src/main/java/com/uci/orchestrator/Drools/ - - - - - org.springframework.boot - spring-boot-maven-plugin - - - + + + + src/main/resources/ + + + src/main/java/com/uci/orchestrator/Drools/ + + + + + org.springframework.boot + spring-boot-maven-plugin + + + diff --git a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java index db47672..a2ecb59 100644 --- a/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java +++ b/src/main/java/com/uci/orchestrator/Consumer/ReactiveConsumer.java @@ -10,6 +10,10 @@ import com.uci.utils.encryption.AESWrapper; import com.uci.utils.kafka.ReactiveProducer; import com.uci.utils.kafka.SimpleProducer; +import com.uci.utils.telemetry.LogTelemetryMessage; +import com.uci.utils.telemetry.TelemetryLogger; +import com.uci.utils.telemetry.util.TelemetryEventNames; + import io.fusionauth.domain.api.UserResponse; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -19,6 +23,8 @@ import messagerosa.xml.XMessageParser; import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.common.protocol.types.Field; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.kie.api.runtime.KieSession; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -47,8 +53,8 @@ public class ReactiveConsumer { private final Flux> reactiveKafkaReceiver; - @Autowired - public KieSession kSession; +// @Autowired +// public KieSession kSession; @Autowired public XMessageRepository xMessageRepository; @@ -76,9 +82,15 @@ public class ReactiveConsumer { private final String DEFAULT_APP_NAME = "Global Bot"; LocalDateTime yesterday = LocalDateTime.now().minusDays(1L); + private static final Logger telemetrylogger = LogManager.getLogger(TelemetryLogger.class); + + @Value("${producer.id}") + private String producerID; + @EventListener(ApplicationStartedEvent.class) public void onMessage() { reactiveKafkaReceiver + .doOnError(genericError("Error in receiving message from kafka", null)) .doOnNext(new Consumer>() { @Override public void accept(ReceiverRecord stringMessage) { @@ -87,34 +99,41 @@ public void accept(ReceiverRecord stringMessage) { XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes())); SenderReceiverInfo from = msg.getFrom(); getAppName(msg.getPayload().getText(), msg.getFrom()) - .doOnNext(new Consumer() { + .doOnError(genericError("Error in getting app name", msg)) + .doOnNext(new Consumer() { @Override public void accept(String appName) { logTimeTaken(startTime, 2); fetchAdapterID(appName) - .doOnNext(new Consumer() { + .doOnError(genericError("Error in fetching adpater id", msg)) + .doOnNext(new Consumer() { @Override public void accept(String adapterID) { logTimeTaken(startTime, 3); from.setCampaignID(appName); from.setDeviceType(DeviceType.PHONE); resolveUser(from, appName) - .doOnNext(new Consumer() { + .doOnError(genericError("Error in resolving user", msg)) + .doOnNext(new Consumer() { @Override public void accept(SenderReceiverInfo from) { msg.setFrom(from); msg.setApp(appName); getLastMessageID(msg) - .doOnNext(lastMessageID -> { + .doOnError(genericError("Error in getting last message id", msg)) + .doOnNext(lastMessageID -> { logTimeTaken(startTime, 4); msg.setLastMessageID(lastMessageID); msg.setAdapterId(adapterID); + /* Logs for telemetry events */ + logTelemteryEvents(msg); if (msg.getMessageState().equals(XMessage.MessageState.REPLIED) || msg.getMessageState().equals(XMessage.MessageState.OPTED_IN)) { try { kafkaProducer.send(odkTransformerTopic, msg.toXML()); // reactiveProducer.sendMessages(odkTransformerTopic, msg.toXML()); } catch (JAXBException e) { e.printStackTrace(); + genericError("Error in sending message to kafka topic", msg); } logTimeTaken(startTime, 15); } @@ -143,6 +162,39 @@ public void accept(Throwable e) { }) .subscribe(); } + + private void logTelemteryEvents(XMessage msg) { + /* Start Conversation Log */ + if(msg.getMessageState().equals(XMessage.MessageState.SENT)) { + telemetrylogger.info(new LogTelemetryMessage(String.format("Message sent"), + TelemetryEventNames.SENT, "", msg.getChannel(), + msg.getProvider(), producerID, msg.getFrom().getUserID())); + } else if(msg.getMessageState().equals(XMessage.MessageState.DELIVERED)) { + telemetrylogger.info(new LogTelemetryMessage(String.format("Message Delivered"), + TelemetryEventNames.DELIVERED, "", msg.getChannel(), + msg.getProvider(), producerID, msg.getFrom().getUserID())); + } else if(msg.getMessageState().equals(XMessage.MessageState.READ)) { + telemetrylogger.info(new LogTelemetryMessage(String.format("Message Read"), + TelemetryEventNames.READ, "", msg.getChannel(), + msg.getProvider(), producerID, msg.getFrom().getUserID())); + } + } + + /* Error to be logged */ + private Consumer genericError(String s, XMessage xmsg) { + return c -> { + log.error(s + "::" + c.getMessage()); + /* Log Telemetry event - Exception */ + if(xmsg != null) { + telemetrylogger.info(new LogTelemetryMessage(s, + TelemetryEventNames.AUDITEXCEPTIONS, "", xmsg.getChannel(), + xmsg.getProvider(), producerID, xmsg.getFrom().getUserID())); + } else { + telemetrylogger.info(new LogTelemetryMessage(s, + TelemetryEventNames.AUDITEXCEPTIONS)); + } + }; + } private Mono resolveUser(SenderReceiverInfo from, String appName) { try { @@ -185,7 +237,7 @@ private Mono getLastMessageID(XMessage msg) { @Override public String apply(XMessageDAO msg1) { if (msg1.getId() == null) { - System.out.println("cError"); + System.out.println("Orchestrator: No Last Message ID on receiving kafka message from inbound."); return ""; } return String.valueOf(msg1.getId()); diff --git a/src/main/java/com/uci/orchestrator/Controllers/Health/ServiceStatusController.java b/src/main/java/com/uci/orchestrator/Controllers/Health/ServiceStatusController.java index 29ade7a..2cb62d5 100644 --- a/src/main/java/com/uci/orchestrator/Controllers/Health/ServiceStatusController.java +++ b/src/main/java/com/uci/orchestrator/Controllers/Health/ServiceStatusController.java @@ -6,11 +6,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.uci.dao.service.HealthService; +import com.uci.utils.telemetry.LogTelemetryBuilder; +import com.uci.utils.telemetry.TelemetryLogger; import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestMapping; @@ -56,6 +60,24 @@ public ResponseEntity campaignUrlStatusCheck() throws JsonProcessingEx return ResponseEntity.ok(jsonNode); } + private static final Logger logger = LogManager.getLogger(); + + /* + * Test with default kafka appender + * telemetry object build internally via custom layout mentioned in xml by sent message + */ + @RequestMapping(value = "/test/logs", method = RequestMethod.GET, produces = { "application/json", "text/json" }) + public ResponseEntity testKafkaLogAppender() throws JsonProcessingException, IOException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonNode = mapper.readTree("{\"responseCode\":\"OK\"}"); + + logger.info("Info Test Message"); + + logger.error("Error Test Message"); + + return ResponseEntity.ok(jsonNode); + } + /** * Returns json node for service response * diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 62ca039..6f6cb17 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -38,3 +38,8 @@ fusionauth.key = ${FUSIONAUTH_KEY} encryptionKeyString=A%C*F-JaNdRgUkXp +# log4j2 log topic config +kafka.logs.topic=${KAFKA_LOGS_TOPIC} + +producer.id=orchestrator + diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..a25f629 --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,34 @@ + + + + ${env:KAFKA_LOGS_TOPIC} + ${env:BOOTSTRAP_SERVERS} + + + + + ${kafka.bootstrap.servers} + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file