1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.topic;
18
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Set;
22 import java.util.stream.Collectors;
23
24 import com.google.common.collect.Sets;
25 import com.hazelcast.core.ITopic;
26
27 import net.anyflow.lannister.cluster.ClusterDataFactory;
28 import net.anyflow.lannister.cluster.Map;
29 import net.anyflow.lannister.message.InboundMessageStatus;
30 import net.anyflow.lannister.message.Message;
31 import net.anyflow.lannister.message.OutboundMessageStatus;
32 import net.anyflow.lannister.session.Sessions;
33
34 public class Topics {
35 @SuppressWarnings("unused")
36 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Topics.class);
37
38 private final Map<String, Topic> topics;
39 private final ITopic<Notification> notifier;
40
41 protected Topics(Sessions sessions) {
42 this.topics = ClusterDataFactory.INSTANCE.createMap("topics");
43 this.notifier = ClusterDataFactory.INSTANCE.createTopic("publishNotifier");
44 this.notifier.addMessageListener(sessions);
45 }
46
47 public Set<String> keySet() {
48 return Sets.newHashSet(topics.keySet());
49 }
50
51 public ITopic<Notification> notifier() {
52 return notifier;
53 }
54
55 public Topic get(String name) {
56 return topics.get(name);
57 }
58
59 public enum ClientType {
60 SUBSCRIBER,
61 PUBLISHER
62 }
63
64 public Topic get(String clientId, int messageId, ClientType clientType) {
65 switch (clientType) {
66 case SUBSCRIBER:
67 return getFromSubscriber(clientId, messageId);
68
69 case PUBLISHER:
70 return getFromPublisher(clientId, messageId);
71
72 default:
73 throw new IllegalArgumentException();
74 }
75 }
76
77 private Topic getFromPublisher(String publisherId, int messageId) {
78 InboundMessageStatus status = InboundMessageStatus.NEXUS.getBy(messageId, publisherId);
79 if (status == null) { return null; }
80
81 return Topic.NEXUS.get(status.topicName());
82 }
83
84 private Topic getFromSubscriber(String subscriberId, int messageId) {
85 OutboundMessageStatus status = OutboundMessageStatus.NEXUS.getBy(messageId, subscriberId);
86 if (status == null) { return null; }
87
88 return Topic.NEXUS.get(status.topicName());
89 }
90
91 protected void persist(Topic topic) {
92 assert topic != null;
93
94 topics.put(topic.name(), topic);
95 }
96
97 public void insert(Topic topic) {
98 assert topic != null;
99
100 TopicSubscriber.NEXUS.updateByTopicName(topic.name());
101
102
103 persist(topic);
104 }
105
106 public Topic remove(Topic topic) {
107 assert topic != null;
108
109 return topics.remove(topic.name());
110 }
111
112 public Topic prepare(Message message) {
113 Topic topic = get(message.topicName());
114 if (topic == null) {
115 topic = new Topic(message.topicName());
116 insert(topic);
117 }
118
119 return topic;
120 }
121
122 public List<Topic> matches(String topicFilter) {
123 return topics.keySet().stream().filter(topicName -> TopicMatcher.match(topicFilter, topicName))
124 .map(topicName -> topics.get(topicName)).collect(Collectors.toList());
125 }
126
127 public List<Topic> matches(Collection<String> topicFilters) {
128 return topics.keySet().stream()
129 .filter(topicName -> topicFilters.stream().filter(tf -> TopicMatcher.match(tf, topicName)).count() > 0)
130 .map(topicName -> topics.get(topicName)).collect(Collectors.toList());
131 }
132 }