Skip to content

Commit

Permalink
add MultiRecordEvent decorator object; change enqueuedTime type to Zo… (
Browse files Browse the repository at this point in the history
#19)

* add MultiRecordEvent decorator object; change enqueuedTime type to ZonedDateTime to allow assigning an already parsed enqueuedTime to an event

* change MultiRecordEvent to follow single return principle.

* reverted Event interface enqueuedTimeUtc() return type to Object; reverted EventImpl enqueuedTimeUtc type to Object; removed secondary ctor; JSONEvent/PlainEvent enqueuedTime() now checks the existence of zoneId `Z` before adding it

* apply spotless

* add EnqueuedTime object with tests; change Event interface enqueuedTimeUtc() return type to Parseable; introduce secondary ctor to EventImpl; change ParsedEvent interface enqueuedTime() return type to Parseable; introduce Parseable<T> interface

* rename Parseable interface to EnqueuedTime; rename EnqueuedTime to EnqueuedTimeImpl

* rename EnqueuedTime.parsed() to EnqueuedTime.zonedDateTime() and move the interface to time package
  • Loading branch information
eemhu authored Feb 10, 2025
1 parent 954983c commit 4291695
Show file tree
Hide file tree
Showing 15 changed files with 490 additions and 21 deletions.
4 changes: 3 additions & 1 deletion src/main/java/com/teragrep/akv_01/event/Event.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
*/
package com.teragrep.akv_01.event;

import com.teragrep.akv_01.time.EnqueuedTime;

import java.util.Map;

public interface Event {
Expand All @@ -59,7 +61,7 @@ public interface Event {

public abstract Map<String, Object> systemProperties();

public abstract Object enqueuedTimeUtc();
public abstract EnqueuedTime enqueuedTimeUtc();

public abstract String offset();
}
21 changes: 18 additions & 3 deletions src/main/java/com/teragrep/akv_01/event/EventImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@
*/
package com.teragrep.akv_01.event;

import jakarta.json.*;
import com.teragrep.akv_01.time.EnqueuedTime;
import com.teragrep.akv_01.time.EnqueuedTimeImpl;
import jakarta.json.Json;
import jakarta.json.JsonReader;
import jakarta.json.JsonStructure;
import jakarta.json.stream.JsonParsingException;

import java.io.StringReader;
Expand All @@ -58,7 +62,7 @@ public final class EventImpl implements Event {
private final Map<String, Object> partitionCtx;
private final Map<String, Object> properties;
private final Map<String, Object> systemProperties;
private final Object enqueuedTimeUtc;
private final EnqueuedTime enqueuedTimeUtc;
private final String offset;

public EventImpl(
Expand All @@ -68,6 +72,17 @@ public EventImpl(
final Map<String, Object> systemProperties,
final Object enqueuedTimeUtc,
final String offset
) {
this(payload, partitionCtx, properties, systemProperties, new EnqueuedTimeImpl(enqueuedTimeUtc), offset);
}

public EventImpl(
final String payload,
final Map<String, Object> partitionCtx,
final Map<String, Object> properties,
final Map<String, Object> systemProperties,
final EnqueuedTime enqueuedTimeUtc,
final String offset
) {
this.payload = payload;
this.partitionCtx = partitionCtx;
Expand Down Expand Up @@ -112,7 +127,7 @@ public Map<String, Object> systemProperties() {
return systemProperties;
}

public Object enqueuedTimeUtc() {
public EnqueuedTime enqueuedTimeUtc() {
return enqueuedTimeUtc;
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/teragrep/akv_01/event/JSONEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@
*/
package com.teragrep.akv_01.event;

import com.teragrep.akv_01.time.EnqueuedTime;
import jakarta.json.JsonException;
import jakarta.json.JsonObject;
import jakarta.json.JsonStructure;
import jakarta.json.JsonValue;

import java.time.ZonedDateTime;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -119,8 +119,8 @@ public Map<String, Object> systemProperties() {
}

@Override
public ZonedDateTime enqueuedTime() {
return ZonedDateTime.parse(event.enqueuedTimeUtc() + "Z");
public EnqueuedTime enqueuedTime() {
return event.enqueuedTimeUtc();
}

@Override
Expand Down
107 changes: 107 additions & 0 deletions src/main/java/com/teragrep/akv_01/event/MultiRecordEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Key Value Mapping for Microsoft Azure EventHub
* Copyright (C) 2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.akv_01.event;

import jakarta.json.JsonValue;

import java.util.List;

public final class MultiRecordEvent {

private final ParsedEvent parsedEvent;

public MultiRecordEvent(final ParsedEvent parsedEvent) {
this.parsedEvent = parsedEvent;
}

public boolean isValid() {
boolean valid = true;
if (!parsedEvent.isJsonStructure()) {
// not json structure
valid = false;
}

if (valid && !parsedEvent.asJsonStructure().getValueType().equals(JsonValue.ValueType.OBJECT)) {
// not json object
valid = false;
}

if (
valid && (!parsedEvent.asJsonStructure().asJsonObject().containsKey("records") || !parsedEvent
.asJsonStructure()
.asJsonObject()
.get("records")
.getValueType()
.equals(JsonValue.ValueType.ARRAY))
) {
// no records array
valid = false;
}

return valid;
}

public List<ParsedEvent> records() {
if (!isValid()) {
throw new IllegalStateException("Event is not a multi record event");
}

return parsedEvent
.asJsonStructure()
.asJsonObject()
.getJsonArray("records")
.getValuesAs(
jsonValue -> new EventImpl(
jsonValue.asJsonObject().toString(),
parsedEvent.partitionContext(),
parsedEvent.properties(),
parsedEvent.systemProperties(),
parsedEvent.enqueuedTime(),
parsedEvent.offset()
).parsedEvent()
);
}
}
4 changes: 2 additions & 2 deletions src/main/java/com/teragrep/akv_01/event/ParsedEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
*/
package com.teragrep.akv_01.event;

import com.teragrep.akv_01.time.EnqueuedTime;
import jakarta.json.JsonStructure;

import java.time.ZonedDateTime;
import java.util.Map;

public interface ParsedEvent {
Expand All @@ -66,7 +66,7 @@ public interface ParsedEvent {

public abstract Map<String, Object> systemProperties();

public abstract ZonedDateTime enqueuedTime();
public abstract EnqueuedTime enqueuedTime();

public abstract String offset();
}
4 changes: 2 additions & 2 deletions src/main/java/com/teragrep/akv_01/event/ParsedEventStub.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
*/
package com.teragrep.akv_01.event;

import com.teragrep.akv_01.time.EnqueuedTime;
import jakarta.json.JsonStructure;

import java.time.ZonedDateTime;
import java.util.Map;

public final class ParsedEventStub implements ParsedEvent {
Expand Down Expand Up @@ -88,7 +88,7 @@ public Map<String, Object> systemProperties() {
}

@Override
public ZonedDateTime enqueuedTime() {
public EnqueuedTime enqueuedTime() {
throw new UnsupportedOperationException("Stub object");
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/teragrep/akv_01/event/PlainEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
*/
package com.teragrep.akv_01.event;

import com.teragrep.akv_01.time.EnqueuedTime;
import jakarta.json.JsonStructure;

import java.time.ZonedDateTime;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -97,8 +97,8 @@ public Map<String, Object> systemProperties() {
}

@Override
public ZonedDateTime enqueuedTime() {
return ZonedDateTime.parse(event.enqueuedTimeUtc() + "Z");
public EnqueuedTime enqueuedTime() {
return event.enqueuedTimeUtc();
}

@Override
Expand Down
53 changes: 53 additions & 0 deletions src/main/java/com/teragrep/akv_01/time/EnqueuedTime.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Key Value Mapping for Microsoft Azure EventHub
* Copyright (C) 2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.akv_01.time;

import java.time.ZonedDateTime;

public interface EnqueuedTime {

public abstract ZonedDateTime zonedDateTime();
}
67 changes: 67 additions & 0 deletions src/main/java/com/teragrep/akv_01/time/EnqueuedTimeImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Key Value Mapping for Microsoft Azure EventHub
* Copyright (C) 2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.akv_01.time;

import java.time.ZonedDateTime;

public final class EnqueuedTimeImpl implements EnqueuedTime {

private final Object origin;

public EnqueuedTimeImpl(final Object origin) {
this.origin = origin;
}

@Override
public ZonedDateTime zonedDateTime() {
return ZonedDateTime.parse(origin.toString() + "Z");
}

@Override
public String toString() {
return String.valueOf(origin);
}
}
Loading

0 comments on commit 4291695

Please sign in to comment.