1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.anyflow.lannister.topic;
17
18 import java.util.Set;
19 import java.util.concurrent.locks.Lock;
20
21 import com.google.common.collect.Sets;
22
23 import net.anyflow.lannister.cluster.ClusterDataFactory;
24 import net.anyflow.lannister.cluster.Map;
25 import net.anyflow.lannister.cluster.SerializableStringSet;
26
27 public class TopicSubscriptions {
28 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopicSubscriptions.class);
29
30 private final Map<String, TopicSubscription> data;
31 private final Map<String, SerializableStringSet> topicfilterIndex;
32 private final Map<String, SerializableStringSet> clientidIndex;
33
34 private final Lock modifyLock;
35
36 protected TopicSubscriptions() {
37 this.data = ClusterDataFactory.INSTANCE.createMap("TopicSubscriptions_data");
38 this.topicfilterIndex = ClusterDataFactory.INSTANCE.createMap("TopicSubscriptions_topicfilterIndex");
39 this.clientidIndex = ClusterDataFactory.INSTANCE.createMap("TopicSubscriptions_clientidIndex");
40
41 this.modifyLock = ClusterDataFactory.INSTANCE.createLock("TopicSubscriptions_modifyLock");
42 }
43
44 public static String key(String topicFilter, String clientId) {
45 return topicFilter + "_" + clientId;
46 }
47
48 public int size() {
49 return data.size();
50 }
51
52 public Set<String> keySet() {
53 return Sets.newHashSet(data.keySet());
54 }
55
56 public void put(TopicSubscription topicSubscription) {
57 if (topicSubscription == null) { return; }
58
59 modifyLock.lock();
60 try {
61 this.data.put(topicSubscription.key(), topicSubscription);
62
63 SerializableStringSet clientIds = this.topicfilterIndex.get(topicSubscription.topicFilter());
64 if (clientIds == null) {
65 clientIds = new SerializableStringSet();
66 }
67 clientIds.add(topicSubscription.clientId());
68 this.topicfilterIndex.put(topicSubscription.topicFilter(), clientIds);
69
70 SerializableStringSet topicFilters = this.clientidIndex.get(topicSubscription.clientId());
71 if (topicFilters == null) {
72 topicFilters = new SerializableStringSet();
73 }
74 topicFilters.add(topicSubscription.topicFilter());
75 this.clientidIndex.put(topicSubscription.clientId(), topicFilters);
76
77 Topic.NEXUS.keySet().stream()
78 .filter(topicName -> TopicMatcher.match(topicSubscription.topicFilter(), topicName))
79 .forEach(topicName -> TopicSubscriber.NEXUS
80 .put(new TopicSubscriber(topicSubscription.clientId(), topicName)));
81
82 if (logger.isDebugEnabled()) {
83 logger.debug("TopicSubscription added [topicFilter={}, clientId={}, qos={}]",
84 topicSubscription.topicFilter(), topicSubscription.clientId(), topicSubscription.qos());
85 logger.debug("TopicSubscriptions Size [data={}, topicfilterIndex={}, clientidIndex={}]", data.size(),
86 topicfilterIndex.size(), clientidIndex.size());
87 }
88 }
89 finally {
90 modifyLock.unlock();
91 }
92 }
93
94 public Set<String> topicFilters() {
95 return Sets.newHashSet(topicfilterIndex.keySet());
96 }
97
98 public TopicSubscription get(String key) {
99 return data.get(key);
100 }
101
102 public TopicSubscription getBy(String topicFilter, String clientId) {
103 return get(key(topicFilter, clientId));
104 }
105
106 public Set<String> clientIdsOf(String topicFilter) {
107 Set<String> ret = topicfilterIndex.get(topicFilter);
108
109 return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
110 }
111
112 public Set<String> topicFiltersOf(String clientId) {
113 Set<String> ret = clientidIndex.get(clientId);
114
115 return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
116 }
117
118 public TopicSubscription removeByKey(String topicFilter, String clientId) {
119 return removeByKey(key(topicFilter, clientId));
120 }
121
122 private TopicSubscription removeByKey(String key) {
123 modifyLock.lock();
124
125 try {
126 TopicSubscription removed = this.data.remove(key);
127 if (removed == null) { return null; }
128
129 SerializableStringSet clientIds = topicfilterIndex.get(removed.topicFilter());
130 clientIds.remove(removed.clientId());
131
132 if (clientIds.size() <= 0) {
133 topicfilterIndex.remove(removed.topicFilter());
134 }
135 else {
136 topicfilterIndex.put(removed.topicFilter(), clientIds);
137 }
138
139 SerializableStringSet topicFilters = clientidIndex.get(removed.clientId());
140 topicFilters.remove(removed.topicFilter());
141
142 if (topicFilters.size() <= 0) {
143 clientidIndex.remove(removed.clientId());
144 }
145 else {
146 clientidIndex.put(removed.clientId(), topicFilters);
147 }
148
149 if (logger.isDebugEnabled()) {
150 logger.debug("TopicSubscription removed [topicFilter={}, clientId={}, qos={}]", removed.topicFilter(),
151 removed.clientId(), removed.qos());
152 logger.debug("TopicSubscriptions Size [data={}, topicfilterIndex={}, clientidIndex={}]", data.size(),
153 topicfilterIndex.size(), clientidIndex.size());
154 }
155 return removed;
156 }
157 finally {
158 modifyLock.unlock();
159 }
160 }
161
162 public Set<String> removeByClientId(String clientId) {
163 modifyLock.lock();
164
165 try {
166 SerializableStringSet topicFilters = this.clientidIndex.remove(clientId);
167 if (topicFilters == null) { return Sets.newHashSet(); }
168
169 topicFilters.forEach(topicFilter -> {
170 SerializableStringSet target = topicfilterIndex.get(topicFilter);
171 target.remove(clientId);
172
173 if (target.size() <= 0) {
174 topicfilterIndex.remove(topicFilter);
175 }
176 else {
177 topicfilterIndex.put(topicFilter, target);
178 }
179 });
180
181 topicFilters.stream().map(topicFilter -> key(topicFilter, clientId)).forEach(key -> {
182 TopicSubscription removed = data.remove(key);
183
184 if (logger.isDebugEnabled()) {
185 logger.debug("TopicSubscription removed [topicFilter={}, clientId={}, qos={}]",
186 removed.topicFilter(), removed.clientId(), removed.qos());
187 logger.debug("TopicSubscriptions Size [data={}, topicfilterIndex={}, clientidIndex={}]",
188 data.size(), topicfilterIndex.size(), clientidIndex.size());
189 }
190 });
191
192 return topicFilters;
193 }
194 finally {
195 modifyLock.unlock();
196 }
197 }
198 }