TopicSubscribers.java
/*
* Copyright 2016 The Lannister Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.anyflow.lannister.topic;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import net.anyflow.lannister.cluster.ClusterDataFactory;
import net.anyflow.lannister.cluster.Map;
import net.anyflow.lannister.cluster.SerializableStringSet;
public class TopicSubscribers {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopicSubscribers.class);
private final Map<String, TopicSubscriber> data;
private final Map<String, SerializableStringSet> topicnameIndex;
private final Map<String, SerializableStringSet> clientidIndex;
private final Lock modifyLock;
protected TopicSubscribers() {
this.data = ClusterDataFactory.INSTANCE.createMap("TopicSubscribers_data");
this.topicnameIndex = ClusterDataFactory.INSTANCE.createMap("TopicSubscribers_topicnameIndex");
this.clientidIndex = ClusterDataFactory.INSTANCE.createMap("TopicSubscribers_clientidIndex");
this.modifyLock = ClusterDataFactory.INSTANCE.createLock("TopicSubscribers_modifyLock");
}
public static String key(String topicName, String clientId) {
return topicName + "_" + clientId;
}
public int size() {
return data.size();
}
public Set<String> keySet() {
return Sets.newHashSet(data.keySet());
}
public void put(TopicSubscriber topicSubscriber) {
if (topicSubscriber == null) { return; }
modifyLock.lock();
try {
this.data.put(topicSubscriber.key(), topicSubscriber);
SerializableStringSet clientIds = this.topicnameIndex.get(topicSubscriber.topicName());
if (clientIds == null) {
clientIds = new SerializableStringSet();
}
clientIds.add(topicSubscriber.clientId());
this.topicnameIndex.put(topicSubscriber.topicName(), clientIds);
SerializableStringSet topicNames = this.clientidIndex.get(topicSubscriber.clientId());
if (topicNames == null) {
topicNames = new SerializableStringSet();
}
topicNames.add(topicSubscriber.topicName());
this.clientidIndex.put(topicSubscriber.clientId(), topicNames);
if (logger.isDebugEnabled()) {
logger.debug("TopicSubscriber added [topicName={}, clientId={}]", topicSubscriber.topicName(),
topicSubscriber.clientId());
logger.debug("TopicSubscribers Size [data={}, topicnameIndex={}, clientidIndex={}]", data.size(),
topicnameIndex.size(), clientidIndex.size());
}
}
finally {
modifyLock.unlock();
}
}
public TopicSubscriber get(String key) {
return data.get(key);
}
public Set<String> clientIdsOf(String topicName) {
Set<String> ret = topicnameIndex.get(topicName);
return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
}
public Set<String> topicNamesOf(String clientId) {
Set<String> ret = clientidIndex.get(clientId);
return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
}
public void updateByTopicName(String topicName) {
TopicSubscription.NEXUS.topicFilters().stream()
.filter(topicFilter -> TopicMatcher.match(topicFilter, topicName))
.forEach(topicFilter -> TopicSubscription.NEXUS.clientIdsOf(topicFilter)
.forEach(clientId -> TopicSubscriber.NEXUS.put(new TopicSubscriber(clientId, topicName))));
}
public TopicSubscriber removeByKey(String topicName, String clientId) {
return removeByKey(key(topicName, clientId));
}
private TopicSubscriber removeByKey(String key) {
modifyLock.lock();
try {
TopicSubscriber removed = this.data.remove(key);
if (removed == null) { return null; }
SerializableStringSet clientIds = topicnameIndex.get(removed.topicName());
clientIds.remove(removed.clientId());
if (clientIds.size() <= 0) {
topicnameIndex.remove(removed.topicName());
}
else {
topicnameIndex.put(removed.topicName(), clientIds);
}
SerializableStringSet topicNames = clientidIndex.get(removed.clientId());
topicNames.remove(removed.topicName());
if (topicNames.size() <= 0) {
clientidIndex.remove(removed.clientId());
}
else {
clientidIndex.put(removed.clientId(), topicNames);
}
if (logger.isDebugEnabled()) {
logger.debug("TopicSubscriber removed [topicName={}, clientId={}]", removed.topicName(),
removed.clientId());
logger.debug("TopicSubscribers Size [data={}, topicnameIndex={}, clientidIndex={}]", data.size(),
topicnameIndex.size(), clientidIndex.size());
}
return removed;
}
finally {
modifyLock.unlock();
}
}
public Set<String> removeByClientId(String clientId) {
modifyLock.lock();
try {
SerializableStringSet topicNames = this.clientidIndex.remove(clientId);
if (topicNames == null) { return Sets.newHashSet(); }
topicNames.forEach(topicName -> {
SerializableStringSet clientIds = topicnameIndex.get(topicName);
clientIds.remove(clientId);
if (clientIds.size() <= 0) {
topicnameIndex.remove(topicName);
}
else {
topicnameIndex.put(topicName, clientIds);
}
});
topicNames.stream().map(topicName -> key(topicName, clientId)).forEach(key -> {
TopicSubscriber removed = data.remove(key);
if (logger.isDebugEnabled()) {
logger.debug("TopicSubscriber removed [topicName={}, clientId={}]", removed.topicName(),
removed.clientId());
logger.debug("TopicSubscribers Size [data={}, topicnameIndex={}, clientidIndex={}]", data.size(),
topicnameIndex.size(), clientidIndex.size());
}
});
return topicNames;
}
finally {
modifyLock.unlock();
}
}
public void removeByTopicFilter(String clientId, String topicFilter) {
modifyLock.lock();
try {
List<String> topicNamesToRemove = this.topicNamesOf(clientId).stream()
.filter(topicName -> TopicMatcher.match(topicFilter, topicName)).collect(Collectors.toList());
Set<String> otherTopicFilters = TopicSubscription.NEXUS.topicFiltersOf(clientId);
otherTopicFilters.remove(topicFilter);
// Remove topicNames from topicNmaesToRemove which matches with
// other topic filters
List<String> tempTopicNames = Lists.newArrayList(topicNamesToRemove);
otherTopicFilters.stream().forEach(filter -> {
tempTopicNames.stream().forEach(name -> {
if (TopicMatcher.match(filter, name)) {
topicNamesToRemove.remove(name);
}
});
});
topicNamesToRemove.stream().map(topicName -> key(topicName, clientId)).forEach(key -> data.remove(key));
SerializableStringSet topicValues = clientidIndex.get(clientId);
if (topicValues != null) {
topicValues.removeAll(topicNamesToRemove);
if (topicValues.size() <= 0) {
clientidIndex.remove(clientId);
}
else {
clientidIndex.put(clientId, topicValues);
}
}
topicNamesToRemove.forEach(topicName -> {
SerializableStringSet clientIds = topicnameIndex.get(topicName);
if (clientIds == null) { return; }
clientIds.remove(clientId);
if (clientIds.size() <= 0) {
topicnameIndex.remove(topicName);
}
else {
topicnameIndex.put(topicName, clientIds);
}
});
if (logger.isDebugEnabled()) {
logger.debug("TopicSubscribers removed [topicFilter={}, clientId={}, topicNameCount={}]", topicFilter,
clientId, topicNamesToRemove.size());
logger.debug("TopicSubscribers Size [data={}, topicnameIndex={}, clientidIndex={}]", data.size(),
topicnameIndex.size(), clientidIndex.size());
}
}
finally {
modifyLock.unlock();
}
}
}