Skip to content

Commit b394469

Browse files
committed
Merge pull request IBMStreams#4 from hildrum/master
Add fitler based on evalPredicate
2 parents b7c80d4 + fbc92cf commit b394469

File tree

19 files changed

+464
-2
lines changed

19 files changed

+464
-2
lines changed

.gitignore

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
*_cpp.pm
2+
*_h.pm
3+
toolkit.xml
4+
output
5+
data
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*******************************************************************************
2+
* Copyright (C) 2015, International Business Machines Corporation
3+
* All Rights Reserved
4+
*******************************************************************************
5+
*/
6+
namespace com.ibm.streamsx.transform;
7+
8+
/**
9+
* Filter tuples based on a expression given as an rstring
10+
*/
11+
12+
public composite RuntimeFilter (input ToFilter, ChangeFilter; output Filtered) {
13+
param expression<rstring> $initialFilter: "";
14+
expression<rstring> $filterExpression;
15+
expression <boolean> $testFilterBeforeUpdate: false;
16+
graph
17+
18+
stream<ToFilter> Filtered = Custom(ToFilter;ChangeFilter) {
19+
logic state: {
20+
mutable rstring currentFilter = $initialFilter;
21+
boolean testFilterBeforeUpdate=$testFilterBeforeUpdate;
22+
}
23+
onTuple ChangeFilter: {
24+
mutable rstring _newfilter = $filterExpression;
25+
if (testFilterBeforeUpdate) {
26+
mutable int32 err=0;
27+
evalPredicate(_newfilter,(ToFilter){},err);
28+
if (err == 0) {
29+
currentFilter=_newfilter;
30+
if (isTraceable(Trace.info)) {
31+
appTrc(Trace.info,"Updating filter to "+_newfilter);
32+
}
33+
}
34+
else {
35+
appTrc(Trace.error,"Error "+(rstring)err+" on new filter "+_newfilter+"; not updating");
36+
}
37+
}
38+
else {
39+
if (isTraceable(Trace.info)) {
40+
appTrc(Trace.info,"Updating filter to "+_newfilter);
41+
}
42+
currentFilter = _newfilter;
43+
}
44+
}
45+
onTuple ToFilter: {
46+
mutable int32 error = 0;
47+
if (evalPredicate(currentFilter,ToFilter,error)) {
48+
submit(ToFilter,Filtered);
49+
}
50+
if (error != 0) {
51+
appTrc(Trace.error,"Error code "+(rstring)error+" evaluating "+currentFilter+" on "+(rstring)ToFilter);
52+
}
53+
}
54+
onPunct ToFilter: {
55+
submit(currentPunct(),Filtered);
56+
}
57+
}
58+
59+
}

com.ibm.streamsx.transform/info.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
<name>com.ibm.streamsx.transform</name>
1111
<description> This toolkit contains general-purpose operators for manipulating tuples.
1212
</description>
13-
<version>1.0.0</version>
14-
<requiredProductVersion>4.0.0.0</requiredProductVersion>
13+
<version>1.1.0</version>
14+
<requiredProductVersion>4.0.1.0</requiredProductVersion>
1515
</identity>
1616
<dependencies/>
1717
</toolkitInfoModel>

samples/RuntimeFilter/Basic/Main.spl

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*******************************************************************************
2+
* Copyright (C) 2015, International Business Machines Corporation
3+
* All Rights Reserved
4+
*******************************************************************************/
5+
*/
6+
7+
use com.ibm.streamsx.transform::RuntimeFilter;
8+
composite Main {
9+
10+
graph
11+
12+
// Let’s generate some data.
13+
stream<int64 iter> Data = Beacon() {
14+
param
15+
period: 1.0; // new tuple every second
16+
output Data:
17+
iter = (int64)IterationCount();
18+
}
19+
20+
stream<rstring expr> FilterExpressions = FileSource() {
21+
param file: "filterFile.txt";
22+
hotFile: true;
23+
format: line;
24+
}
25+
26+
stream<Data> Filtered = RuntimeFilter(Data;FilterExpressions) {
27+
param filterExpression: expr;
28+
testFilterBeforeUpdate: true;
29+
}
30+
31+
() as sink = Custom(Filtered) {
32+
logic onTuple Filtered: {
33+
println((rstring)Filtered);
34+
}
35+
}
36+
}

samples/RuntimeFilter/Basic/Makefile

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
.PHONY: all clean
2+
3+
SPLC_FLAGS = -a -t ../../../com.ibm.streamsx.transform
4+
SPLC = $(STREAMS_INSTALL)/bin/sc
5+
6+
SPL_CMD_ARGS ?=
7+
SPL_MAIN_COMPOSITE = Main
8+
9+
all: standalone
10+
11+
standalone:
12+
$(SPLC) $(SPLC_FLAGS) -T -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS)
13+
14+
15+
clean:
16+
$(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE)
17+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
# Licensed Materials - Property of IBM
3+
# Copyright IBM Corp. 2011, 2014
4+
# US Government Users Restricted Rights - Use, duplication or
5+
# disclosure restricted by GSA ADP Schedule Contract with
6+
# IBM Corp.
7+
*/
8+
use com.ibm.streamsx.transform::RuntimeFilter;
9+
10+
use com.ibm.streamsx.inet.rest::*;
11+
12+
/**
13+
* Web example showing RuntimeFilter in action.
14+
* Point browser to [http://127.0.0.1:8080]
15+
*/
16+
composite SimpleInject {
17+
18+
graph
19+
20+
stream<int64 iter> Data = Beacon() {
21+
param period: 1.0;
22+
output Data:
23+
iter = (int64)IterationCount();
24+
}
25+
26+
stream<rstring newfilter> FilterUpdate
27+
= HTTPTupleInjection() {
28+
param
29+
port: 8080;
30+
config
31+
// Ensure the operators are in a single PE to have a single web-server
32+
placement: partitionColocation("jetty8080");
33+
34+
}
35+
36+
stream<Data> Filtered = RuntimeFilter(Data;FilterUpdate) {
37+
param filterExpression: newfilter;
38+
}
39+
40+
41+
() as InjectView = HTTPTupleView(Filtered) {
42+
window Filtered: sliding, count(10), count(1);
43+
param
44+
port: 8080;
45+
config
46+
// Ensure the operators are in a single PE to have a single web-server
47+
placement: partitionColocation("jetty8080");
48+
49+
}
50+
51+
}
+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<info:toolkitInfoModel xmlns:common="http://www.ibm.com/xmlns/prod/streams/spl/common" xmlns:info="http://www.ibm.com/xmlns/prod/streams/spl/toolkitInfo">
3+
<info:identity>
4+
<info:name>RuntimeFilterWithHTTPTupleInject</info:name>
5+
<info:description>Sample application showing RuntimeFilter with HTTPTupleInject to add filtersand other operator</info:description>
6+
<info:version>1.0.0</info:version>
7+
<info:requiredProductVersion>4.0.1</info:requiredProductVersion>
8+
</info:identity>
9+
<info:dependencies>
10+
<info:toolkit>
11+
<common:name>com.ibm.streamsx.inet</common:name>
12+
<common:version>2.5</common:version>
13+
</info:toolkit>
14+
</info:dependencies>
15+
</info:toolkitInfoModel>
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*******************************************************************************
2+
* Copyright (C) 2015, International Business Machines Corporation
3+
* All Rights Reserved
4+
*******************************************************************************/
5+
type StringListData = uint64 tupleNum, list<rstring> characterList;
6+
type ThreeClauseString = rstring hero,rstring heroine,rstring villain;
7+
type IntData = int64 x, int64 y, int64 z;
8+
9+
composite GenerateStringList(output OutStream) {
10+
param
11+
expression<int32> $iterations: (int32)getSubmissionTimeValue("iterations","100000000");
12+
graph
13+
stream<StringListData> OutStream = Beacon() {
14+
param iterations: $iterations;
15+
output OutStream:
16+
tupleNum=IterationCount(),
17+
characterList = ["Emma","Lizzy","Jane","Wentworth","Elinor","Fanny","Anne","Lady Susan","Catherine","Henry"];
18+
}
19+
}
20+
21+
composite GenerateIntData(output Data) {
22+
param
23+
expression<int32> $iterations: (int32)getSubmissionTimeValue("iterations","100000000");
24+
graph
25+
stream<int64 x,int64 y, int64 z> Data = Beacon() {
26+
param iterations: $iterations;
27+
output Data:
28+
x = (int64)IterationCount()%10l,
29+
y = (int64)IterationCount()%11l,
30+
z = (int64)IterationCount()%13l;
31+
}
32+
}
33+
34+
composite GenerateStringData(output Data) {
35+
param
36+
expression<list<rstring>> $villains: ["Miss Bingley","Voldemort"];
37+
expression<int32> $iterations: (int32)getSubmissionTimeValue("iterations","100000000");
38+
graph
39+
stream<ThreeClauseString> Data = Beacon() {
40+
param iterations: $iterations;
41+
output Data:
42+
hero = "Darcy",
43+
heroine = "Lizzy",
44+
villain = $villains[(int32)(IterationCount()%2ul)];
45+
}
46+
}
47+
48+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*******************************************************************************
2+
* Copyright (C) 2015, International Business Machines Corporation
3+
* All Rights Reserved
4+
*******************************************************************************/
5+
6+
use com.ibm.streamsx.transform::RuntimeFilter;
7+
8+
composite GenerateIntTuples(output Data) {
9+
graph
10+
stream<int64 x,int64 y, int64 z> Data = Beacon() {
11+
param iterations: 100000000;
12+
initDelay: 1.0;
13+
output Data:
14+
x = (int64)IterationCount()%10l,
15+
y = (int64)IterationCount()%11l,
16+
z = (int64)IterationCount()%13l;
17+
}
18+
}
19+
20+
composite FlexibleFilterInt {
21+
22+
graph
23+
stream<int64 x,int64 y, int64 z> Data = GenerateIntTuples() {
24+
25+
}
26+
27+
stream<rstring filterExpr> Filters = Beacon() {
28+
logic state:
29+
list<rstring> expressions = ["x > 10 || y > 15 || z > 2"];
30+
param iterations: 1;
31+
//period: 5.0;
32+
output Filters:
33+
filterExpr=expressions[IterationCount()];
34+
}
35+
36+
37+
stream<Data> FilteredData = RuntimeFilter(Data;Filters) {
38+
param filterExpression: Filters.filterExpr;
39+
}
40+
41+
}
42+
43+
composite BasicFilterInt {
44+
graph
45+
stream<int64 x,int64 y, int64 z> Data = GenerateIntTuples() {
46+
47+
}
48+
49+
stream<Data> FilteredData = Filter(Data) {
50+
param filter: x > 10l || y > 15l || z > 2l;
51+
}
52+
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*******************************************************************************
2+
* Copyright (C) 2015, International Business Machines Corporation
3+
* All Rights Reserved
4+
*******************************************************************************/
5+
6+
<%
7+
my $label = $ARGV[0];
8+
my $filterExpression = $ARGV[1];
9+
$filterExpression=~ s/\"/\\\"/g;
10+
%>
11+
12+
13+
composite IntDynamic_<%=$label%> {
14+
graph
15+
() as dyn = DynamicFilterTest() {
16+
param
17+
GenData: GenerateIntData;
18+
filter: "<%=$filterExpression%>";
19+
DataType: IntData;
20+
}
21+
22+
}
23+
24+
composite IntBasic_<%=$label%> {
25+
graph
26+
() as basic = BasicFilterTest() {
27+
param
28+
GenData: GenerateIntData;
29+
DataType:IntData;
30+
}
31+
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*******************************************************************************
2+
* Copyright (C) 2015, International Business Machines Corporation
3+
* All Rights Reserved
4+
*******************************************************************************/
5+
6+
<%
7+
my $label = $ARGV[0];
8+
my $filterExpression = $ARGV[1];
9+
my $escaped = $filterExpression;
10+
$escaped =~ s/\"/\\\"/g;
11+
%>
12+
13+
14+
composite StringEqualityDynamic_<%=$label%> {
15+
graph
16+
() as dyn = DynamicFilterTest() {
17+
param
18+
GenData: GenerateStringData;
19+
filter: "<%=$escaped%>";
20+
DataType: ThreeClauseString;
21+
}
22+
23+
}
24+
25+
composite StringEqualityBasic_<%=$label%> {
26+
graph
27+
() as basic = BasicFilterTest() {
28+
param
29+
GenData: GenerateStringData;
30+
DataType:ThreeClauseString;
31+
}
32+
33+
}

0 commit comments

Comments
 (0)