3
3
import dev .openfeature .sdk .exceptions .GeneralError ;
4
4
import io .grpc .ConnectivityState ;
5
5
import io .grpc .ManagedChannel ;
6
- import lombok .extern .slf4j .Slf4j ;
7
-
8
6
import java .util .concurrent .CountDownLatch ;
9
7
import java .util .concurrent .ScheduledExecutorService ;
10
8
import java .util .concurrent .ScheduledFuture ;
11
9
import java .util .concurrent .TimeUnit ;
12
10
import java .util .concurrent .atomic .AtomicReference ;
13
-
11
+ import lombok . extern . slf4j . Slf4j ;
14
12
15
13
/**
16
14
* A utility class to monitor and manage the connectivity state of a gRPC ManagedChannel.
17
15
*/
18
16
@ Slf4j
19
17
public class ChannelMonitor {
20
18
21
-
22
- private ChannelMonitor () {
23
-
24
- }
19
+ private ChannelMonitor () {}
25
20
26
21
/**
27
22
* Monitors the state of a gRPC channel and triggers the specified callbacks based on state changes.
@@ -31,8 +26,11 @@ private ChannelMonitor() {
31
26
* @param onConnectionReady callback invoked when the channel transitions to a READY state.
32
27
* @param onConnectionLost callback invoked when the channel transitions to a FAILURE or SHUTDOWN state.
33
28
*/
34
- public static void monitorChannelState (ConnectivityState expectedState , ManagedChannel channel ,
35
- Runnable onConnectionReady , Runnable onConnectionLost ) {
29
+ public static void monitorChannelState (
30
+ ConnectivityState expectedState ,
31
+ ManagedChannel channel ,
32
+ Runnable onConnectionReady ,
33
+ Runnable onConnectionLost ) {
36
34
channel .notifyWhenStateChanged (expectedState , () -> {
37
35
ConnectivityState currentState = channel .getState (true );
38
36
log .info ("Channel state changed to: {}" , currentState );
@@ -47,7 +45,6 @@ public static void monitorChannelState(ConnectivityState expectedState, ManagedC
47
45
});
48
46
}
49
47
50
-
51
48
/**
52
49
* Waits for the channel to reach a desired state within a specified timeout period.
53
50
*
@@ -58,21 +55,24 @@ public static void monitorChannelState(ConnectivityState expectedState, ManagedC
58
55
* @param unit the time unit of the timeout.
59
56
* @throws InterruptedException if the current thread is interrupted while waiting.
60
57
*/
61
- public static void waitForDesiredState (ManagedChannel channel ,
62
- ConnectivityState desiredState ,
63
- Runnable connectCallback ,
64
- long timeout ,
65
- TimeUnit unit ) throws InterruptedException {
58
+ public static void waitForDesiredState (
59
+ ManagedChannel channel ,
60
+ ConnectivityState desiredState ,
61
+ Runnable connectCallback ,
62
+ long timeout ,
63
+ TimeUnit unit )
64
+ throws InterruptedException {
66
65
waitForDesiredState (channel , desiredState , connectCallback , new CountDownLatch (1 ), timeout , unit );
67
66
}
68
67
69
-
70
- private static void waitForDesiredState (ManagedChannel channel ,
71
- ConnectivityState desiredState ,
72
- Runnable connectCallback ,
73
- CountDownLatch latch ,
74
- long timeout ,
75
- TimeUnit unit ) throws InterruptedException {
68
+ private static void waitForDesiredState (
69
+ ManagedChannel channel ,
70
+ ConnectivityState desiredState ,
71
+ Runnable connectCallback ,
72
+ CountDownLatch latch ,
73
+ long timeout ,
74
+ TimeUnit unit )
75
+ throws InterruptedException {
76
76
channel .notifyWhenStateChanged (ConnectivityState .SHUTDOWN , () -> {
77
77
try {
78
78
ConnectivityState state = channel .getState (true );
@@ -94,12 +94,11 @@ private static void waitForDesiredState(ManagedChannel channel,
94
94
95
95
// Await the latch or timeout for the state change
96
96
if (!latch .await (timeout , unit )) {
97
- throw new GeneralError (String .format ("Deadline exceeded. Condition did not complete within the %d "
98
- + "deadline" , timeout ));
97
+ throw new GeneralError (String .format (
98
+ "Deadline exceeded. Condition did not complete within the %d " + "deadline" , timeout ));
99
99
}
100
100
}
101
101
102
-
103
102
/**
104
103
* Polls the state of a gRPC channel at regular intervals and triggers callbacks upon state changes.
105
104
*
@@ -109,9 +108,12 @@ private static void waitForDesiredState(ManagedChannel channel,
109
108
* @param onConnectionLost callback invoked when the channel transitions to a FAILURE or SHUTDOWN state.
110
109
* @param pollIntervalMs the polling interval in milliseconds.
111
110
*/
112
- public static void pollChannelState (ScheduledExecutorService executor , ManagedChannel channel ,
113
- Runnable onConnectionReady ,
114
- Runnable onConnectionLost , long pollIntervalMs ) {
111
+ public static void pollChannelState (
112
+ ScheduledExecutorService executor ,
113
+ ManagedChannel channel ,
114
+ Runnable onConnectionReady ,
115
+ Runnable onConnectionLost ,
116
+ long pollIntervalMs ) {
115
117
116
118
AtomicReference <ConnectivityState > lastState = new AtomicReference <>(ConnectivityState .READY );
117
119
@@ -132,7 +134,6 @@ public static void pollChannelState(ScheduledExecutorService executor, ManagedCh
132
134
executor .scheduleAtFixedRate (pollTask , 0 , pollIntervalMs , TimeUnit .MILLISECONDS );
133
135
}
134
136
135
-
136
137
/**
137
138
* Polls the channel state at fixed intervals and waits for the channel to reach a desired state within a timeout
138
139
* period.
@@ -146,9 +147,14 @@ public static void pollChannelState(ScheduledExecutorService executor, ManagedCh
146
147
* @return {@code true} if the desired state was reached within the timeout period, {@code false} otherwise.
147
148
* @throws InterruptedException if the current thread is interrupted while waiting.
148
149
*/
149
- public static boolean pollForDesiredState (ScheduledExecutorService executor , ManagedChannel channel ,
150
- ConnectivityState desiredState , Runnable connectCallback , long timeout ,
151
- TimeUnit unit ) throws InterruptedException {
150
+ public static boolean pollForDesiredState (
151
+ ScheduledExecutorService executor ,
152
+ ManagedChannel channel ,
153
+ ConnectivityState desiredState ,
154
+ Runnable connectCallback ,
155
+ long timeout ,
156
+ TimeUnit unit )
157
+ throws InterruptedException {
152
158
CountDownLatch latch = new CountDownLatch (1 );
153
159
154
160
Runnable waitForStateTask = () -> {
@@ -159,8 +165,8 @@ public static boolean pollForDesiredState(ScheduledExecutorService executor, Man
159
165
}
160
166
};
161
167
162
- ScheduledFuture <?> scheduledFuture = executor . scheduleWithFixedDelay ( waitForStateTask , 0 , 100 ,
163
- TimeUnit .MILLISECONDS );
168
+ ScheduledFuture <?> scheduledFuture =
169
+ executor . scheduleWithFixedDelay ( waitForStateTask , 0 , 100 , TimeUnit .MILLISECONDS );
164
170
165
171
boolean success = latch .await (timeout , unit );
166
172
scheduledFuture .cancel (true );
0 commit comments