1111import java .time .Duration ;
1212import java .util .*;
1313import java .util .concurrent .CopyOnWriteArrayList ;
14+ import java .util .concurrent .atomic .AtomicBoolean ;
1415
1516@ SuppressWarnings ("unused" )
1617public class Redis {
@@ -19,6 +20,36 @@ public interface RedisDataListener {
1920 default void onDataSet (String key , String value ) {}
2021 }
2122
23+ public static class Subscription {
24+ private final JedisPubSub jedisPubSub ;
25+ private final String [] channels ;
26+ private final AtomicBoolean active = new AtomicBoolean (true );
27+ private final MessageListener mockListener ;
28+
29+ public Subscription (JedisPubSub jedisPubSub , String ... channels ) {
30+ this .jedisPubSub = jedisPubSub ;
31+ this .channels = channels ;
32+ this .mockListener = null ;
33+ }
34+
35+ public Subscription (MessageListener mockListener , String ... channels ) {
36+ this .jedisPubSub = null ;
37+ this .channels = channels ;
38+ this .mockListener = mockListener ;
39+ }
40+
41+ public void unsubscribe () {
42+ if (active .compareAndSet (true , false )) {
43+ if (jedisPubSub != null && jedisPubSub .isSubscribed ()) {
44+ jedisPubSub .unsubscribe (channels );
45+ }
46+ }
47+ }
48+
49+ protected MessageListener getMockListener () { return mockListener ; }
50+ protected String [] getChannels () { return channels ; }
51+ }
52+
2253 private final List <RedisDataListener > dataListeners = new CopyOnWriteArrayList <>();
2354 private static final boolean DEFAULT_USE_EMBEDDED = false ;
2455 private static final boolean DEFAULT_ALLOW_PORT_SEARCH = false ;
@@ -490,32 +521,43 @@ public void publish(String channel, String message) {
490521 }
491522 }
492523
493- public void subscribe (String channel , MessageListener messageListener ) {
524+ public Subscription subscribe (String channel , MessageListener messageListener ) {
494525 if (useMockFallback ) {
495- mockRedis .subscribe (channel , messageListener );
496- return ;
526+ return mockRedis .subscribe (channel , messageListener );
497527 }
498528
499529 if (!isConnected ()) {
500530 Log .error ("Not connected to Redis server. Cannot subscribe to channel." );
501- return ;
531+ return null ;
502532 }
503533
534+ final JedisPubSub jedisPubSub = new JedisPubSub () {
535+ @ Override
536+ public void onMessage (String ch , String message ) {
537+ messageListener .onMessage (ch , message );
538+ }
539+ };
540+
541+ final Subscription subscription = new Subscription (jedisPubSub , channel );
542+
504543 new Thread (() -> {
505544 try (Jedis jedis = jedisPool .getResource ()) {
506- jedis .subscribe (new JedisPubSub () {
507- @ Override
508- public void onMessage (String channel , String message ) {
509- messageListener .onMessage (channel , message );
510- }
511- }, channel );
545+ Log .info ("Subscribing to channel: " + channel );
546+ jedis .subscribe (jedisPubSub , channel );
547+ Log .info ("Unsubscribed from channel: " + channel );
512548 } catch (JedisConnectionException e ) {
513- Log .error ("Jedis connection error during SUBSCRIBE operation" , e );
549+ Log .error ("Jedis connection error during SUBSCRIBE operation on channel " + channel , e );
514550 connected = false ;
515551 } catch (Exception e ) {
516- Log .error ("Error subscribing to Redis channel" , e );
552+ if (e .getMessage ().contains ("Socket closed" ) || e .getMessage ().contains ("Connection reset" )) {
553+ Log .info ("Subscription for channel " + channel + " was terminated." );
554+ } else {
555+ Log .error ("Error subscribing to Redis channel " + channel , e );
556+ }
517557 }
518558 }, "Redis-Subscriber-" + channel ).start ();
559+
560+ return subscription ;
519561 }
520562
521563 public Long enqueue (String queueName , String message ) {
@@ -584,7 +626,7 @@ public void registerQueueConsumer(String queueName, QueueMessageConsumer consume
584626
585627 public void registerReliableQueueConsumer (String queueName , QueueMessageConsumer consumer ) {
586628 if (useMockFallback ) {
587- mockRedis .registerQueueConsumer (queueName , consumer ); // Mock uses simple consumer
629+ mockRedis .registerQueueConsumer (queueName , consumer );
588630 return ;
589631 }
590632 if (!isConnected ()) {
@@ -750,6 +792,7 @@ public void disconnect() {
750792 stopServer ();
751793 }
752794
795+ @ Override
753796 public void publish (String channel , String message ) {
754797 if (!isRunning ()) {
755798 Log .error ("Mock Redis server is not running. Cannot publish message." );
@@ -758,31 +801,41 @@ public void publish(String channel, String message) {
758801
759802 List <MessageListener > channelSubscribers = subscribers .get (channel );
760803 if (channelSubscribers != null ) {
761- for (MessageListener listener : channelSubscribers ) {
804+ for (MessageListener listener : new ArrayList <>( channelSubscribers ) ) {
762805 listener .onMessage (channel , message );
763806 }
764807 }
765808 }
766809
767- public void subscribe (String channel , MessageListener messageListener ) {
810+ @ Override
811+ public Subscription subscribe (String channel , MessageListener messageListener ) {
768812 if (!isRunning ()) {
769813 Log .error ("Mock Redis server is not running. Cannot subscribe to channel." );
770- return ;
814+ return null ;
815+ }
816+ subscribers .computeIfAbsent (channel , k -> new CopyOnWriteArrayList <>()).add (messageListener );
817+ Log .info ("Mock subscribed to channel: " + channel );
818+ return new Subscription (messageListener , channel );
819+ }
820+
821+ public void unsubscribe (Subscription subscription ) {
822+ if (!isRunning () || subscription == null || subscription .getMockListener () == null ) return ;
823+ for (String channel : subscription .getChannels ()) {
824+ List <MessageListener > listeners = subscribers .get (channel );
825+ if (listeners != null ) {
826+ listeners .remove (subscription .getMockListener ());
827+ Log .info ("Mock unsubscribed from channel: " + channel );
828+ }
771829 }
772-
773- subscribers .computeIfAbsent (channel , k -> new CopyOnWriteArrayList <>())
774- .add (messageListener );
775830 }
776831
777832 public Long enqueue (String queueName , String message ) {
778833 if (!isRunning ()) {
779834 Log .error ("Mock Redis server is not running. Cannot enqueue message." );
780835 return -1L ;
781836 }
782-
783837 List <String > queue = messageQueues .computeIfAbsent (queueName , k -> new CopyOnWriteArrayList <>());
784838 queue .add (message );
785-
786839 List <QueueMessageConsumer > consumers = queueConsumers .get (queueName );
787840 if (consumers != null && !consumers .isEmpty ()) {
788841 QueueMessageConsumer consumer = consumers .get (0 );
@@ -799,12 +852,10 @@ public String dequeue(String queueName) {
799852 Log .error ("Mock Redis server is not running. Cannot dequeue message." );
800853 return null ;
801854 }
802-
803855 List <String > queue = messageQueues .get (queueName );
804856 if (queue == null || queue .isEmpty ()) {
805857 return null ;
806858 }
807-
808859 return queue .remove (0 );
809860 }
810861
@@ -813,10 +864,7 @@ public void registerQueueConsumer(String queueName, QueueMessageConsumer consume
813864 Log .error ("Mock Redis server is not running. Cannot register queue consumer." );
814865 return ;
815866 }
816-
817- queueConsumers .computeIfAbsent (queueName , k -> new CopyOnWriteArrayList <>())
818- .add (consumer );
819-
867+ queueConsumers .computeIfAbsent (queueName , k -> new CopyOnWriteArrayList <>()).add (consumer );
820868 List <String > queue = messageQueues .get (queueName );
821869 if (queue != null && !queue .isEmpty ()) {
822870 String message = queue .remove (0 );
0 commit comments