1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.anyflow.lannister.topic;
17
18 import java.util.List;
19 import java.util.Set;
20 import java.util.concurrent.locks.Lock;
21 import java.util.stream.Collectors;
22
23 import com.google.common.collect.Lists;
24 import com.google.common.collect.Sets;
25
26 import net.anyflow.lannister.cluster.ClusterDataFactory;
27 import net.anyflow.lannister.cluster.Map;
28 import net.anyflow.lannister.cluster.SerializableStringSet;
29
30 public class TopicSubscribers {
31 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopicSubscribers.class);
32
33 private final Map<String, TopicSubscriber> data;
34 private final Map<String, SerializableStringSet> topicnameIndex;
35 private final Map<String, SerializableStringSet> clientidIndex;
36
37 private final Lock modifyLock;
38
39 protected TopicSubscribers() {
40 this.data = ClusterDataFactory.INSTANCE.createMap("TopicSubscribers_data");
41 this.topicnameIndex = ClusterDataFactory.INSTANCE.createMap("TopicSubscribers_topicnameIndex");
42 this.clientidIndex = ClusterDataFactory.INSTANCE.createMap("TopicSubscribers_clientidIndex");
43
44 this.modifyLock = ClusterDataFactory.INSTANCE.createLock("TopicSubscribers_modifyLock");
45 }
46
47 public static String key(String topicName, String clientId) {
48 return topicName + "_" + clientId;
49 }
50
51 public int size() {
52 return data.size();
53 }
54
55 public Set<String> keySet() {
56 return Sets.newHashSet(data.keySet());
57 }
58
59 public void put(TopicSubscriber topicSubscriber) {
60 if (topicSubscriber == null) { return; }
61
62 modifyLock.lock();
63 try {
64 this.data.put(topicSubscriber.key(), topicSubscriber);
65
66 SerializableStringSet clientIds = this.topicnameIndex.get(topicSubscriber.topicName());
67 if (clientIds == null) {
68 clientIds = new SerializableStringSet();
69 }
70 clientIds.add(topicSubscriber.clientId());
71 this.topicnameIndex.put(topicSubscriber.topicName(), clientIds);
72
73 SerializableStringSet topicNames = this.clientidIndex.get(topicSubscriber.clientId());
74 if (topicNames == null) {
75 topicNames = new SerializableStringSet();
76 }
77 topicNames.add(topicSubscriber.topicName());
78 this.clientidIndex.put(topicSubscriber.clientId(), topicNames);
79
80 if (logger.isDebugEnabled()) {
81 logger.debug("TopicSubscriber added [topicName={}, clientId={}]", topicSubscriber.topicName(),
82 topicSubscriber.clientId());
83 logger.debug("TopicSubscribers Size [data={}, topicnameIndex={}, clientidIndex={}]", data.size(),
84 topicnameIndex.size(), clientidIndex.size());
85 }
86 }
87 finally {
88 modifyLock.unlock();
89 }
90 }
91
92 public TopicSubscriber get(String key) {
93 return data.get(key);
94 }
95
96 public Set<String> clientIdsOf(String topicName) {
97 Set<String> ret = topicnameIndex.get(topicName);
98
99 return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
100 }
101
102 public Set<String> topicNamesOf(String clientId) {
103 Set<String> ret = clientidIndex.get(clientId);
104
105 return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
106 }
107
108 public void updateByTopicName(String topicName) {
109 TopicSubscription.NEXUS.topicFilters().stream()
110 .filter(topicFilter -> TopicMatcher.match(topicFilter, topicName))
111 .forEach(topicFilter -> TopicSubscription.NEXUS.clientIdsOf(topicFilter)
112 .forEach(clientId -> TopicSubscriber.NEXUS.put(new TopicSubscriber(clientId, topicName))));
113 }
114
115 public TopicSubscriber removeByKey(String topicName, String clientId) {
116 return removeByKey(key(topicName, clientId));
117 }
118
119 private TopicSubscriber removeByKey(String key) {
120 modifyLock.lock();
121 try {
122 TopicSubscriber removed = this.data.remove(key);
123 if (removed == null) { return null; }
124
125 SerializableStringSet clientIds = topicnameIndex.get(removed.topicName());
126 clientIds.remove(removed.clientId());
127 if (clientIds.size() <= 0) {
128 topicnameIndex.remove(removed.topicName());
129 }
130 else {
131 topicnameIndex.put(removed.topicName(), clientIds);
132 }
133
134 SerializableStringSet topicNames = clientidIndex.get(removed.clientId());
135 topicNames.remove(removed.topicName());
136 if (topicNames.size() <= 0) {
137 clientidIndex.remove(removed.clientId());
138 }
139 else {
140 clientidIndex.put(removed.clientId(), topicNames);
141 }
142
143 if (logger.isDebugEnabled()) {
144 logger.debug("TopicSubscriber removed [topicName={}, clientId={}]", removed.topicName(),
145 removed.clientId());
146 logger.debug("TopicSubscribers Size [data={}, topicnameIndex={}, clientidIndex={}]", data.size(),
147 topicnameIndex.size(), clientidIndex.size());
148 }
149
150 return removed;
151 }
152 finally {
153 modifyLock.unlock();
154 }
155 }
156
157 public Set<String> removeByClientId(String clientId) {
158 modifyLock.lock();
159 try {
160 SerializableStringSet topicNames = this.clientidIndex.remove(clientId);
161 if (topicNames == null) { return Sets.newHashSet(); }
162
163 topicNames.forEach(topicName -> {
164 SerializableStringSet clientIds = topicnameIndex.get(topicName);
165 clientIds.remove(clientId);
166 if (clientIds.size() <= 0) {
167 topicnameIndex.remove(topicName);
168 }
169 else {
170 topicnameIndex.put(topicName, clientIds);
171 }
172 });
173
174 topicNames.stream().map(topicName -> key(topicName, clientId)).forEach(key -> {
175 TopicSubscriber removed = data.remove(key);
176
177 if (logger.isDebugEnabled()) {
178 logger.debug("TopicSubscriber removed [topicName={}, clientId={}]", removed.topicName(),
179 removed.clientId());
180 logger.debug("TopicSubscribers Size [data={}, topicnameIndex={}, clientidIndex={}]", data.size(),
181 topicnameIndex.size(), clientidIndex.size());
182 }
183 });
184
185 return topicNames;
186 }
187 finally {
188 modifyLock.unlock();
189 }
190 }
191
192 public void removeByTopicFilter(String clientId, String topicFilter) {
193 modifyLock.lock();
194 try {
195 List<String> topicNamesToRemove = this.topicNamesOf(clientId).stream()
196 .filter(topicName -> TopicMatcher.match(topicFilter, topicName)).collect(Collectors.toList());
197
198 Set<String> otherTopicFilters = TopicSubscription.NEXUS.topicFiltersOf(clientId);
199 otherTopicFilters.remove(topicFilter);
200
201
202
203 List<String> tempTopicNames = Lists.newArrayList(topicNamesToRemove);
204 otherTopicFilters.stream().forEach(filter -> {
205 tempTopicNames.stream().forEach(name -> {
206 if (TopicMatcher.match(filter, name)) {
207 topicNamesToRemove.remove(name);
208 }
209 });
210 });
211
212 topicNamesToRemove.stream().map(topicName -> key(topicName, clientId)).forEach(key -> data.remove(key));
213
214 SerializableStringSet topicValues = clientidIndex.get(clientId);
215 if (topicValues != null) {
216 topicValues.removeAll(topicNamesToRemove);
217
218 if (topicValues.size() <= 0) {
219 clientidIndex.remove(clientId);
220 }
221 else {
222 clientidIndex.put(clientId, topicValues);
223 }
224 }
225
226 topicNamesToRemove.forEach(topicName -> {
227 SerializableStringSet clientIds = topicnameIndex.get(topicName);
228 if (clientIds == null) { return; }
229
230 clientIds.remove(clientId);
231 if (clientIds.size() <= 0) {
232 topicnameIndex.remove(topicName);
233 }
234 else {
235 topicnameIndex.put(topicName, clientIds);
236 }
237 });
238
239 if (logger.isDebugEnabled()) {
240 logger.debug("TopicSubscribers removed [topicFilter={}, clientId={}, topicNameCount={}]", topicFilter,
241 clientId, topicNamesToRemove.size());
242 logger.debug("TopicSubscribers Size [data={}, topicnameIndex={}, clientidIndex={}]", data.size(),
243 topicnameIndex.size(), clientidIndex.size());
244 }
245 }
246 finally {
247 modifyLock.unlock();
248 }
249 }
250 }