Skip to content

Commit 3c7f877

Browse files
committed
added streaming token refresh config
1 parent 0b28f41 commit 3c7f877

File tree

11 files changed

+64
-21
lines changed

11 files changed

+64
-21
lines changed

‎client/pom.xml‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
<parent>
66
<groupId>io.split.client</groupId>
77
<artifactId>java-client-parent</artifactId>
8-
<version>4.18.1-rc2</version>
8+
<version>4.18.1-rc3</version>
99
</parent>
10-
<version>4.18.1-rc2</version>
10+
<version>4.18.1-rc3</version>
1111
<artifactId>java-client</artifactId>
1212
<packaging>jar</packaging>
1313
<name>Java Client</name>

‎client/src/main/java/io/split/client/SplitClientConfig.java‎

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ private HttpScheme(){
112112
privatefinalCustomHeaderDecorator_customHeaderDecorator;
113113
privatefinalCustomHttpModule_alternativeHTTPModule;
114114

115+
privatefinalint_streamingTokenRefreshRate;
116+
115117
publicstaticBuilderbuilder(){
116118
returnnewBuilder();
117119
}
@@ -170,7 +172,8 @@ private SplitClientConfig(String endpoint,
170172
intinvalidSets,
171173
CustomHeaderDecoratorcustomHeaderDecorator,
172174
CustomHttpModulealternativeHTTPModule,
173-
FallbackTreatmentsConfigurationfallbackTreatments){
175+
FallbackTreatmentsConfigurationfallbackTreatments,
176+
intstreamingTokenRefreshRate){
174177
_endpoint = endpoint;
175178
_eventsEndpoint = eventsEndpoint;
176179
_featuresRefreshRate = pollForFeatureChangesEveryNSeconds;
@@ -226,6 +229,7 @@ private SplitClientConfig(String endpoint,
226229
_customHeaderDecorator = customHeaderDecorator;
227230
_alternativeHTTPModule = alternativeHTTPModule;
228231
_fallbackTreatments = fallbackTreatments;
232+
_streamingTokenRefreshRate = streamingTokenRefreshRate;
229233

230234
Propertiesprops = newProperties();
231235
try{
@@ -446,6 +450,8 @@ public boolean isSdkEndpointOverridden(){
446450

447451
publicFallbackTreatmentsConfigurationfallbackTreatments(){return_fallbackTreatments}
448452

453+
publicintstreamingTokenRefreshRate(){return_streamingTokenRefreshRate}
454+
449455
publicstaticfinalclassBuilder{
450456
privateString_endpoint = SDK_ENDPOINT;
451457
privateboolean_endpointSet = false;
@@ -505,6 +511,7 @@ public static final class Builder{
505511
privateCustomHeaderDecorator_customHeaderDecorator = null;
506512
privateCustomHttpModule_alternativeHTTPModule = null;
507513
privateFallbackTreatmentsConfiguration_fallbackTreatments;
514+
privateint_streamingTokenRefreshRate = 180;
508515

509516
publicBuilder(){
510517
}
@@ -1055,6 +1062,11 @@ public Builder threadFactory(ThreadFactory threadFactory){
10551062
returnthis;
10561063
}
10571064

1065+
publicBuilderstreamingTokenRefreshRate(intstreamingTokenRefreshRate){
1066+
_streamingTokenRefreshRate = streamingTokenRefreshRate;
1067+
returnthis;
1068+
}
1069+
10581070
privatevoidverifyRates(){
10591071
if (_featuresRefreshRate < 5 ){
10601072
thrownewIllegalArgumentException("featuresRefreshRate must be >= 5: " + _featuresRefreshRate);
@@ -1071,9 +1083,14 @@ private void verifyRates(){
10711083
if (_metricsRefreshRate < 30){
10721084
thrownewIllegalArgumentException("metricsRefreshRate must be >= 30: " + _metricsRefreshRate);
10731085
}
1086+
10741087
if(_telemetryRefreshRate < 60){
10751088
thrownewIllegalStateException("_telemetryRefreshRate must be >= 60");
10761089
}
1090+
1091+
if (_streamingTokenRefreshRate < 60){
1092+
thrownewIllegalStateException("_streamingTokenRefreshRate must be >= 60");
1093+
}
10771094
}
10781095

10791096
privatevoidverifyEndPoints(){
@@ -1274,7 +1291,8 @@ public SplitClientConfig build(){
12741291
_invalidSetsCount,
12751292
_customHeaderDecorator,
12761293
_alternativeHTTPModule,
1277-
_fallbackTreatments);
1294+
_fallbackTreatments,
1295+
_streamingTokenRefreshRate);
12781296
}
12791297
}
12801298
}

‎client/src/main/java/io/split/engine/common/PushManagerImp.java‎

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class PushManagerImp implements PushManager{
5252
privatefinalScheduledExecutorService_scheduledExecutorService;
5353
privateAtomicLong_expirationTime;
5454
privatefinalTelemetryRuntimeProducer_telemetryRuntimeProducer;
55+
privatefinalint_streamingTokenRefreshRate;
5556

5657
@VisibleForTesting
5758
/* package private */PushManagerImp(AuthApiClientauthApiClient,
@@ -60,7 +61,8 @@ public class PushManagerImp implements PushManager{
6061
Worker<SegmentQueueDto> segmentWorker,
6162
PushStatusTrackerpushStatusTracker,
6263
TelemetryRuntimeProducertelemetryRuntimeProducer,
63-
ThreadFactorythreadFactory){
64+
ThreadFactorythreadFactory,
65+
intstreamingTokenRefreshRate){
6466

6567
_authApiClient = checkNotNull(authApiClient);
6668
_eventSourceClient = checkNotNull(eventSourceClient);
@@ -70,6 +72,7 @@ public class PushManagerImp implements PushManager{
7072
_expirationTime = newAtomicLong();
7173
_scheduledExecutorService = buildSingleThreadScheduledExecutor(threadFactory, "Split-SSERefreshToken-%d");
7274
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
75+
_streamingTokenRefreshRate = streamingTokenRefreshRate;
7376
}
7477

7578
publicstaticPushManagerImpbuild(Synchronizersynchronizer,
@@ -83,7 +86,8 @@ public static PushManagerImp build(Synchronizer synchronizer,
8386
SplitCacheProducersplitCacheProducer,
8487
FlagSetsFilterflagSetsFilter,
8588
RuleBasedSegmentCacheruleBasedSegmentCache,
86-
RuleBasedSegmentParserruleBasedSegmentParser){
89+
RuleBasedSegmentParserruleBasedSegmentParser,
90+
intstreamingTokenRefreshRate){
8791
FeatureFlagsWorkerfeatureFlagsWorker = newFeatureFlagWorkerImp(synchronizer, splitParser, ruleBasedSegmentParser, splitCacheProducer,
8892
ruleBasedSegmentCache, telemetryRuntimeProducer, flagSetsFilter);
8993
Worker<SegmentQueueDto> segmentWorker = newSegmentsWorkerImp(synchronizer);
@@ -96,7 +100,8 @@ public static PushManagerImp build(Synchronizer synchronizer,
96100
segmentWorker,
97101
pushStatusTracker,
98102
telemetryRuntimeProducer,
99-
threadFactory);
103+
threadFactory,
104+
streamingTokenRefreshRate);
100105
}
101106

102107
@Override
@@ -106,18 +111,22 @@ public void start(){
106111
AuthenticationResponseresponse = _authApiClient.Authenticate();
107112
_log.debug(String.format("Auth service response pushEnabled: %s", response.isPushEnabled()));
108113
if (response.isPushEnabled() && startSse(response.getToken(), response.getChannels())){
109-
_expirationTime.set(response.getExpiration());
114+
_expirationTime.set(_streamingTokenRefreshRate);
110115
_telemetryRuntimeProducer.recordStreamingEvents(newStreamingEvent(StreamEventsEnum.TOKEN_REFRESH.getType(),
111116
response.getExpiration(), System.currentTimeMillis()));
112117
return;
113118
}
114119

115120
cleanUpResources();
116121
if (response.isRetry()){
122+
_log.debug(String.format("Handling retry error response"));
117123
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
118124
} else{
125+
_log.debug(String.format("Auth service response is disabled: %s", response.getToken()));
119126
_pushStatusTracker.forcePushDisable();
120127
}
128+
} catch (Exceptione){
129+
_log.debug("Exception in PushManager start: " + e.getMessage());
121130
} finally{
122131
lock.unlock();
123132
}
@@ -156,14 +165,22 @@ private boolean startSse(String token, String channels){
156165

157166
@Override
158167
publicvoidstartWorkers(){
159-
_featureFlagsWorker.start();
160-
_segmentWorker.start();
168+
try{
169+
_featureFlagsWorker.start();
170+
_segmentWorker.start();
171+
} catch (Exceptione){
172+
_log.debug("Exception in starting workers: " + e.getMessage());
173+
}
161174
}
162175

163176
@Override
164177
publicvoidstopWorkers(){
165-
_featureFlagsWorker.stop();
166-
_segmentWorker.stop();
178+
try{
179+
_featureFlagsWorker.stop();
180+
_segmentWorker.stop();
181+
} catch (Exceptione){
182+
_log.debug("Exception in stopping workers: " + e.getMessage());
183+
}
167184
}
168185

169186
privatevoidcleanUpResources(){

‎client/src/main/java/io/split/engine/common/SyncManagerImp.java‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ public static SyncManagerImp build(SplitTasks splitTasks,
116116
splitCacheProducer,
117117
flagSetsFilter,
118118
ruleBasedSegmentCache,
119-
ruleBasedSegmentParser);
119+
ruleBasedSegmentParser,
120+
config.streamingTokenRefreshRate());
120121

121122
returnnewSyncManagerImp(splitTasks,
122123
config.streamingEnabled(),

‎client/src/main/java/io/split/engine/sse/client/SSEClient.java‎

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public boolean isOpen(){
115115
}
116116

117117
publicvoidclose(){
118+
_log.debug("closing SSE client");
118119
try{
119120
lock.lock();
120121
_forcedStop.set(true);
@@ -128,6 +129,8 @@ public void close(){
128129
}
129130
}
130131
}
132+
} catch (Exceptione){
133+
_log.debug("Exception in closing SSE client: " + e.getMessage());
131134
} finally{
132135
lock.unlock();
133136
}
@@ -184,16 +187,19 @@ private void connectAndLoop(URI uri, CountDownLatch signal){
184187
}
185188
}
186189
} catch (Exceptione){// Any other error non related to the connection disables streaming altogether
190+
_log.debug(String.format("SSE connection exception: %s", e.getMessage()));
187191
_telemetryRuntimeProducer
188192
.recordStreamingEvents(newStreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(),
189193
StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(),
190194
System.currentTimeMillis()));
191195
_log.warn(e.getMessage(), e);
192196
_statusCallback.apply(StatusMessage.NONRETRYABLE_ERROR);
193197
} finally{
198+
_log.debug(String.format("Attempt to close SSE connection"));
194199
try{
195200
_ongoingResponse.get().close();
196201
} catch (IOExceptione){
202+
_log.debug(String.format("SSE connection closing exception: %s", e.getMessage()));
197203
_log.debug(e.getMessage());
198204
}
199205

‎client/src/test/java/io/split/engine/common/PushManagerTest.java‎

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public void setUp(){
4040
_segmentsWorkerImp,
4141
_pushStatusTracker,
4242
_telemetryStorage,
43-
null);
43+
null,
44+
180);
4445
}
4546

4647
@Test

‎okhttp-modules/pom.xml‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
<parent>
66
<artifactId>java-client-parent</artifactId>
77
<groupId>io.split.client</groupId>
8-
<version>4.18.1-rc2</version>
8+
<version>4.18.1-rc3</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
11-
<version>4.18.1-rc2</version>
11+
<version>4.18.1-rc3</version>
1212
<artifactId>okhttp-modules</artifactId>
1313
<packaging>jar</packaging>
1414
<name>http-modules</name>

‎pluggable-storage/pom.xml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<artifactId>java-client-parent</artifactId>
88
<groupId>io.split.client</groupId>
9-
<version>4.18.1-rc2</version>
9+
<version>4.18.1-rc3</version>
1010
</parent>
1111

1212
<version>2.1.0</version>

‎pom.xml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<modelVersion>4.0.0</modelVersion>
55
<groupId>io.split.client</groupId>
66
<artifactId>java-client-parent</artifactId>
7-
<version>4.18.1-rc2</version>
7+
<version>4.18.1-rc3</version>
88
<dependencyManagement>
99
<dependencies>
1010
<dependency>

‎redis-wrapper/pom.xml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<parent>
77
<artifactId>java-client-parent</artifactId>
88
<groupId>io.split.client</groupId>
9-
<version>4.18.1-rc2</version>
9+
<version>4.18.1-rc3</version>
1010
</parent>
1111
<artifactId>redis-wrapper</artifactId>
1212
<version>3.1.1</version>

0 commit comments

Comments
(0)