14
14
import java .time .format .DateTimeFormatter ;
15
15
import java .time .temporal .ChronoUnit ;
16
16
import java .util .Queue ;
17
+ import java .util .concurrent .TimeUnit ;
17
18
18
19
public class PubtransConnector {
19
20
@@ -25,6 +26,7 @@ public class PubtransConnector {
25
26
private String queryString ;
26
27
private boolean enableCacheCheck ;
27
28
private int cacheMaxAgeInMins ;
29
+ private int queryTimeoutSecs ;
28
30
29
31
private PubtransTableHandler handler ;
30
32
private Jedis jedis ;
@@ -45,6 +47,7 @@ public static PubtransConnector newInstance(Connection connection,
45
47
connector .queryString = queryString (config );
46
48
connector .enableCacheCheck = config .getBoolean ("application.enableCacheTimestampCheck" );
47
49
connector .cacheMaxAgeInMins = config .getInt ("application.cacheMaxAgeInMinutes" );
50
+ connector .queryTimeoutSecs = (int )config .getDuration ("pubtrans.queryTimeout" , TimeUnit .SECONDS );
48
51
49
52
log .info ("Cache pre-condition enabled: " + connector .enableCacheCheck + " with max age " + connector .cacheMaxAgeInMins );
50
53
@@ -117,6 +120,7 @@ public void queryAndProcessResults() throws SQLException, PulsarClientException
117
120
try {
118
121
statement = connection .prepareStatement (queryString );
119
122
statement .setTimestamp (1 , new java .sql .Timestamp (handler .getLastModifiedTimeStamp ()));
123
+ statement .setQueryTimeout (queryTimeoutSecs );
120
124
resultSet = statement .executeQuery ();
121
125
produceMessages (handler .handleResultSet (resultSet ));
122
126
}
0 commit comments