View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package net.anyflow.lannister.topic;
17  
18  import java.util.List;
19  import java.util.Set;
20  import java.util.concurrent.locks.Lock;
21  import java.util.stream.Collectors;
22  
23  import com.google.common.collect.Lists;
24  import com.google.common.collect.Sets;
25  
26  import net.anyflow.lannister.cluster.ClusterDataFactory;
27  import net.anyflow.lannister.cluster.Map;
28  import net.anyflow.lannister.cluster.SerializableStringSet;
29  
30  public class TopicSubscribers {
31  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopicSubscribers.class);
32  
33  	private final Map<String, TopicSubscriber> data;
34  	private final Map<String, SerializableStringSet> topicnameIndex;
35  	private final Map<String, SerializableStringSet> clientidIndex;
36  
37  	private final Lock modifyLock;
38  
39  	protected TopicSubscribers() {
40  		this.data = ClusterDataFactory.INSTANCE.createMap("TopicSubscribers_data");
41  		this.topicnameIndex = ClusterDataFactory.INSTANCE.createMap("TopicSubscribers_topicnameIndex");
42  		this.clientidIndex = ClusterDataFactory.INSTANCE.createMap("TopicSubscribers_clientidIndex");
43  
44  		this.modifyLock = ClusterDataFactory.INSTANCE.createLock("TopicSubscribers_modifyLock");
45  	}
46  
47  	public static String key(String topicName, String clientId) {
48  		return topicName + "_" + clientId;
49  	}
50  
51  	public int size() {
52  		return data.size();
53  	}
54  
55  	public Set<String> keySet() {
56  		return Sets.newHashSet(data.keySet());
57  	}
58  
59  	public void put(TopicSubscriber topicSubscriber) {
60  		if (topicSubscriber == null) { return; }
61  
62  		modifyLock.lock();
63  		try {
64  			this.data.put(topicSubscriber.key(), topicSubscriber);
65  
66  			SerializableStringSet clientIds = this.topicnameIndex.get(topicSubscriber.topicName());
67  			if (clientIds == null) {
68  				clientIds = new SerializableStringSet();
69  			}
70  			clientIds.add(topicSubscriber.clientId());
71  			this.topicnameIndex.put(topicSubscriber.topicName(), clientIds);
72  
73  			SerializableStringSet topicNames = this.clientidIndex.get(topicSubscriber.clientId());
74  			if (topicNames == null) {
75  				topicNames = new SerializableStringSet();
76  			}
77  			topicNames.add(topicSubscriber.topicName());
78  			this.clientidIndex.put(topicSubscriber.clientId(), topicNames);
79  
80  			if (logger.isDebugEnabled()) {
81  				logger.debug("TopicSubscriber added [topicName={}, clientId={}]", topicSubscriber.topicName(),
82  						topicSubscriber.clientId());
83  				logger.debug("TopicSubscribers Size [data={}, topicnameIndex={}, clientidIndex={}]", data.size(),
84  						topicnameIndex.size(), clientidIndex.size());
85  			}
86  		}
87  		finally {
88  			modifyLock.unlock();
89  		}
90  	}
91  
92  	public TopicSubscriber get(String key) {
93  		return data.get(key);
94  	}
95  
96  	public Set<String> clientIdsOf(String topicName) {
97  		Set<String> ret = topicnameIndex.get(topicName);
98  
99  		return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
100 	}
101 
102 	public Set<String> topicNamesOf(String clientId) {
103 		Set<String> ret = clientidIndex.get(clientId);
104 
105 		return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
106 	}
107 
108 	public void updateByTopicName(String topicName) {
109 		TopicSubscription.NEXUS.topicFilters().stream()
110 				.filter(topicFilter -> TopicMatcher.match(topicFilter, topicName))
111 				.forEach(topicFilter -> TopicSubscription.NEXUS.clientIdsOf(topicFilter)
112 						.forEach(clientId -> TopicSubscriber.NEXUS.put(new TopicSubscriber(clientId, topicName))));
113 	}
114 
115 	public TopicSubscriber removeByKey(String topicName, String clientId) {
116 		return removeByKey(key(topicName, clientId));
117 	}
118 
119 	private TopicSubscriber removeByKey(String key) {
120 		modifyLock.lock();
121 		try {
122 			TopicSubscriber removed = this.data.remove(key);
123 			if (removed == null) { return null; }
124 
125 			SerializableStringSet clientIds = topicnameIndex.get(removed.topicName());
126 			clientIds.remove(removed.clientId());
127 			if (clientIds.size() <= 0) {
128 				topicnameIndex.remove(removed.topicName());
129 			}
130 			else {
131 				topicnameIndex.put(removed.topicName(), clientIds);
132 			}
133 
134 			SerializableStringSet topicNames = clientidIndex.get(removed.clientId());
135 			topicNames.remove(removed.topicName());
136 			if (topicNames.size() <= 0) {
137 				clientidIndex.remove(removed.clientId());
138 			}
139 			else {
140 				clientidIndex.put(removed.clientId(), topicNames);
141 			}
142 
143 			if (logger.isDebugEnabled()) {
144 				logger.debug("TopicSubscriber removed [topicName={}, clientId={}]", removed.topicName(),
145 						removed.clientId());
146 				logger.debug("TopicSubscribers Size [data={}, topicnameIndex={}, clientidIndex={}]", data.size(),
147 						topicnameIndex.size(), clientidIndex.size());
148 			}
149 
150 			return removed;
151 		}
152 		finally {
153 			modifyLock.unlock();
154 		}
155 	}
156 
157 	public Set<String> removeByClientId(String clientId) {
158 		modifyLock.lock();
159 		try {
160 			SerializableStringSet topicNames = this.clientidIndex.remove(clientId);
161 			if (topicNames == null) { return Sets.newHashSet(); }
162 
163 			topicNames.forEach(topicName -> {
164 				SerializableStringSet clientIds = topicnameIndex.get(topicName);
165 				clientIds.remove(clientId);
166 				if (clientIds.size() <= 0) {
167 					topicnameIndex.remove(topicName);
168 				}
169 				else {
170 					topicnameIndex.put(topicName, clientIds);
171 				}
172 			});
173 
174 			topicNames.stream().map(topicName -> key(topicName, clientId)).forEach(key -> {
175 				TopicSubscriber removed = data.remove(key);
176 
177 				if (logger.isDebugEnabled()) {
178 					logger.debug("TopicSubscriber removed [topicName={}, clientId={}]", removed.topicName(),
179 							removed.clientId());
180 					logger.debug("TopicSubscribers Size [data={}, topicnameIndex={}, clientidIndex={}]", data.size(),
181 							topicnameIndex.size(), clientidIndex.size());
182 				}
183 			});
184 
185 			return topicNames;
186 		}
187 		finally {
188 			modifyLock.unlock();
189 		}
190 	}
191 
192 	public void removeByTopicFilter(String clientId, String topicFilter) {
193 		modifyLock.lock();
194 		try {
195 			List<String> topicNamesToRemove = this.topicNamesOf(clientId).stream()
196 					.filter(topicName -> TopicMatcher.match(topicFilter, topicName)).collect(Collectors.toList());
197 
198 			Set<String> otherTopicFilters = TopicSubscription.NEXUS.topicFiltersOf(clientId);
199 			otherTopicFilters.remove(topicFilter);
200 
201 			// Remove topicNames from topicNmaesToRemove which matches with
202 			// other topic filters
203 			List<String> tempTopicNames = Lists.newArrayList(topicNamesToRemove);
204 			otherTopicFilters.stream().forEach(filter -> {
205 				tempTopicNames.stream().forEach(name -> {
206 					if (TopicMatcher.match(filter, name)) {
207 						topicNamesToRemove.remove(name);
208 					}
209 				});
210 			});
211 
212 			topicNamesToRemove.stream().map(topicName -> key(topicName, clientId)).forEach(key -> data.remove(key));
213 
214 			SerializableStringSet topicValues = clientidIndex.get(clientId);
215 			if (topicValues != null) {
216 				topicValues.removeAll(topicNamesToRemove);
217 
218 				if (topicValues.size() <= 0) {
219 					clientidIndex.remove(clientId);
220 				}
221 				else {
222 					clientidIndex.put(clientId, topicValues);
223 				}
224 			}
225 
226 			topicNamesToRemove.forEach(topicName -> {
227 				SerializableStringSet clientIds = topicnameIndex.get(topicName);
228 				if (clientIds == null) { return; }
229 
230 				clientIds.remove(clientId);
231 				if (clientIds.size() <= 0) {
232 					topicnameIndex.remove(topicName);
233 				}
234 				else {
235 					topicnameIndex.put(topicName, clientIds);
236 				}
237 			});
238 
239 			if (logger.isDebugEnabled()) {
240 				logger.debug("TopicSubscribers removed [topicFilter={}, clientId={}, topicNameCount={}]", topicFilter,
241 						clientId, topicNamesToRemove.size());
242 				logger.debug("TopicSubscribers Size [data={}, topicnameIndex={}, clientidIndex={}]", data.size(),
243 						topicnameIndex.size(), clientidIndex.size());
244 			}
245 		}
246 		finally {
247 			modifyLock.unlock();
248 		}
249 	}
250 }