Skip to content

Commit ce5a571

Browse files
Fix issues raised in review
- Check whether batching is supported when starting a batch. - Better keying of batched writes. Signed-off-by: Mark S. Lewis <[email protected]>
1 parent 63ae89a commit ce5a571

File tree

3 files changed

+35
-29
lines changed

3 files changed

+35
-29
lines changed

shim/batch.go

+23-12
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,19 @@ import (
77
"github.com/hyperledger/fabric-protos-go-apiv2/peer"
88
)
99

10+
type batchKey struct {
11+
Collection string
12+
Key string
13+
Type peer.WriteRecord_Type
14+
}
15+
1016
type writeBatch struct {
11-
writes map[string]*peer.WriteRecord
17+
writes map[batchKey]*peer.WriteRecord
1218
}
1319

1420
func newWriteBatch() *writeBatch {
1521
return &writeBatch{
16-
writes: make(map[string]*peer.WriteRecord),
22+
writes: make(map[batchKey]*peer.WriteRecord),
1723
}
1824
}
1925

@@ -31,39 +37,44 @@ func (b *writeBatch) Writes() []*peer.WriteRecord {
3137
}
3238

3339
func (b *writeBatch) PutState(collection string, key string, value []byte) {
34-
b.writes[batchLedgerKey(collection, key)] = &peer.WriteRecord{
40+
b.write(&peer.WriteRecord{
3541
Key: key,
3642
Value: value,
3743
Collection: collection,
3844
Type: peer.WriteRecord_PUT_STATE,
39-
}
45+
})
4046
}
4147

4248
func (b *writeBatch) PutStateMetadataEntry(collection string, key string, metakey string, metadata []byte) {
43-
b.writes[batchLedgerKey(collection, key)] = &peer.WriteRecord{
49+
b.write(&peer.WriteRecord{
4450
Key: key,
4551
Collection: collection,
4652
Metadata: &peer.StateMetadata{Metakey: metakey, Value: metadata},
4753
Type: peer.WriteRecord_PUT_STATE_METADATA,
48-
}
54+
})
4955
}
5056

5157
func (b *writeBatch) DelState(collection string, key string) {
52-
b.writes[batchLedgerKey(collection, key)] = &peer.WriteRecord{
58+
b.write(&peer.WriteRecord{
5359
Key: key,
5460
Collection: collection,
5561
Type: peer.WriteRecord_DEL_STATE,
56-
}
62+
})
5763
}
5864

5965
func (b *writeBatch) PurgeState(collection string, key string) {
60-
b.writes[batchLedgerKey(collection, key)] = &peer.WriteRecord{
66+
b.write(&peer.WriteRecord{
6167
Key: key,
6268
Collection: collection,
6369
Type: peer.WriteRecord_PURGE_PRIVATE_DATA,
64-
}
70+
})
6571
}
6672

67-
func batchLedgerKey(collection string, key string) string {
68-
return prefixStateDataWriteBatch + collection + key
73+
func (b *writeBatch) write(record *peer.WriteRecord) {
74+
key := batchKey{
75+
Collection: record.Collection,
76+
Key: record.Key,
77+
Type: record.Type,
78+
}
79+
b.writes[key] = record
6980
}

shim/handler.go

+11-16
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@ const (
1919
established state = "established" // connection established
2020
ready state = "ready" // ready for requests
2121

22-
defaultMaxSizeWriteBatch = 100
23-
prefixMetaDataWriteBatch = "m"
24-
prefixStateDataWriteBatch = "s"
22+
defaultMaxSizeWriteBatch = 100
2523
)
2624

2725
// PeerChaincodeStream is the common stream interface for Peer - chaincode communication.
@@ -426,7 +424,11 @@ func (h *Handler) handlePurgeState(collection string, key string, channelID stri
426424
}
427425

428426
// handleWriteBatch communicates with the peer to write batch to state all changes information into the ledger.
429-
func (h *Handler) handleWriteBatch(batch *peer.WriteBatchState, channelID string, txid string) error {
427+
func (h *Handler) handleWriteBatch(writes []*peer.WriteRecord, channelID string, txid string) error {
428+
batch := &peer.WriteBatchState{
429+
Rec: writes,
430+
}
431+
430432
// Construct payload for PUT_STATE_BATCH
431433
payloadBytes := marshalOrPanic(batch)
432434

@@ -453,21 +455,14 @@ func (h *Handler) handleWriteBatch(batch *peer.WriteBatchState, channelID string
453455
}
454456

455457
func (h *Handler) sendBatch(channelID string, txid string, writes []*peer.WriteRecord) error {
456-
batch := &peer.WriteBatchState{}
457-
for _, kv := range writes {
458-
batch.Rec = append(batch.Rec, kv)
459-
if len(batch.Rec) >= int(h.maxSizeWriteBatch) {
460-
err := h.handleWriteBatch(batch, channelID, txid)
461-
if err != nil {
462-
return fmt.Errorf("failed send batch: %s", err)
463-
}
464-
batch.Rec = batch.Rec[:0]
458+
for ; len(writes) > int(h.maxSizeWriteBatch); writes = writes[h.maxSizeWriteBatch:] {
459+
if err := h.handleWriteBatch(writes[:h.maxSizeWriteBatch], channelID, txid); err != nil {
460+
return fmt.Errorf("failed send batch: %s", err)
465461
}
466462
}
467463

468-
if len(batch.Rec) != 0 {
469-
err := h.handleWriteBatch(batch, channelID, txid)
470-
if err != nil {
464+
if len(writes) > 0 {
465+
if err := h.handleWriteBatch(writes, channelID, txid); err != nil {
471466
return fmt.Errorf("failed send batch: %s", err)
472467
}
473468
}

shim/stub.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ func (s *ChaincodeStub) GetQueryResultWithPagination(query string, pageSize int3
623623

624624
// StartWriteBatch documentation can be found in interfaces.go
625625
func (s *ChaincodeStub) StartWriteBatch() {
626-
if s.writeBatch == nil {
626+
if s.handler.usePeerWriteBatch && s.writeBatch == nil {
627627
s.writeBatch = newWriteBatch()
628628
}
629629
}

0 commit comments

Comments
 (0)