Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima

This release also includes changes from <<release-3-7-6, 3.7.6>>.

* Expose serialization functions for alternative transport protocols in gremlin-go


[[release-3-8-0]]
=== TinkerPop 3.8.0 (Release Date: November 12, 2025)
Expand Down
4 changes: 2 additions & 2 deletions gremlin-go/driver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (client *Client) Close() {
// SubmitWithOptions submits a Gremlin script to the server with specified RequestOptions and returns a ResultSet.
func (client *Client) SubmitWithOptions(traversalString string, requestOptions RequestOptions) (ResultSet, error) {
client.logHandler.logf(Debug, submitStartedString, traversalString)
request := makeStringRequest(traversalString, client.traversalSource, client.session, requestOptions)
request := MakeStringRequest(traversalString, client.traversalSource, client.session, requestOptions)
result, err := client.connections.write(&request)
if err != nil {
client.logHandler.logf(Error, logErrorGeneric, "Client.Submit()", err.Error())
Expand All @@ -171,7 +171,7 @@ func (client *Client) Submit(traversalString string, bindings ...map[string]inte
// submitBytecode submits Bytecode to the server to execute and returns a ResultSet.
func (client *Client) submitBytecode(bytecode *Bytecode) (ResultSet, error) {
client.logHandler.logf(Debug, submitStartedBytecode, *bytecode)
request := makeBytecodeRequest(bytecode, client.traversalSource, client.session)
request := MakeBytecodeRequest(bytecode, client.traversalSource, client.session)
return client.connections.write(&request)
}

Expand Down
8 changes: 4 additions & 4 deletions gremlin-go/driver/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func TestConnection(t *testing.T) {
assert.NotNil(t, connection)
assert.Equal(t, established, connection.state)
defer deferredCleanup(t, connection)
request := makeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
request := MakeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
resultSet, err := connection.write(&request)
assert.Nil(t, err)
assert.NotNil(t, resultSet)
Expand All @@ -400,7 +400,7 @@ func TestConnection(t *testing.T) {
assert.NotNil(t, connection)
assert.Equal(t, established, connection.state)
defer deferredCleanup(t, connection)
request := makeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
request := MakeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
resultSet, err := connection.write(&request)
assert.Nil(t, err)
assert.NotNil(t, resultSet)
Expand Down Expand Up @@ -436,7 +436,7 @@ func TestConnection(t *testing.T) {
err = connection.close()
assert.Nil(t, err)
assert.Equal(t, closed, connection.state)
request := makeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
request := MakeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
resultSet, err := connection.write(&request)
assert.Nil(t, resultSet)
assert.Equal(t, newError(err0102WriteConnectionClosedError), err)
Expand All @@ -452,7 +452,7 @@ func TestConnection(t *testing.T) {
assert.Equal(t, established, connection.state)
assert.Nil(t, err)
time.Sleep(120 * time.Second)
request := makeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
request := MakeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
resultSet, err := connection.write(&request)
assert.Nil(t, resultSet)
assert.NotNil(t, err)
Expand Down
18 changes: 9 additions & 9 deletions gremlin-go/driver/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type protocolBase struct {
type gremlinServerWSProtocol struct {
*protocolBase

serializer serializer
serializer Serializer
logHandler *logHandler
closed bool
mutex sync.Mutex
Expand Down Expand Up @@ -72,7 +72,7 @@ func (protocol *gremlinServerWSProtocol) readLoop(resultSets *synchronizedMap, e
}

// Deserialize message and unpack.
resp, err := protocol.serializer.deserializeMessage(msg)
resp, err := protocol.serializer.DeserializeMessage(msg)
if err != nil {
protocol.logHandler.logf(Error, logErrorGeneric, "gremlinServerWSProtocol.readLoop()", err.Error())
readErrorHandler(resultSets, errorCallback, err, protocol.logHandler)
Expand All @@ -94,9 +94,9 @@ func readErrorHandler(resultSets *synchronizedMap, errorCallback func(), err err
errorCallback()
}

func (protocol *gremlinServerWSProtocol) responseHandler(resultSets *synchronizedMap, response response) error {
responseID, statusCode, metadata, data := response.responseID, response.responseStatus.code,
response.responseResult.meta, response.responseResult.data
func (protocol *gremlinServerWSProtocol) responseHandler(resultSets *synchronizedMap, response Response) error {
responseID, statusCode, metadata, data := response.ResponseID, response.ResponseStatus.code,
response.ResponseResult.Meta, response.ResponseResult.Data
responseIDString := responseID.String()
if resultSets.load(responseIDString) == nil {
return newError(err0501ResponseHandlerResultSetNotCreatedError)
Expand All @@ -113,7 +113,7 @@ func (protocol *gremlinServerWSProtocol) responseHandler(resultSets *synchronize
} else if statusCode == http.StatusOK {
// Add data and status attributes to the ResultSet.
resultSets.load(responseIDString).addResult(&Result{data})
resultSets.load(responseIDString).setStatusAttributes(response.responseStatus.attributes)
resultSets.load(responseIDString).setStatusAttributes(response.ResponseStatus.attributes)
resultSets.load(responseIDString).Close()
protocol.logHandler.logf(Debug, readComplete, responseIDString)
} else if statusCode == http.StatusPartialContent {
Expand All @@ -137,10 +137,10 @@ func (protocol *gremlinServerWSProtocol) responseHandler(resultSets *synchronize
}
} else {
resultSets.load(responseIDString).Close()
return newError(err0503ResponseHandlerAuthError, response.responseStatus, response.responseResult)
return newError(err0503ResponseHandlerAuthError, response.ResponseStatus, response.ResponseResult)
}
} else {
newError := newError(err0502ResponseHandlerReadLoopError, response.responseStatus, statusCode)
newError := newError(err0502ResponseHandlerReadLoopError, response.ResponseStatus, statusCode)
resultSets.load(responseIDString).setError(newError)
resultSets.load(responseIDString).Close()
protocol.logHandler.logf(Error, logErrorGeneric, "gremlinServerWSProtocol.responseHandler()", newError.Error())
Expand All @@ -149,7 +149,7 @@ func (protocol *gremlinServerWSProtocol) responseHandler(resultSets *synchronize
}

func (protocol *gremlinServerWSProtocol) write(request *request) error {
bytes, err := protocol.serializer.serializeMessage(request)
bytes, err := protocol.serializer.SerializeMessage(request)
if err != nil {
return err
}
Expand Down
46 changes: 44 additions & 2 deletions gremlin-go/driver/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,28 @@ const sessionProcessor = "session"
const stringOp = "eval"
const stringProcessor = ""

func makeStringRequest(stringGremlin string, traversalSource string, sessionId string, requestOptions RequestOptions) (req request) {
// MakeStringRequest creates a request from a Gremlin string query for submission to a Gremlin server.
//
// This function is exposed publicly to enable alternative transport protocols (gRPC, HTTP/2, etc.)
// to construct properly formatted requests outside the standard WebSocket client. The returned
// request can then be serialized using SerializeMessage().
//
// Parameters:
// - stringGremlin: The Gremlin query string to execute
// - traversalSource: The name of the traversal source (typically "g")
// - sessionId: Optional session ID for stateful requests (use "" for stateless)
// - requestOptions: Options such as bindings, timeout, batch size, etc.
//
// Returns:
// - request: A request structure ready for serialization
//
// Example for alternative transports:
//
// req := MakeStringRequest("g.V().count()", "g", "", RequestOptions{})
// serializer := newGraphBinarySerializer(nil)
// bytes, _ := serializer.(graphBinarySerializer).SerializeMessage(&req)
// // Send bytes over gRPC, HTTP/2, etc.
func MakeStringRequest(stringGremlin string, traversalSource string, sessionId string, requestOptions RequestOptions) (req request) {
newProcessor := stringProcessor
newArgs := map[string]interface{}{
"gremlin": stringGremlin,
Expand Down Expand Up @@ -88,7 +109,28 @@ const bytecodeProcessor = "traversal"
const authOp = "authentication"
const authProcessor = "traversal"

func makeBytecodeRequest(bytecodeGremlin *Bytecode, traversalSource string, sessionId string) (req request) {
// MakeBytecodeRequest creates a request from Gremlin bytecode for submission to a Gremlin server.
//
// This function is exposed publicly to enable alternative transport protocols (gRPC, HTTP/2, etc.)
// to construct properly formatted requests outside the standard WebSocket client. The returned
// request can then be serialized using SerializeMessage().
//
// Parameters:
// - bytecodeGremlin: The Gremlin bytecode to execute
// - traversalSource: The name of the traversal source (typically "g")
// - sessionId: Optional session ID for stateful requests (use "" for stateless)
//
// Returns:
// - request: A request structure ready for serialization
//
// Example for alternative transports:
//
// bytecode := g.V().HasLabel("person").Bytecode
// req := MakeBytecodeRequest(bytecode, "g", "")
// serializer := newGraphBinarySerializer(nil)
// bytes, _ := serializer.(graphBinarySerializer).SerializeMessage(&req)
// // Send bytes over gRPC, HTTP/2, etc.
func MakeBytecodeRequest(bytecodeGremlin *Bytecode, traversalSource string, sessionId string) (req request) {
newProcessor := bytecodeProcessor
newArgs := map[string]interface{}{
"gremlin": *bytecodeGremlin,
Expand Down
13 changes: 7 additions & 6 deletions gremlin-go/driver/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,44 +20,45 @@ under the License.
package gremlingo

import (
"github.com/google/uuid"
"testing"

"github.com/google/uuid"

"github.com/stretchr/testify/assert"
)

func TestRequest(t *testing.T) {
t.Run("Test makeStringRequest() with custom requestID", func(t *testing.T) {
requestId := uuid.New()
r := makeStringRequest("g.V()", "g", "",
r := MakeStringRequest("g.V()", "g", "",
new(RequestOptionsBuilder).SetRequestId(requestId).Create())
assert.Equal(t, requestId, r.requestID)
})

t.Run("Test makeStringRequest() with no bindings", func(t *testing.T) {
r := makeStringRequest("g.V()", "g", "", *new(RequestOptions))
r := MakeStringRequest("g.V()", "g", "", *new(RequestOptions))
assert.NotNil(t, r.requestID)
assert.NotEqual(t, uuid.Nil, r.requestID)
})

t.Run("Test makeStringRequest() with custom evaluationTimeout", func(t *testing.T) {
r := makeStringRequest("g.V()", "g", "",
r := MakeStringRequest("g.V()", "g", "",
new(RequestOptionsBuilder).SetEvaluationTimeout(1234).Create())
assert.NotNil(t, r.requestID)
assert.NotEqual(t, uuid.Nil, r.requestID)
assert.Equal(t, 1234, r.args["evaluationTimeout"])
})

t.Run("Test makeStringRequest() with custom batchSize", func(t *testing.T) {
r := makeStringRequest("g.V()", "g", "",
r := MakeStringRequest("g.V()", "g", "",
new(RequestOptionsBuilder).SetBatchSize(123).Create())
assert.NotNil(t, r.requestID)
assert.NotEqual(t, uuid.Nil, r.requestID)
assert.Equal(t, 123, r.args["batchSize"])
})

t.Run("Test makeStringRequest() with custom userAgent", func(t *testing.T) {
r := makeStringRequest("g.V()", "g", "",
r := MakeStringRequest("g.V()", "g", "",
new(RequestOptionsBuilder).SetUserAgent("TestUserAgent").Create())
assert.NotNil(t, r.requestID)
assert.NotEqual(t, uuid.Nil, r.requestID)
Expand Down
22 changes: 11 additions & 11 deletions gremlin-go/driver/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ package gremlingo

import "github.com/google/uuid"

// responseStatus contains the status info of the response.
type responseStatus struct {
// ResponseStatus contains the status info of the response.
type ResponseStatus struct {
code uint16
message string
attributes map[string]interface{}
}

// responseResult contains the result info of the response.
type responseResult struct {
meta map[string]interface{}
data interface{}
// ResponseResult contains the result info of the response.
type ResponseResult struct {
Meta map[string]interface{}
Data interface{}
}

// response represents a response from the server.
type response struct {
responseID uuid.UUID
responseStatus responseStatus
responseResult responseResult
// Response represents a Response from the server.
type Response struct {
ResponseID uuid.UUID
ResponseStatus ResponseStatus
ResponseResult ResponseResult
}
45 changes: 45 additions & 0 deletions gremlin-go/driver/resultSet.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,48 @@ func newChannelResultSetCapacity(requestID string, container *synchronizedMap, c
func newChannelResultSet(requestID string, container *synchronizedMap) ResultSet {
return newChannelResultSetCapacity(requestID, container, defaultCapacity)
}

// NewResultSet creates a new ResultSet from a slice of Result objects.
// This function enables custom transport implementations to create ResultSets from
// results collected via alternative protocols.
//
// The function creates a channel-based ResultSet, pre-populates it with the provided results,
// and closes the channel to indicate completion.
//
// Parameters:
// - results: A slice of Result objects to include in the ResultSet
//
// Returns:
// - ResultSet: A ResultSet containing all the provided results
//
// Example usage:
//
// var results []*Result
// // Collect results from custom transport
// for _, responseBytes := range responses {
// result, _ := DeserializeResult(responseBytes)
// results = append(results, result)
// }
// resultSet := NewResultSet(results)
// allResults, _ := resultSet.All()
func NewResultSet(results []*Result) ResultSet {
// Create a channel-based result set with capacity for all results
channelSize := len(results)
if channelSize == 0 {
channelSize = 1 // Ensure at least size 1
}
rs := newChannelResultSetCapacity("", &synchronizedMap{make(map[string]ResultSet), sync.Mutex{}}, channelSize).(*channelResultSet)

// Add all results to the channel
for _, result := range results {
rs.channel <- result
}

// Close the channel to indicate no more results
rs.channelMutex.Lock()
rs.closed = true
close(rs.channel)
rs.channelMutex.Unlock()

return rs
}
Loading
Loading