Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions code-examples/java/materialize-client/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cursor.txt
zed_token.txt
/target/**
.idea/
39 changes: 39 additions & 0 deletions code-examples/java/materialize-client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.authzed</groupId>
<artifactId>materialize-client-example</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>com.authzed.api</groupId>
<artifactId>authzed</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.72.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
<version>1.72.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.72.0</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.authzed;

import com.authzed.api.materialize.v0.Cursor;
import com.authzed.api.materialize.v0.LookupPermissionSetsResponse;
import com.authzed.api.materialize.v0.PermissionSetChange;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static com.authzed.MaterializeClient.printCounts;
import static com.authzed.MaterializeClient.writeCursorToFile;


public class LookupPermissionSetsStreamObserver implements StreamObserver<LookupPermissionSetsResponse> {
private final Cursor[] cursor = {null};
private final CountDownLatch[] pageLatch;
private final int pageLimit;
private final AtomicBoolean lastPage;
private final AtomicInteger recordCount = new AtomicInteger(0);
private final AtomicInteger overallRecordCount = new AtomicInteger(0);
private final AtomicInteger memberToSetRecordCount = new AtomicInteger(0);
private final AtomicInteger setToSetRecordCount = new AtomicInteger(0);

public LookupPermissionSetsStreamObserver(CountDownLatch[] pageLatch, AtomicBoolean lastPage, int pageLimit) {
this.pageLatch = pageLatch;
this.lastPage = lastPage;
this.pageLimit = pageLimit;
}

@Override
public void onNext(LookupPermissionSetsResponse response) {
cursor[0] = response.getCursor();
writeCursorToFile(cursor[0]);

overallRecordCount.incrementAndGet();
recordCount.incrementAndGet();

PermissionSetChange permissionSetChange = response.getChange();
if (permissionSetChange.getOperation() == PermissionSetChange.SetOperation.SET_OPERATION_ADDED) {
if (permissionSetChange.hasChildMember()) {
memberToSetRecordCount.incrementAndGet();
} else {
setToSetRecordCount.incrementAndGet();
}
}
}

@Override
public void onError(Throwable throwable) {
System.out.printf("Lookup permission sets api error %s%n",
throwable instanceof StatusRuntimeException ?
((StatusRuntimeException) throwable).getStatus() : throwable.getMessage());
if (cursor[0] != null) {
System.out.println("Current cursor: " + cursor[0]);
}
}

@Override
public void onCompleted() {
if (recordCount.get() == this.pageLimit) {
printCounts(memberToSetRecordCount, overallRecordCount, recordCount, setToSetRecordCount);
recordCount.set(0);
} else {
System.out.printf("Less than full page received, all pages complete. Record count: %s\n", recordCount.get());
printCounts(memberToSetRecordCount, overallRecordCount, recordCount, setToSetRecordCount);
lastPage.set(true);
}
pageLatch[0].countDown();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package com.authzed;

import com.authzed.api.materialize.v0.Cursor;
import com.authzed.api.materialize.v0.LookupPermissionSetsRequest;
import com.authzed.api.materialize.v0.WatchPermissionSetsRequest;
import com.authzed.api.materialize.v0.WatchPermissionSetsServiceGrpc;
import com.authzed.api.v1.ZedToken;
import com.authzed.grpcutil.BearerToken;
import com.google.protobuf.TextFormat;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;

import javax.net.ssl.SSLException;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class MaterializeClient {

private static final String BEARER_TOKEN = "some token";
private static final String SERVER_HOST = "localhost";
private static final int SERVER_PORT = 50054;
private static final String CURSOR_FILE = "cursor.txt";
private static final String ZED_TOKEN_FILE = "zed_token.txt";
private static final int PAGE_LIMIT = 10000;

public static void main(String[] args) throws InterruptedException, SSLException {

WatchPermissionSetsServiceGrpc.WatchPermissionSetsServiceStub stub = buildWatchPermissionSetsServiceStub();

System.out.println("Starting LookupPermissionSets...");
lookupPermissionSets(stub);

System.out.println("Starting WatchPermissionSets...");
watchPermissionSets(stub);
}

private static void lookupPermissionSets(WatchPermissionSetsServiceGrpc.WatchPermissionSetsServiceStub stub) throws InterruptedException {
final CountDownLatch[] pageLatch = {new CountDownLatch(1)};
AtomicBoolean lastPage = new AtomicBoolean(false);

LookupPermissionSetsStreamObserver responseStreamObserver = new LookupPermissionSetsStreamObserver(
pageLatch, lastPage, PAGE_LIMIT);

LookupPermissionSetsRequest request = LookupPermissionSetsRequest.newBuilder().setLimit(PAGE_LIMIT)
.build();

// Initial call to LPS, without any cursor.
stub.lookupPermissionSets(request, responseStreamObserver);
pageLatch[0].await();

// Page through results, using cursor to iterate through pages.
while (!lastPage.get()) {
pageLatch[0] = new CountDownLatch(1);
request = LookupPermissionSetsRequest.newBuilder().setLimit(PAGE_LIMIT).setOptionalStartingAfterCursor(readCursorFromFile())
.build();
stub.lookupPermissionSets(request, responseStreamObserver);
pageLatch[0].await();
}
}

private static void watchPermissionSets(WatchPermissionSetsServiceGrpc.WatchPermissionSetsServiceStub stub) {

WatchPermissionSetsStreamObserver responseStreamObserver = new WatchPermissionSetsStreamObserver();

Cursor cursor = readCursorFromFile();

WatchPermissionSetsRequest.Builder requestBuilder = WatchPermissionSetsRequest.newBuilder();
if (cursor != null) {
requestBuilder.setOptionalStartingAfter(cursor.getToken());
}
WatchPermissionSetsRequest request = requestBuilder.build();

stub.watchPermissionSets(request, responseStreamObserver);

// Keep the main thread alive to maintain the stream
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
System.out.println("Stream interrupted");
}

}

private static WatchPermissionSetsServiceGrpc.WatchPermissionSetsServiceStub buildWatchPermissionSetsServiceStub() throws SSLException {
BearerToken bearerToken = new BearerToken(BEARER_TOKEN);
ManagedChannel channel = NettyChannelBuilder.forAddress(SERVER_HOST, SERVER_PORT)
.sslContext(GrpcSslContexts.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build())
.build();

return WatchPermissionSetsServiceGrpc.newStub(channel)
.withCallCredentials(bearerToken);
}

private static Cursor readCursorFromFile() {
try {
String content = Files.readString(Paths.get(CURSOR_FILE));
Cursor.Builder cursorBuilder = Cursor.newBuilder();
TextFormat.merge(content, cursorBuilder);
return cursorBuilder.build();
} catch (Exception e) {
System.out.println("Could not read or parse cursor.txt file: " + e.getMessage());
return null;
}
}

static void writeCursorToFile(Cursor cursor) {
try (FileWriter writer = new FileWriter(CURSOR_FILE, false)) {
writer.write(cursor.toString());
} catch (IOException e) {
System.err.println("Error writing cursor to file: " + e.getMessage());
}
}

static void writeZedTokenToFile(ZedToken zedToken) {
try (FileWriter writer = new FileWriter(ZED_TOKEN_FILE, false)) {
writer.write(zedToken.toString());
} catch (IOException e) {
System.err.println("Error writing zed token to file: " + e.getMessage());
}
}

private static ZedToken readZedTokenFromFile() {
try {
String content = Files.readString(Paths.get(CURSOR_FILE));
ZedToken.Builder zedTokenBuilder = ZedToken.newBuilder();
TextFormat.merge(content, zedTokenBuilder);
return zedTokenBuilder.build();
} catch (Exception e) {
System.out.println("Could not read or parse zed_token.txt file: " + e.getMessage());
return null;
}
}

static void printCounts(AtomicInteger memberToSetRecordCount, AtomicInteger overallRecordCount, AtomicInteger recordCount, AtomicInteger setToSetRecordCount) {
System.out.printf("RecordCount: %d. OverallRecordCount: %d. MemberToSetCount: %d. SetToSetCount: %d.\n",
recordCount.get(),
overallRecordCount.get(),
memberToSetRecordCount.get(),
setToSetRecordCount.get()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.authzed;

import com.authzed.api.materialize.v0.WatchPermissionSetsResponse;
import com.authzed.api.materialize.v0.PermissionSetChange;
import com.authzed.api.v1.ZedToken;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.atomic.AtomicInteger;

import static com.authzed.MaterializeClient.printCounts;
import static com.authzed.MaterializeClient.writeZedTokenToFile;


public class WatchPermissionSetsStreamObserver implements StreamObserver<WatchPermissionSetsResponse> {
private final AtomicInteger recordCount = new AtomicInteger(0);
private final AtomicInteger memberToSetRecordCount = new AtomicInteger(0);
private final AtomicInteger setToSetRecordCount = new AtomicInteger(0);

@Override
public void onNext(WatchPermissionSetsResponse response) {
ZedToken zedToken = response.getCompletedRevision();
writeZedTokenToFile(zedToken);

recordCount.incrementAndGet();

PermissionSetChange permissionSetChange = response.getChange();
if (permissionSetChange.getOperation() == PermissionSetChange.SetOperation.SET_OPERATION_ADDED ||
permissionSetChange.getOperation() == PermissionSetChange.SetOperation.SET_OPERATION_REMOVED) {
if (permissionSetChange.hasChildMember()) {
memberToSetRecordCount.incrementAndGet();
printCounts(memberToSetRecordCount, recordCount, recordCount, setToSetRecordCount);
} else {
setToSetRecordCount.incrementAndGet();
printCounts(memberToSetRecordCount, recordCount, recordCount, setToSetRecordCount);
}
}
}

@Override
public void onError(Throwable throwable) {
System.out.printf("Watch permission sets api error %s%n",
throwable instanceof StatusRuntimeException ?
((StatusRuntimeException) throwable).getStatus() : throwable.getMessage());
}

@Override
public void onCompleted() {
System.out.println("Stream completed");
printCounts(memberToSetRecordCount, recordCount, recordCount, setToSetRecordCount);
}
}