View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package net.anyflow.lannister.topic;
17  
18  import java.util.Set;
19  import java.util.concurrent.locks.Lock;
20  
21  import com.google.common.collect.Sets;
22  
23  import net.anyflow.lannister.cluster.ClusterDataFactory;
24  import net.anyflow.lannister.cluster.Map;
25  import net.anyflow.lannister.cluster.SerializableStringSet;
26  
27  public class TopicSubscriptions {
28  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopicSubscriptions.class);
29  
30  	private final Map<String, TopicSubscription> data;
31  	private final Map<String, SerializableStringSet> topicfilterIndex;
32  	private final Map<String, SerializableStringSet> clientidIndex;
33  
34  	private final Lock modifyLock;
35  
36  	protected TopicSubscriptions() {
37  		this.data = ClusterDataFactory.INSTANCE.createMap("TopicSubscriptions_data");
38  		this.topicfilterIndex = ClusterDataFactory.INSTANCE.createMap("TopicSubscriptions_topicfilterIndex");
39  		this.clientidIndex = ClusterDataFactory.INSTANCE.createMap("TopicSubscriptions_clientidIndex");
40  
41  		this.modifyLock = ClusterDataFactory.INSTANCE.createLock("TopicSubscriptions_modifyLock");
42  	}
43  
44  	public static String key(String topicFilter, String clientId) {
45  		return topicFilter + "_" + clientId;
46  	}
47  
48  	public int size() {
49  		return data.size();
50  	}
51  
52  	public Set<String> keySet() {
53  		return Sets.newHashSet(data.keySet());
54  	}
55  
56  	public void put(TopicSubscription topicSubscription) {
57  		if (topicSubscription == null) { return; }
58  
59  		modifyLock.lock();
60  		try {
61  			this.data.put(topicSubscription.key(), topicSubscription);
62  
63  			SerializableStringSet clientIds = this.topicfilterIndex.get(topicSubscription.topicFilter());
64  			if (clientIds == null) {
65  				clientIds = new SerializableStringSet();
66  			}
67  			clientIds.add(topicSubscription.clientId());
68  			this.topicfilterIndex.put(topicSubscription.topicFilter(), clientIds);
69  
70  			SerializableStringSet topicFilters = this.clientidIndex.get(topicSubscription.clientId());
71  			if (topicFilters == null) {
72  				topicFilters = new SerializableStringSet();
73  			}
74  			topicFilters.add(topicSubscription.topicFilter());
75  			this.clientidIndex.put(topicSubscription.clientId(), topicFilters);
76  
77  			Topic.NEXUS.keySet().stream()
78  					.filter(topicName -> TopicMatcher.match(topicSubscription.topicFilter(), topicName))
79  					.forEach(topicName -> TopicSubscriber.NEXUS
80  							.put(new TopicSubscriber(topicSubscription.clientId(), topicName)));
81  
82  			if (logger.isDebugEnabled()) {
83  				logger.debug("TopicSubscription added [topicFilter={}, clientId={}, qos={}]",
84  						topicSubscription.topicFilter(), topicSubscription.clientId(), topicSubscription.qos());
85  				logger.debug("TopicSubscriptions Size [data={}, topicfilterIndex={}, clientidIndex={}]", data.size(),
86  						topicfilterIndex.size(), clientidIndex.size());
87  			}
88  		}
89  		finally {
90  			modifyLock.unlock();
91  		}
92  	}
93  
94  	public Set<String> topicFilters() {
95  		return Sets.newHashSet(topicfilterIndex.keySet());
96  	}
97  
98  	public TopicSubscription get(String key) {
99  		return data.get(key);
100 	}
101 
102 	public TopicSubscription getBy(String topicFilter, String clientId) {
103 		return get(key(topicFilter, clientId));
104 	}
105 
106 	public Set<String> clientIdsOf(String topicFilter) {
107 		Set<String> ret = topicfilterIndex.get(topicFilter);
108 
109 		return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
110 	}
111 
112 	public Set<String> topicFiltersOf(String clientId) {
113 		Set<String> ret = clientidIndex.get(clientId);
114 
115 		return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
116 	}
117 
118 	public TopicSubscription removeByKey(String topicFilter, String clientId) {
119 		return removeByKey(key(topicFilter, clientId));
120 	}
121 
122 	private TopicSubscription removeByKey(String key) {
123 		modifyLock.lock();
124 
125 		try {
126 			TopicSubscription removed = this.data.remove(key);
127 			if (removed == null) { return null; }
128 
129 			SerializableStringSet clientIds = topicfilterIndex.get(removed.topicFilter());
130 			clientIds.remove(removed.clientId());
131 
132 			if (clientIds.size() <= 0) {
133 				topicfilterIndex.remove(removed.topicFilter());
134 			}
135 			else {
136 				topicfilterIndex.put(removed.topicFilter(), clientIds);
137 			}
138 
139 			SerializableStringSet topicFilters = clientidIndex.get(removed.clientId());
140 			topicFilters.remove(removed.topicFilter());
141 
142 			if (topicFilters.size() <= 0) {
143 				clientidIndex.remove(removed.clientId());
144 			}
145 			else {
146 				clientidIndex.put(removed.clientId(), topicFilters);
147 			}
148 
149 			if (logger.isDebugEnabled()) {
150 				logger.debug("TopicSubscription removed [topicFilter={}, clientId={}, qos={}]", removed.topicFilter(),
151 						removed.clientId(), removed.qos());
152 				logger.debug("TopicSubscriptions Size [data={}, topicfilterIndex={}, clientidIndex={}]", data.size(),
153 						topicfilterIndex.size(), clientidIndex.size());
154 			}
155 			return removed;
156 		}
157 		finally {
158 			modifyLock.unlock();
159 		}
160 	}
161 
162 	public Set<String> removeByClientId(String clientId) {
163 		modifyLock.lock();
164 
165 		try {
166 			SerializableStringSet topicFilters = this.clientidIndex.remove(clientId);
167 			if (topicFilters == null) { return Sets.newHashSet(); }
168 
169 			topicFilters.forEach(topicFilter -> {
170 				SerializableStringSet target = topicfilterIndex.get(topicFilter);
171 				target.remove(clientId);
172 
173 				if (target.size() <= 0) {
174 					topicfilterIndex.remove(topicFilter);
175 				}
176 				else {
177 					topicfilterIndex.put(topicFilter, target);
178 				}
179 			});
180 
181 			topicFilters.stream().map(topicFilter -> key(topicFilter, clientId)).forEach(key -> {
182 				TopicSubscription removed = data.remove(key);
183 
184 				if (logger.isDebugEnabled()) {
185 					logger.debug("TopicSubscription removed [topicFilter={}, clientId={}, qos={}]",
186 							removed.topicFilter(), removed.clientId(), removed.qos());
187 					logger.debug("TopicSubscriptions Size [data={}, topicfilterIndex={}, clientidIndex={}]",
188 							data.size(), topicfilterIndex.size(), clientidIndex.size());
189 				}
190 			});
191 
192 			return topicFilters;
193 		}
194 		finally {
195 			modifyLock.unlock();
196 		}
197 	}
198 }