Skip to content

Commit b7a8b65

Browse files
author
Ori Bernstein
committed
Add dedup support to python bindings.
1 parent 04d8fd3 commit b7a8b65

File tree

6 files changed

+448
-68
lines changed

6 files changed

+448
-68
lines changed

Makefile

+1-1
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,7 @@ grpc:
4646
python -m grpc_tools.protoc -Ibtrdb/grpcinterface --python_out=btrdb/grpcinterface --grpc_python_out=btrdb/grpcinterface btrdb/grpcinterface/btrdb.proto
4747
@echo
4848
@echo Fixing import statements:
49-
sed -i '' 's/btrdb_pb2 as btrdb__pb2/btrdb.grpcinterface.btrdb_pb2 as btrdb__pb2/' btrdb/grpcinterface/btrdb_pb2_grpc.py
49+
sed -i'.bak' 's/btrdb_pb2 as btrdb__pb2/btrdb.grpcinterface.btrdb_pb2 as btrdb__pb2/' btrdb/grpcinterface/btrdb_pb2_grpc.py
5050

5151

5252
# Build the universal wheel and source distribution

btrdb/endpoint.py

+8-2
Original file line number | Diff line number | Diff line change
@@ -160,9 +160,15 @@ def changes(self, uu, fromVersion, toVersion, resolution):
160160
BTrDBError.checkProtoStat(result.stat)
161161
yield result.ranges, result.versionMajor
162162

163-
def insert(self, uu, values):
163+
def insert(self, uu, values, policy):
164+
policy_map = {
165+
'never': btrdb_pb2.MergePolicy.NEVER,
166+
'equal': btrdb_pb2.MergePolicy.EQUAL,
167+
'retain': btrdb_pb2.MergePolicy.RETAIN,
168+
'replace': btrdb_pb2.MergePolicy.REPLACE,
169+
}
164170
protoValues = RawPoint.to_proto_list(values)
165-
params = btrdb_pb2.InsertParams(uuid = uu.bytes, sync = False, values = protoValues)
171+
params = btrdb_pb2.InsertParams(uuid = uu.bytes, sync = False, values = protoValues, merge_policy = policy_map[policy])
166172
result = self.stub.Insert(params)
167173
BTrDBError.checkProtoStat(result.stat)
168174
return result.versionMajor

btrdb/grpcinterface/btrdb.proto

+60
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,8 @@ service BTrDB {
2323
rpc GetMetadataUsage(MetadataUsageParams) returns (MetadataUsageResponse);
2424
rpc GenerateCSV(GenerateCSVParams) returns (stream GenerateCSVResponse);
2525
rpc SQLQuery(SQLQueryParams) returns (stream SQLQueryResponse);
26+
//rpc SetCompactionConfig(SetCompactionConfigParams) returns (SetCompactionConfigResponse);
27+
//rpc GetCompactionConfig(GetCompactionConfigParams) returns (GetCompactionConfigResponse);
2628
}
2729

2830
message RawValuesParams {
@@ -167,9 +169,16 @@ message ChangesResponse {
167169
uint64 versionMinor = 3;
168170
repeated ChangedRange ranges = 4;
169171
}
172+
enum MergePolicy {
173+
NEVER = 0; // Never merge
174+
EQUAL = 1; // Merge identical (key, value) pairs
175+
RETAIN = 2; // When timestamps are equal, keep old value
176+
REPLACE = 3; // When timestamps are equal, keep new value
177+
}
170178
message InsertParams {
171179
bytes uuid = 1;
172180
bool sync = 2;
181+
MergePolicy merge_policy = 4;
173182
repeated RawPoint values = 3;
174183
}
175184
message InsertResponse {
@@ -315,3 +324,54 @@ message SQLQueryResponse {
315324
message Role {
316325
string name = 1;
317326
}
327+
328+
message SetCompactionConfigParams {
329+
//Which stream to configure
330+
bytes uuid = 1;
331+
332+
//Accessing versions LESS than this is not allowed
333+
uint64 CompactedVersion = 2;
334+
335+
//For every timestamp >= Start and < End in this list,
336+
//we cannot traverse the tree < Resolution.
337+
// Ranges cannot overlap, and a range can never have its resolution PW increased
338+
// (it is okay to decrease a range PW and it is okay to merge two adjacent ranges
339+
// if the new range has <= resolution)
340+
repeated ReducedResolutionRange reducedResolutionRanges = 3;
341+
342+
uint64 unused0 = 4;
343+
}
344+
345+
message SetCompactionConfigResponse {
346+
Status stat = 1;
347+
}
348+
349+
message GetCompactionConfigParams {
350+
//Which stream to query
351+
bytes uuid = 1;
352+
}
353+
354+
message GetCompactionConfigResponse {
355+
Status stat = 1;
356+
357+
//The latest version of the stream, as returned by StreamInfo
358+
uint64 LatestMajorVersion = 2;
359+
360+
//Accessing versions LESS than this is not allowed
361+
uint64 CompactedVersion = 3;
362+
363+
//For every timestamp >= Start and < End in this list,
364+
//we cannot traverse the tree < Resolution.
365+
// Ranges cannot overlap, and a range can never have its resolution PW increased
366+
// (it is okay to decrease a range PW and it is okay to merge two adjacent ranges
367+
// if the new range has <= resolution)
368+
repeated ReducedResolutionRange reducedResolutionRanges = 4;
369+
370+
uint64 unused0 = 5;
371+
}
372+
373+
message ReducedResolutionRange {
374+
int64 Start = 1;
375+
int64 End = 2;
376+
uint32 Resolution = 3;
377+
}

btrdb/grpcinterface/btrdb_pb2.py

+368-61
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

btrdb/grpcinterface/btrdb_pb2_grpc.py

+3-2
Original file line number | Diff line number | Diff line change
@@ -254,8 +254,9 @@ def GenerateCSV(self, request, context):
254254
raise NotImplementedError('Method not implemented!')
255255

256256
def SQLQuery(self, request, context):
257-
# missing associated documentation comment in .proto file
258-
pass
257+
"""rpc SetCompactionConfig(SetCompactionConfigParams) returns (SetCompactionConfigResponse);
258+
rpc GetCompactionConfig(GetCompactionConfigParams) returns (GetCompactionConfigResponse);
259+
"""
259260
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
260261
context.set_details('Method not implemented!')
261262
raise NotImplementedError('Method not implemented!')

btrdb/stream.py

+8-2
Original file line number | Diff line number | Diff line change
@@ -402,7 +402,7 @@ def version(self):
402402
"""
403403
return self._btrdb.ep.streamInfo(self._uuid, True, False)[4]
404404

405-
def insert(self, data):
405+
def insert(self, data, merge='never'):
406406
"""
407407
Insert new data in the form (time, value) into the series.
408408
@@ -417,6 +417,12 @@ def insert(self, data):
417417
data: list[tuple[int, float]]
418418
A list of tuples in which each tuple contains a time (int) and
419419
value (float) for insertion to the database
420+
merge: str
421+
A string describing the merge policy. Valid policies are:
422+
- 'never': the default, no points are merged
423+
- 'equal': points are deduplicated if the time and value are equal
424+
- 'retain': if two points have the same timestamp, the old one is kept
425+
- 'replace': if two points have the same timestamp, the new one is kept
420426
421427
Returns
422428
-------
@@ -428,7 +434,7 @@ def insert(self, data):
428434
version = 0
429435
while i < len(data):
430436
thisBatch = data[i:i + INSERT_BATCH_SIZE]
431-
version = self._btrdb.ep.insert(self._uuid, thisBatch)
437+
version = self._btrdb.ep.insert(self._uuid, thisBatch, merge)
432438
i += INSERT_BATCH_SIZE
433439
return version
434440

0 commit comments

Comments
 (0)