1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.anyflow.lannister.cluster;
17
18 import java.util.Date;
19 import java.util.Map;
20 import java.util.UUID;
21
22 import com.google.common.collect.Maps;
23 import com.hazelcast.core.ITopic;
24 import com.hazelcast.core.Message;
25 import com.hazelcast.core.MessageListener;
26 import com.hazelcast.monitor.LocalTopicStats;
27
28 import io.netty.util.concurrent.GlobalEventExecutor;
29
30 public class SingleTopic<E> implements ITopic<E> {
31
32 private final String name;
33 private final Map<String, MessageListener<E>> messageListeners;
34
35 public SingleTopic(String name) {
36 this.name = name;
37 this.messageListeners = Maps.newConcurrentMap();
38 }
39
40 @Override
41 public void destroy() {
42
43 }
44
45 @Override
46 public String getName() {
47 return name;
48 }
49
50 @Override
51 public void publish(E message) {
52 final Date now = new Date();
53
54 GlobalEventExecutor.INSTANCE.execute(() -> {
55 Message<E> msg = new Message<E>(name, message, now.getTime(), null);
56 messageListeners.values().forEach(c -> c.onMessage(msg));
57 });
58 }
59
60 @Override
61 public String addMessageListener(MessageListener<E> listener) {
62 UUID ret = UUID.randomUUID();
63
64 messageListeners.put(ret.toString(), listener);
65
66 return ret.toString();
67 }
68
69 @Override
70 public boolean removeMessageListener(String registrationId) {
71 return messageListeners.remove(registrationId) != null;
72 }
73
74 @Override
75 public String getPartitionKey() {
76 throw new Error("The method should not be called");
77 }
78
79 @Override
80 public String getServiceName() {
81 throw new Error("The method should not be called");
82 }
83
84 @Override
85 public LocalTopicStats getLocalTopicStats() {
86 throw new Error("The method should not be called");
87 }
88 }