From 3fe26e3bb58147fa896d45c96b6eecefbfadd0c9 Mon Sep 17 00:00:00 2001 From: julianleonard123 Date: Wed, 11 Jun 2025 17:28:37 +0200 Subject: [PATCH 1/3] Initial commit of Materialize Client example. --- .../java/materialize-client/.gitignore | 4 + code-examples/java/materialize-client/pom.xml | 39 +++++ .../LookupPermissionSetsStreamObserver.java | 77 +++++++++ .../java/com/authzed/MaterializeClient.java | 151 ++++++++++++++++++ .../WatchPermissionSetsStreamObserver.java | 52 ++++++ 5 files changed, 323 insertions(+) create mode 100644 code-examples/java/materialize-client/.gitignore create mode 100644 code-examples/java/materialize-client/pom.xml create mode 100644 code-examples/java/materialize-client/src/main/java/com/authzed/LookupPermissionSetsStreamObserver.java create mode 100644 code-examples/java/materialize-client/src/main/java/com/authzed/MaterializeClient.java create mode 100644 code-examples/java/materialize-client/src/main/java/com/authzed/WatchPermissionSetsStreamObserver.java diff --git a/code-examples/java/materialize-client/.gitignore b/code-examples/java/materialize-client/.gitignore new file mode 100644 index 0000000..62b3d81 --- /dev/null +++ b/code-examples/java/materialize-client/.gitignore @@ -0,0 +1,4 @@ +cursor.txt +zed_token.txt +/target/** +.idea/ diff --git a/code-examples/java/materialize-client/pom.xml b/code-examples/java/materialize-client/pom.xml new file mode 100644 index 0000000..e3f8867 --- /dev/null +++ b/code-examples/java/materialize-client/pom.xml @@ -0,0 +1,39 @@ + + + 4.0.0 + + com.authzed + materialize-client-example + 1.0-SNAPSHOT + + + 21 + 21 + UTF-8 + + + + + com.authzed.api + authzed + 1.3.1 + + + io.grpc + grpc-stub + 1.72.0 + + + io.grpc + grpc-api + 1.72.0 + + + io.grpc + grpc-netty-shaded + 1.72.0 + + + \ No newline at end of file diff --git a/code-examples/java/materialize-client/src/main/java/com/authzed/LookupPermissionSetsStreamObserver.java b/code-examples/java/materialize-client/src/main/java/com/authzed/LookupPermissionSetsStreamObserver.java new file mode 100644 index 0000000..db01d06 --- /dev/null +++ b/code-examples/java/materialize-client/src/main/java/com/authzed/LookupPermissionSetsStreamObserver.java @@ -0,0 +1,77 @@ +package com.authzed; + +import com.authzed.api.materialize.v0.Cursor; +import com.authzed.api.materialize.v0.LookupPermissionSetsResponse; +import com.authzed.api.materialize.v0.PermissionSetChange; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.authzed.MaterializeClient.printCounts; +import static com.authzed.MaterializeClient.writeCursorToFile; + + +public class LookupPermissionSetsStreamObserver implements StreamObserver { + private final Cursor[] cursor = {null}; + private final CountDownLatch[] pageLatch; + private final int pageLimit; + private final AtomicBoolean lastPage; + private final AtomicInteger recordCount = new AtomicInteger(0); + private final AtomicInteger overallRecordCount = new AtomicInteger(0); + private final AtomicInteger memberToSetRecordCount = new AtomicInteger(0); + private final AtomicInteger setToSetRecordCount = new AtomicInteger(0); + + public LookupPermissionSetsStreamObserver(CountDownLatch[] pageLatch, AtomicBoolean lastPage, int pageLimit) { + this.pageLatch = pageLatch; + this.lastPage = lastPage; + this.pageLimit = pageLimit; + } + + @Override + public void onNext(LookupPermissionSetsResponse response) { + cursor[0] = response.getCursor(); + writeCursorToFile(cursor[0]); + + overallRecordCount.incrementAndGet(); + recordCount.incrementAndGet(); + + PermissionSetChange permissionSetChange = response.getChange(); + if (permissionSetChange.getOperation() == PermissionSetChange.SetOperation.SET_OPERATION_ADDED) { + if (permissionSetChange.hasChildMember()) { + memberToSetRecordCount.incrementAndGet(); + //MaterializeDB.addMemberToSetEvent(response); + } else { + setToSetRecordCount.incrementAndGet(); + //MaterializeDB.addSetToSetEvent(response); + } + } + } + + @Override + public void onError(Throwable throwable) { + System.out.printf("Lookup permission sets api error %s%n", + throwable instanceof StatusRuntimeException ? + ((StatusRuntimeException) throwable).getStatus() : throwable.getMessage()); + if (cursor[0] != null) { + System.out.println("Current cursor: " + cursor[0]); + } + } + + @Override + public void onCompleted() { + if (recordCount.get() == this.pageLimit) { + printCounts(memberToSetRecordCount, overallRecordCount, recordCount, setToSetRecordCount); + recordCount.set(0); + } else { + System.out.printf("Less than full page received, all pages complete. Record count: %s\n", recordCount.get()); + printCounts(memberToSetRecordCount, overallRecordCount, recordCount, setToSetRecordCount); + lastPage.set(true); + } + pageLatch[0].countDown(); + } + + + } \ No newline at end of file diff --git a/code-examples/java/materialize-client/src/main/java/com/authzed/MaterializeClient.java b/code-examples/java/materialize-client/src/main/java/com/authzed/MaterializeClient.java new file mode 100644 index 0000000..3e80731 --- /dev/null +++ b/code-examples/java/materialize-client/src/main/java/com/authzed/MaterializeClient.java @@ -0,0 +1,151 @@ +package com.authzed; + +import com.authzed.api.materialize.v0.Cursor; +import com.authzed.api.materialize.v0.LookupPermissionSetsRequest; +import com.authzed.api.materialize.v0.WatchPermissionSetsRequest; +import com.authzed.api.materialize.v0.WatchPermissionSetsServiceGrpc; +import com.authzed.api.v1.ZedToken; +import com.authzed.grpcutil.BearerToken; +import com.google.protobuf.TextFormat; +import io.grpc.ManagedChannel; +import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import javax.net.ssl.SSLException; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class MaterializeClient { + + private static final String BEARER_TOKEN = "sdbpk_62592e884be49ff6eeee54c4a65585169317d163827c35203948773f340fc3390e981b8df2c89033439be3b4d5dbb6d16b82baad81fbbdd301b4b1ea06b66f99e2c8d3d2ce686a84c9a80ce08bbbd746d1039823fb9de2be2bec7df87e69e7f8"; + private static final String SERVER_HOST = "localhost"; + private static final int SERVER_PORT = 50054; + private static final String CURSOR_FILE = "cursor.txt"; + private static final String ZED_TOKEN_FILE = "zed_token.txt"; + private static final int PAGE_LIMIT = 10000; + + public static void main(String[] args) throws InterruptedException, SSLException { + + WatchPermissionSetsServiceGrpc.WatchPermissionSetsServiceStub stub = buildWatchPermissionSetsServiceStub(); + + System.out.println("Starting LookupPermissionSets..."); + lookupPermissionSets(stub); + + System.out.println("Starting WatchPermissionSets..."); + watchPermissionSets(stub); + } + + private static void lookupPermissionSets(WatchPermissionSetsServiceGrpc.WatchPermissionSetsServiceStub stub) throws InterruptedException { + final CountDownLatch[] pageLatch = {new CountDownLatch(1)}; + AtomicBoolean lastPage = new AtomicBoolean(false); + + LookupPermissionSetsStreamObserver responseStreamObserver = new LookupPermissionSetsStreamObserver( + pageLatch, lastPage, PAGE_LIMIT); + + LookupPermissionSetsRequest request = LookupPermissionSetsRequest.newBuilder().setLimit(PAGE_LIMIT) + .build(); + + // Initial call to LPS, without any cursor. + stub.lookupPermissionSets(request, responseStreamObserver); + pageLatch[0].await(); + + // Page through results, using cursor to iterate through pages. + while (!lastPage.get()) { + pageLatch[0] = new CountDownLatch(1); + request = LookupPermissionSetsRequest.newBuilder().setLimit(PAGE_LIMIT).setOptionalStartingAfterCursor(readCursorFromFile()) + .build(); + stub.lookupPermissionSets(request, responseStreamObserver); + pageLatch[0].await(); + } + } + + private static void watchPermissionSets(WatchPermissionSetsServiceGrpc.WatchPermissionSetsServiceStub stub) { + + WatchPermissionSetsStreamObserver responseStreamObserver = new WatchPermissionSetsStreamObserver(); + + Cursor cursor = readCursorFromFile(); + + WatchPermissionSetsRequest.Builder requestBuilder = WatchPermissionSetsRequest.newBuilder(); + if (cursor != null) { + requestBuilder.setOptionalStartingAfter(cursor.getToken()); + } + WatchPermissionSetsRequest request = requestBuilder.build(); + + stub.watchPermissionSets(request, responseStreamObserver); + + // Keep the main thread alive to maintain the stream + try { + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + System.out.println("Stream interrupted"); + } + + } + + private static WatchPermissionSetsServiceGrpc.WatchPermissionSetsServiceStub buildWatchPermissionSetsServiceStub() throws SSLException { + BearerToken bearerToken = new BearerToken(BEARER_TOKEN); + ManagedChannel channel = NettyChannelBuilder.forAddress(SERVER_HOST, SERVER_PORT) + .sslContext(GrpcSslContexts.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build()) + .build(); + + return WatchPermissionSetsServiceGrpc.newStub(channel) + .withCallCredentials(bearerToken); + } + + private static Cursor readCursorFromFile() { + try { + String content = Files.readString(Paths.get(CURSOR_FILE)); + Cursor.Builder cursorBuilder = Cursor.newBuilder(); + TextFormat.merge(content, cursorBuilder); + return cursorBuilder.build(); + } catch (Exception e) { + System.out.println("Could not read or parse cursor.txt file: " + e.getMessage()); + return null; + } + } + + static void writeCursorToFile(Cursor cursor) { + try (FileWriter writer = new FileWriter(CURSOR_FILE, false)) { + writer.write(cursor.toString()); + } catch (IOException e) { + System.err.println("Error writing cursor to file: " + e.getMessage()); + } + } + + static void writeZedTokenToFile(ZedToken zedToken) { + try (FileWriter writer = new FileWriter(ZED_TOKEN_FILE, false)) { + writer.write(zedToken.toString()); + } catch (IOException e) { + System.err.println("Error writing zed token to file: " + e.getMessage()); + } + } + + private static ZedToken readZedTokenFromFile() { + try { + String content = Files.readString(Paths.get(CURSOR_FILE)); + ZedToken.Builder zedTokenBuilder = ZedToken.newBuilder(); + TextFormat.merge(content, zedTokenBuilder); + return zedTokenBuilder.build(); + } catch (Exception e) { + System.out.println("Could not read or parse zed_token.txt file: " + e.getMessage()); + return null; + } + } + + static void printCounts(AtomicInteger memberToSetRecordCount, AtomicInteger overallRecordCount, AtomicInteger recordCount, AtomicInteger setToSetRecordCount) { + System.out.printf("RecordCount: %d. OverallRecordCount: %d. MemberToSetCount: %d. SetToSetCount: %d.\n", + recordCount.get(), + overallRecordCount.get(), + memberToSetRecordCount.get(), + setToSetRecordCount.get() + ); + } +} diff --git a/code-examples/java/materialize-client/src/main/java/com/authzed/WatchPermissionSetsStreamObserver.java b/code-examples/java/materialize-client/src/main/java/com/authzed/WatchPermissionSetsStreamObserver.java new file mode 100644 index 0000000..1ceedb3 --- /dev/null +++ b/code-examples/java/materialize-client/src/main/java/com/authzed/WatchPermissionSetsStreamObserver.java @@ -0,0 +1,52 @@ +package com.authzed; + +import com.authzed.api.materialize.v0.WatchPermissionSetsResponse; +import com.authzed.api.materialize.v0.PermissionSetChange; +import com.authzed.api.v1.ZedToken; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; + +import java.util.concurrent.atomic.AtomicInteger; + +import static com.authzed.MaterializeClient.printCounts; +import static com.authzed.MaterializeClient.writeZedTokenToFile; + + +public class WatchPermissionSetsStreamObserver implements StreamObserver { + private final AtomicInteger recordCount = new AtomicInteger(0); + private final AtomicInteger memberToSetRecordCount = new AtomicInteger(0); + private final AtomicInteger setToSetRecordCount = new AtomicInteger(0); + + @Override + public void onNext(WatchPermissionSetsResponse response) { + ZedToken zedToken = response.getCompletedRevision(); + writeZedTokenToFile(zedToken); + + recordCount.incrementAndGet(); + + PermissionSetChange permissionSetChange = response.getChange(); + if (permissionSetChange.getOperation() == PermissionSetChange.SetOperation.SET_OPERATION_ADDED || + permissionSetChange.getOperation() == PermissionSetChange.SetOperation.SET_OPERATION_REMOVED) { + if (permissionSetChange.hasChildMember()) { + memberToSetRecordCount.incrementAndGet(); + printCounts(memberToSetRecordCount, recordCount, recordCount, setToSetRecordCount); + } else { + setToSetRecordCount.incrementAndGet(); + printCounts(memberToSetRecordCount, recordCount, recordCount, setToSetRecordCount); + } + } + } + + @Override + public void onError(Throwable throwable) { + System.out.printf("Watch permission sets api error %s%n", + throwable instanceof StatusRuntimeException ? + ((StatusRuntimeException) throwable).getStatus() : throwable.getMessage()); + } + + @Override + public void onCompleted() { + System.out.println("Stream completed"); + printCounts(memberToSetRecordCount, recordCount, recordCount, setToSetRecordCount); + } +} \ No newline at end of file From 3085c99b3c1057018f9266916961364da2df21b1 Mon Sep 17 00:00:00 2001 From: julianleonard123 Date: Wed, 11 Jun 2025 17:30:08 +0200 Subject: [PATCH 2/3] Remove token --- .../src/main/java/com/authzed/MaterializeClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/code-examples/java/materialize-client/src/main/java/com/authzed/MaterializeClient.java b/code-examples/java/materialize-client/src/main/java/com/authzed/MaterializeClient.java index 3e80731..6b768c9 100644 --- a/code-examples/java/materialize-client/src/main/java/com/authzed/MaterializeClient.java +++ b/code-examples/java/materialize-client/src/main/java/com/authzed/MaterializeClient.java @@ -23,7 +23,7 @@ public class MaterializeClient { - private static final String BEARER_TOKEN = "sdbpk_62592e884be49ff6eeee54c4a65585169317d163827c35203948773f340fc3390e981b8df2c89033439be3b4d5dbb6d16b82baad81fbbdd301b4b1ea06b66f99e2c8d3d2ce686a84c9a80ce08bbbd746d1039823fb9de2be2bec7df87e69e7f8"; + private static final String BEARER_TOKEN = "some token"; private static final String SERVER_HOST = "localhost"; private static final int SERVER_PORT = 50054; private static final String CURSOR_FILE = "cursor.txt"; From 2c0f942eb309bba393c1faaa4d7ee3aaa5e38b0c Mon Sep 17 00:00:00 2001 From: julianleonard123 Date: Wed, 11 Jun 2025 17:31:35 +0200 Subject: [PATCH 3/3] Remove commented out calls to database which is not part of example. --- .../java/com/authzed/LookupPermissionSetsStreamObserver.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/code-examples/java/materialize-client/src/main/java/com/authzed/LookupPermissionSetsStreamObserver.java b/code-examples/java/materialize-client/src/main/java/com/authzed/LookupPermissionSetsStreamObserver.java index db01d06..693982a 100644 --- a/code-examples/java/materialize-client/src/main/java/com/authzed/LookupPermissionSetsStreamObserver.java +++ b/code-examples/java/materialize-client/src/main/java/com/authzed/LookupPermissionSetsStreamObserver.java @@ -42,10 +42,8 @@ public void onNext(LookupPermissionSetsResponse response) { if (permissionSetChange.getOperation() == PermissionSetChange.SetOperation.SET_OPERATION_ADDED) { if (permissionSetChange.hasChildMember()) { memberToSetRecordCount.incrementAndGet(); - //MaterializeDB.addMemberToSetEvent(response); } else { setToSetRecordCount.incrementAndGet(); - //MaterializeDB.addSetToSetEvent(response); } } }