Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
[[release-3-8-1]]
=== TinkerPop 3.8.1 (NOT OFFICIALLY RELEASED YET)

* Fixed bug in pre-repeat() `emit()/until()` where `emit()` and `until()` traversers weren't added to the results.

This release also includes changes from <<release-3-7-6, 3.7.6>>.

* Fixed bug in pre-repeat() `emit()/until()` where `emit()` and `until()` traversers weren't added to the results.
* Expose serialization functions for alternative transport protocols in gremlin-go
* Improved Gremlint formatting to keep the first argument for a step on the same line if line breaks were required to meet max line length.
* Improved Gremlint formatting to do greedy argument packing when possible so that more arguments can appear on a single line.

Expand Down
4 changes: 2 additions & 2 deletions gremlin-go/driver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (client *Client) Close() {
// SubmitWithOptions submits a Gremlin script to the server with specified RequestOptions and returns a ResultSet.
func (client *Client) SubmitWithOptions(traversalString string, requestOptions RequestOptions) (ResultSet, error) {
client.logHandler.logf(Debug, submitStartedString, traversalString)
request := makeStringRequest(traversalString, client.traversalSource, client.session, requestOptions)
request := MakeStringRequest(traversalString, client.traversalSource, client.session, requestOptions)
result, err := client.connections.write(&request)
if err != nil {
client.logHandler.logf(Error, logErrorGeneric, "Client.Submit()", err.Error())
Expand All @@ -171,7 +171,7 @@ func (client *Client) Submit(traversalString string, bindings ...map[string]inte
// submitBytecode submits Bytecode to the server to execute and returns a ResultSet.
func (client *Client) submitBytecode(bytecode *Bytecode) (ResultSet, error) {
client.logHandler.logf(Debug, submitStartedBytecode, *bytecode)
request := makeBytecodeRequest(bytecode, client.traversalSource, client.session)
request := MakeBytecodeRequest(bytecode, client.traversalSource, client.session)
return client.connections.write(&request)
}

Expand Down
8 changes: 4 additions & 4 deletions gremlin-go/driver/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func TestConnection(t *testing.T) {
assert.NotNil(t, connection)
assert.Equal(t, established, connection.state)
defer deferredCleanup(t, connection)
request := makeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
request := MakeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
resultSet, err := connection.write(&request)
assert.Nil(t, err)
assert.NotNil(t, resultSet)
Expand All @@ -400,7 +400,7 @@ func TestConnection(t *testing.T) {
assert.NotNil(t, connection)
assert.Equal(t, established, connection.state)
defer deferredCleanup(t, connection)
request := makeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
request := MakeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
resultSet, err := connection.write(&request)
assert.Nil(t, err)
assert.NotNil(t, resultSet)
Expand Down Expand Up @@ -436,7 +436,7 @@ func TestConnection(t *testing.T) {
err = connection.close()
assert.Nil(t, err)
assert.Equal(t, closed, connection.state)
request := makeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
request := MakeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
resultSet, err := connection.write(&request)
assert.Nil(t, resultSet)
assert.Equal(t, newError(err0102WriteConnectionClosedError), err)
Expand All @@ -452,7 +452,7 @@ func TestConnection(t *testing.T) {
assert.Equal(t, established, connection.state)
assert.Nil(t, err)
time.Sleep(120 * time.Second)
request := makeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
request := MakeStringRequest("g.V().count()", "g", "", *new(RequestOptions))
resultSet, err := connection.write(&request)
assert.Nil(t, resultSet)
assert.NotNil(t, err)
Expand Down
18 changes: 9 additions & 9 deletions gremlin-go/driver/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type protocolBase struct {
type gremlinServerWSProtocol struct {
*protocolBase

serializer serializer
serializer Serializer
logHandler *logHandler
closed bool
mutex sync.Mutex
Expand Down Expand Up @@ -72,7 +72,7 @@ func (protocol *gremlinServerWSProtocol) readLoop(resultSets *synchronizedMap, e
}

// Deserialize message and unpack.
resp, err := protocol.serializer.deserializeMessage(msg)
resp, err := protocol.serializer.DeserializeMessage(msg)
if err != nil {
protocol.logHandler.logf(Error, logErrorGeneric, "gremlinServerWSProtocol.readLoop()", err.Error())
readErrorHandler(resultSets, errorCallback, err, protocol.logHandler)
Expand All @@ -94,9 +94,9 @@ func readErrorHandler(resultSets *synchronizedMap, errorCallback func(), err err
errorCallback()
}

func (protocol *gremlinServerWSProtocol) responseHandler(resultSets *synchronizedMap, response response) error {
responseID, statusCode, metadata, data := response.responseID, response.responseStatus.code,
response.responseResult.meta, response.responseResult.data
func (protocol *gremlinServerWSProtocol) responseHandler(resultSets *synchronizedMap, response Response) error {
responseID, statusCode, metadata, data := response.ResponseID, response.ResponseStatus.code,
response.ResponseResult.Meta, response.ResponseResult.Data
responseIDString := responseID.String()
if resultSets.load(responseIDString) == nil {
return newError(err0501ResponseHandlerResultSetNotCreatedError)
Expand All @@ -113,7 +113,7 @@ func (protocol *gremlinServerWSProtocol) responseHandler(resultSets *synchronize
} else if statusCode == http.StatusOK {
// Add data and status attributes to the ResultSet.
resultSets.load(responseIDString).addResult(&Result{data})
resultSets.load(responseIDString).setStatusAttributes(response.responseStatus.attributes)
resultSets.load(responseIDString).setStatusAttributes(response.ResponseStatus.attributes)
resultSets.load(responseIDString).Close()
protocol.logHandler.logf(Debug, readComplete, responseIDString)
} else if statusCode == http.StatusPartialContent {
Expand All @@ -137,10 +137,10 @@ func (protocol *gremlinServerWSProtocol) responseHandler(resultSets *synchronize
}
} else {
resultSets.load(responseIDString).Close()
return newError(err0503ResponseHandlerAuthError, response.responseStatus, response.responseResult)
return newError(err0503ResponseHandlerAuthError, response.ResponseStatus, response.ResponseResult)
}
} else {
newError := newError(err0502ResponseHandlerReadLoopError, response.responseStatus, statusCode)
newError := newError(err0502ResponseHandlerReadLoopError, response.ResponseStatus, statusCode)
resultSets.load(responseIDString).setError(newError)
resultSets.load(responseIDString).Close()
protocol.logHandler.logf(Error, logErrorGeneric, "gremlinServerWSProtocol.responseHandler()", newError.Error())
Expand All @@ -149,7 +149,7 @@ func (protocol *gremlinServerWSProtocol) responseHandler(resultSets *synchronize
}

func (protocol *gremlinServerWSProtocol) write(request *request) error {
bytes, err := protocol.serializer.serializeMessage(request)
bytes, err := protocol.serializer.SerializeMessage(request)
if err != nil {
return err
}
Expand Down
46 changes: 44 additions & 2 deletions gremlin-go/driver/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,28 @@ const sessionProcessor = "session"
const stringOp = "eval"
const stringProcessor = ""

func makeStringRequest(stringGremlin string, traversalSource string, sessionId string, requestOptions RequestOptions) (req request) {
// MakeStringRequest creates a request from a Gremlin string query for submission to a Gremlin server.
//
// This function is exposed publicly to enable alternative transport protocols (gRPC, HTTP/2, etc.)
// to construct properly formatted requests outside the standard WebSocket client. The returned
// request can then be serialized using SerializeMessage().
//
// Parameters:
// - stringGremlin: The Gremlin query string to execute
// - traversalSource: The name of the traversal source (typically "g")
// - sessionId: Optional session ID for stateful requests (use "" for stateless)
// - requestOptions: Options such as bindings, timeout, batch size, etc.
//
// Returns:
// - request: A request structure ready for serialization
//
// Example for alternative transports:
//
// req := MakeStringRequest("g.V().count()", "g", "", RequestOptions{})
// serializer := newGraphBinarySerializer(nil)
// bytes, _ := serializer.(graphBinarySerializer).SerializeMessage(&req)
// // Send bytes over gRPC, HTTP/2, etc.
func MakeStringRequest(stringGremlin string, traversalSource string, sessionId string, requestOptions RequestOptions) (req request) {
newProcessor := stringProcessor
newArgs := map[string]interface{}{
"gremlin": stringGremlin,
Expand Down Expand Up @@ -88,7 +109,28 @@ const bytecodeProcessor = "traversal"
const authOp = "authentication"
const authProcessor = "traversal"

func makeBytecodeRequest(bytecodeGremlin *Bytecode, traversalSource string, sessionId string) (req request) {
// MakeBytecodeRequest creates a request from Gremlin bytecode for submission to a Gremlin server.
//
// This function is exposed publicly to enable alternative transport protocols (gRPC, HTTP/2, etc.)
// to construct properly formatted requests outside the standard WebSocket client. The returned
// request can then be serialized using SerializeMessage().
//
// Parameters:
// - bytecodeGremlin: The Gremlin bytecode to execute
// - traversalSource: The name of the traversal source (typically "g")
// - sessionId: Optional session ID for stateful requests (use "" for stateless)
//
// Returns:
// - request: A request structure ready for serialization
//
// Example for alternative transports:
//
// bytecode := g.V().HasLabel("person").Bytecode
// req := MakeBytecodeRequest(bytecode, "g", "")
// serializer := newGraphBinarySerializer(nil)
// bytes, _ := serializer.(graphBinarySerializer).SerializeMessage(&req)
// // Send bytes over gRPC, HTTP/2, etc.
func MakeBytecodeRequest(bytecodeGremlin *Bytecode, traversalSource string, sessionId string) (req request) {
newProcessor := bytecodeProcessor
newArgs := map[string]interface{}{
"gremlin": *bytecodeGremlin,
Expand Down
13 changes: 7 additions & 6 deletions gremlin-go/driver/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,44 +20,45 @@ under the License.
package gremlingo

import (
"github.com/google/uuid"
"testing"

"github.com/google/uuid"

"github.com/stretchr/testify/assert"
)

func TestRequest(t *testing.T) {
t.Run("Test makeStringRequest() with custom requestID", func(t *testing.T) {
requestId := uuid.New()
r := makeStringRequest("g.V()", "g", "",
r := MakeStringRequest("g.V()", "g", "",
new(RequestOptionsBuilder).SetRequestId(requestId).Create())
assert.Equal(t, requestId, r.requestID)
})

t.Run("Test makeStringRequest() with no bindings", func(t *testing.T) {
r := makeStringRequest("g.V()", "g", "", *new(RequestOptions))
r := MakeStringRequest("g.V()", "g", "", *new(RequestOptions))
assert.NotNil(t, r.requestID)
assert.NotEqual(t, uuid.Nil, r.requestID)
})

t.Run("Test makeStringRequest() with custom evaluationTimeout", func(t *testing.T) {
r := makeStringRequest("g.V()", "g", "",
r := MakeStringRequest("g.V()", "g", "",
new(RequestOptionsBuilder).SetEvaluationTimeout(1234).Create())
assert.NotNil(t, r.requestID)
assert.NotEqual(t, uuid.Nil, r.requestID)
assert.Equal(t, 1234, r.args["evaluationTimeout"])
})

t.Run("Test makeStringRequest() with custom batchSize", func(t *testing.T) {
r := makeStringRequest("g.V()", "g", "",
r := MakeStringRequest("g.V()", "g", "",
new(RequestOptionsBuilder).SetBatchSize(123).Create())
assert.NotNil(t, r.requestID)
assert.NotEqual(t, uuid.Nil, r.requestID)
assert.Equal(t, 123, r.args["batchSize"])
})

t.Run("Test makeStringRequest() with custom userAgent", func(t *testing.T) {
r := makeStringRequest("g.V()", "g", "",
r := MakeStringRequest("g.V()", "g", "",
new(RequestOptionsBuilder).SetUserAgent("TestUserAgent").Create())
assert.NotNil(t, r.requestID)
assert.NotEqual(t, uuid.Nil, r.requestID)
Expand Down
22 changes: 11 additions & 11 deletions gremlin-go/driver/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ package gremlingo

import "github.com/google/uuid"

// responseStatus contains the status info of the response.
type responseStatus struct {
// ResponseStatus contains the status info of the response.
type ResponseStatus struct {
code uint16
message string
attributes map[string]interface{}
}

// responseResult contains the result info of the response.
type responseResult struct {
meta map[string]interface{}
data interface{}
// ResponseResult contains the result info of the response.
type ResponseResult struct {
Meta map[string]interface{}
Data interface{}
}

// response represents a response from the server.
type response struct {
responseID uuid.UUID
responseStatus responseStatus
responseResult responseResult
// Response represents a Response from the server.
type Response struct {
ResponseID uuid.UUID
ResponseStatus ResponseStatus
ResponseResult ResponseResult
}
46 changes: 46 additions & 0 deletions gremlin-go/driver/resultSet.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,49 @@ func newChannelResultSetCapacity(requestID string, container *synchronizedMap, c
func newChannelResultSet(requestID string, container *synchronizedMap) ResultSet {
return newChannelResultSetCapacity(requestID, container, defaultCapacity)
}

// NewResultSet creates a new ResultSet from a slice of Result objects.
// This function enables custom transport implementations to create ResultSets from
// results collected via alternative protocols.
//
// The function creates a channel-based ResultSet, pre-populates it with the provided results,
// and closes the channel to indicate completion.
//
// Parameters:
// - requestID: The request identifier for this ResultSet
// - results: A slice of Result objects to include in the ResultSet
//
// Returns:
// - ResultSet: A ResultSet containing all the provided results
//
// Example usage:
//
// var results []*Result
// // Collect results from custom transport
// for _, responseBytes := range responses {
// result, _ := DeserializeResult(responseBytes)
// results = append(results, result)
// }
// resultSet := NewResultSet("request-123", results)
// allResults, _ := resultSet.All()
func NewResultSet(requestID string, results []*Result) ResultSet {
Copy link
Contributor

@andreachild andreachild Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious why you opted to create a new constuctor instead of changing an existing one to be public? Was there something lacking from the existing ones which made them not usable for your use cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing constructor signature is:

func newChannelResultSet(requestID string, container *synchronizedMap) ResultSet

The synchronizedMap parameter comes from connection.results and mainly serve for connection-based, concurrent request handling, which is not needed in my request-based use case.
Using newChannelResultSet would probably mean to make public addResult, synchronizedMap along with itself, and even something from connection.go, making everything complicated.
This new constructor enables Submitxxx to return a ResultSet from Response.ResponseResult.Data, which aligns with current interfaces

// Create a channel-based result set with capacity for all results
channelSize := len(results)
if channelSize == 0 {
channelSize = 1 // Ensure at least size 1
}
rs := newChannelResultSetCapacity(requestID, &synchronizedMap{make(map[string]ResultSet), sync.Mutex{}}, channelSize).(*channelResultSet)

// Add all results to the channel
for _, result := range results {
rs.channel <- result
}

// Close the channel to indicate no more results
rs.channelMutex.Lock()
rs.closed = true
close(rs.channel)
rs.channelMutex.Unlock()

return rs
}
Loading