Skip to content

Commit 9065ac1

Browse files
author
KIRSTEN W. HILDRUM
committed
New composite filter operator build on evalPredicate
1 parent 5906163 commit 9065ac1

File tree

16 files changed

+364
-0
lines changed

16 files changed

+364
-0
lines changed

.gitignore

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
*_cpp.pm
2+
*_h.pm
3+
toolkit.xml
4+
output
5+
data
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
namespace com.ibm.streamsx.transform;
2+
3+
/**
4+
* Filter tuples based on a expression given as an rstring
5+
*/
6+
7+
public composite RuntimeFilter (input ToFilter, ChangeFilter; output Filtered) {
8+
param expression<rstring> $initialFilter: "";
9+
expression<rstring> $filterExpression;
10+
expression <boolean> $testFilterBeforeUpdate: false;
11+
graph
12+
13+
stream<ToFilter> Filtered = Custom(ToFilter;ChangeFilter) {
14+
logic state: {
15+
mutable rstring currentFilter = $initialFilter;
16+
boolean testFilterBeforeUpdate=$testFilterBeforeUpdate;
17+
}
18+
onTuple ChangeFilter: {
19+
rstring newfilter = $filterExpression;
20+
if (testFilterBeforeUpdate) {
21+
mutable int32 err=0;
22+
evalPredicate(newfilter,(ToFilter){},err);
23+
if (err == 0) {
24+
currentFilter=newfilter;
25+
if (isTraceable(Trace.info)) {
26+
appTrc(Trace.info,"Updating filter to "+newfilter);
27+
}
28+
}
29+
else {
30+
appTrc(Trace.error,"Error "+(rstring)err+" on new filter "+newfilter+"; not updating");
31+
}
32+
}
33+
else {
34+
if (isTraceable(Trace.info)) {
35+
appTrc(Trace.info,"Updating filter to "+newfilter);
36+
}
37+
currentFilter = newfilter;
38+
}
39+
}
40+
onTuple ToFilter: {
41+
mutable int32 error = 0;
42+
if (evalPredicate(currentFilter,ToFilter,error)) {
43+
submit(ToFilter,Filtered);
44+
}
45+
if (error != 0) {
46+
appTrc(Trace.error,"Error code "+(rstring)error+" evaluating "+currentFilter+" on "+(rstring)ToFilter);
47+
}
48+
}
49+
onPunct ToFilter: {
50+
submit(currentPunct(),Filtered);
51+
}
52+
}
53+
54+
}

samples/RuntimeFilter/Basic/Main.spl

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use com.ibm.streamsx.transform::RuntimeFilter;
2+
composite Main {
3+
4+
graph
5+
6+
// Let’s generate some data.
7+
stream<int64 iter> Data = Beacon() {
8+
param
9+
period: 1.0; // new tuple every second
10+
output Data:
11+
iter = (int64)IterationCount();
12+
}
13+
14+
stream<rstring expr> FilterExpressions = FileSource() {
15+
param file: "filterFile.txt";
16+
hotFile: true;
17+
format: line;
18+
}
19+
20+
stream<Data> Filtered = RuntimeFilter(Data;FilterExpressions) {
21+
param filterExpression: expr;
22+
testFilterBeforeUpdate: true;
23+
}
24+
25+
() as sink = Custom(Filtered) {
26+
logic onTuple Filtered: {
27+
println((rstring)Filtered);
28+
}
29+
}
30+
}

samples/RuntimeFilter/Basic/Makefile

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
.PHONY: all clean
2+
3+
SPLC_FLAGS = -a -t ../../../com.ibm.streamsx.transform
4+
SPLC = $(STREAMS_INSTALL)/bin/sc
5+
6+
SPL_CMD_ARGS ?=
7+
SPL_MAIN_COMPOSITE = Main
8+
9+
all: standalone
10+
11+
standalone:
12+
$(SPLC) $(SPLC_FLAGS) -T -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS)
13+
14+
15+
clean:
16+
$(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE)
17+
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
2+
type StringListData = uint64 tupleNum, list<rstring> potterList, list<rstring> lotrList,list<rstring> austenList;
3+
type ThreeClauseString = rstring hero,rstring heroine,rstring villain;
4+
type IntData = int64 x, int64 y, int64 z;
5+
6+
composite GenerateStringList(output OutStream) {
7+
param
8+
expression<list<rstring>> $nameList: ["Harry","Herminone","Ron","Mrs. Weasley","Sirius","Dumbledore","Lilly","Voldemort","James","Malfoy"];
9+
expression<list<rstring>> $nameList2: ["Aragorn","Frodo","Gandalf","Eowyn","Merry","Fatty","Pippin","Eomer","Theoden","Faramir","Boromir"];
10+
expression<int32> $iterations: (int32)getSubmissionTimeValue("iterations","100000000");
11+
graph
12+
stream<StringListData> OutStream = Beacon() {
13+
param iterations: $iterations;
14+
output OutStream:
15+
tupleNum=IterationCount(),
16+
potterList = $nameList,
17+
lotrList = $nameList2,
18+
austenList = ["Emma","Lizzy","Jane","Wentworth","Elinor","Fanny","Anne","Lady Susan","Catherine","Henry"];
19+
}
20+
}
21+
22+
composite GenerateIntData(output Data) {
23+
param
24+
expression<int32> $iterations: (int32)getSubmissionTimeValue("iterations","100000000");
25+
graph
26+
stream<int64 x,int64 y, int64 z> Data = Beacon() {
27+
param iterations: $iterations;
28+
output Data:
29+
x = (int64)IterationCount()%10l,
30+
y = (int64)IterationCount()%11l,
31+
z = (int64)IterationCount()%13l;
32+
}
33+
}
34+
35+
composite GenerateStringData(output Data) {
36+
param
37+
expression<list<rstring>> $villains: ["Miss Bingley","Voldemort"];
38+
expression<int32> $iterations: (int32)getSubmissionTimeValue("iterations","100000000");
39+
graph
40+
stream<ThreeClauseString> Data = Beacon() {
41+
param iterations: $iterations;
42+
output Data:
43+
hero = "Darcy",
44+
heroine = "Lizzy",
45+
villain = $villains[(int32)(IterationCount()%2ul)];
46+
}
47+
}
48+
49+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use com.ibm.streamsx.transform::RuntimeFilter;
2+
3+
composite GenerateIntTuples(output Data) {
4+
graph
5+
stream<int64 x,int64 y, int64 z> Data = Beacon() {
6+
param iterations: 100000000;
7+
initDelay: 1.0;
8+
output Data:
9+
x = (int64)IterationCount()%10l,
10+
y = (int64)IterationCount()%11l,
11+
z = (int64)IterationCount()%13l;
12+
}
13+
}
14+
15+
composite FlexibleFilterInt {
16+
17+
graph
18+
stream<int64 x,int64 y, int64 z> Data = GenerateIntTuples() {
19+
20+
}
21+
22+
stream<rstring filterExpr> Filters = Beacon() {
23+
logic state:
24+
list<rstring> expressions = ["x > 10 || y > 15 || z > 2"];
25+
param iterations: 1;
26+
//period: 5.0;
27+
output Filters:
28+
filterExpr=expressions[IterationCount()];
29+
}
30+
31+
32+
stream<Data> FilteredData = RuntimeFilter(Data;Filters) {
33+
param filterExpression: Filters.filterExpr;
34+
}
35+
36+
}
37+
38+
composite BasicFilterInt {
39+
graph
40+
stream<int64 x,int64 y, int64 z> Data = GenerateIntTuples() {
41+
42+
}
43+
44+
stream<Data> FilteredData = Filter(Data) {
45+
param filter: x > 10l || y > 15l || z > 2l;
46+
}
47+
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
2+
<%
3+
my $label = $ARGV[0];
4+
my $filterExpression = $ARGV[1];
5+
$filterExpression=~ s/\"/\\\"/g;
6+
%>
7+
8+
9+
composite IntDynamic_<%=$label%> {
10+
graph
11+
() as dyn = DynamicFilterTest() {
12+
param
13+
GenData: GenerateIntData;
14+
filter: "<%=$filterExpression%>";
15+
DataType: IntData;
16+
}
17+
18+
}
19+
20+
composite IntBasic_<%=$label%> {
21+
graph
22+
() as basic = BasicFilterTest() {
23+
param
24+
GenData: GenerateIntData;
25+
DataType:IntData;
26+
}
27+
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
2+
<%
3+
my $label = $ARGV[0];
4+
my $filterExpression = $ARGV[1];
5+
my $escaped = $filterExpression;
6+
$escaped =~ s/\"/\\\"/g;
7+
%>
8+
9+
10+
composite StringEqualityDynamic_<%=$label%> {
11+
graph
12+
() as dyn = DynamicFilterTest() {
13+
param
14+
GenData: GenerateStringData;
15+
filter: "<%=$escaped%>";
16+
DataType: ThreeClauseString;
17+
}
18+
19+
}
20+
21+
composite StringEqualityBasic_<%=$label%> {
22+
graph
23+
() as basic = BasicFilterTest() {
24+
param
25+
GenData: GenerateStringData;
26+
DataType:ThreeClauseString;
27+
}
28+
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
2+
<%
3+
my $label = $ARGV[0];
4+
my $filterExpression = $ARGV[1];
5+
6+
my $escapedFilter = $filterExpression;
7+
$escapedFilter =~ s/"/\\\"/g;
8+
%>
9+
10+
11+
composite StringListDynamic_<%=$label%> {
12+
graph
13+
() as dyn = DynamicFilterTest() {
14+
param
15+
GenData: GenerateStringList;
16+
filter: "<%=$escapedFilter%>";
17+
DataType: StringListData;
18+
}
19+
20+
}
21+
22+
composite StringListBasic_<%=$label%> {
23+
graph
24+
() as basic = BasicFilterTest() {
25+
param
26+
GenData: GenerateStringList;
27+
DataType:StringListData;
28+
}
29+
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<%
2+
my $filterExpression=$ARGV[1];
3+
%>
4+
use com.ibm.streamsx.transform::RuntimeFilter;
5+
6+
composite DynamicFilterTest {
7+
param type $DataType;
8+
operator $GenData;
9+
expression<rstring> $filter;
10+
graph
11+
12+
stream<$DataType> Data = $GenData() {
13+
}
14+
15+
stream<rstring filterexpression> Control = Custom() {
16+
// We don't really want to do anything.
17+
}
18+
19+
stream<Data> Filtered = RuntimeFilter(Data;Control) {
20+
param initialFilter: $filter;
21+
filterExpression: Control.filterexpression;
22+
}
23+
24+
/*
25+
() as sink = FileSink(Filtered) {
26+
param file: "flexible.out";
27+
}
28+
*/
29+
30+
}
31+
32+
composite BasicFilterTest {
33+
param type $DataType;
34+
operator $GenData;
35+
graph
36+
37+
stream<$DataType> Data = $GenData() {
38+
}
39+
40+
stream<Data> Filtered = Filter(Data) {
41+
param filter: <%=$filterExpression%>;
42+
}
43+
/*
44+
() as sink = FileSink(Filtered) {
45+
param file:"basic.out";
46+
}
47+
*/
48+
}
+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
toolkit=../../../com.ibm.streamsx.transform
2+
opts=-a
3+
iterations=60000000
4+
sc $opts -T -M StringListBasic_String1 String1 '\"malfoy\" in potterList' -t $toolkit
5+
time output/bin/standalone iterations=$iterations
6+
sc $opts -T -M StringListDynamic_String1 String1 '\"malfoy\" in potterList' -t $toolkit
7+
time output/bin/standalone iterations=$iterations
8+
sc $opts -T -M IntDynamic_1 1 'x > 5l' -t $toolkit
9+
time output/bin/standalone iterations=$iterations
10+
sc $opts -T -M IntBasic_1 1 'x > 5l' -t $toolkit
11+
time output/bin/standalone iterations=$iterations
12+
sc $opts -T -M StringEqualityBasic_1 1 'heroine==\"Lizzy\"' -t $toolkit
13+
time output/bin/standalone iterations=$iterations
14+
sc $opts -T -M StringEqualityDynamic_1 1 'heroine==\"Lizzy\"' -t $toolkit
15+
time output/bin/standalone iterations=$iterations

tests/RuntimeFilter/timing/info.xml

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<info:toolkitInfoModel xmlns:common="http://www.ibm.com/xmlns/prod/streams/spl/common"
3+
xmlns:info="http://www.ibm.com/xmlns/prod/streams/spl/toolkitInfo">
4+
<info:identity>
5+
<info:name>runtimefilter.test</info:name>
6+
<info:description>Performance comparison tools for RuntimeFilter</info:description>
7+
<info:version>1.0.0</info:version>
8+
<info:requiredProductVersion>4.0.1</info:requiredProductVersion>
9+
</info:identity>
10+
<info:dependencies/>
11+
</info:toolkitInfoModel>

0 commit comments

Comments
 (0)