TopicSubscriptions.java

/*
 * Copyright 2016 The Lannister Project
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package net.anyflow.lannister.topic;

import java.util.Set;
import java.util.concurrent.locks.Lock;

import com.google.common.collect.Sets;

import net.anyflow.lannister.cluster.ClusterDataFactory;
import net.anyflow.lannister.cluster.Map;
import net.anyflow.lannister.cluster.SerializableStringSet;

/**
 * Registry of {@link TopicSubscription}s, keyed by {@link #key(String, String)}.
 * <p>
 * Besides the primary {@code data} map, two secondary indexes are maintained so that lookups in
 * either direction do not require scanning all subscriptions:
 * <ul>
 * <li>{@code topicfilterIndex}: topic filter -&gt; IDs of the clients subscribed with that filter</li>
 * <li>{@code clientidIndex}: client ID -&gt; topic filters that client has subscribed with</li>
 * </ul>
 * All three maps and the lock are obtained from {@link ClusterDataFactory}. Every mutating
 * operation runs while holding {@code modifyLock} so the maps are updated together; read
 * operations do not take the lock.
 */
public class TopicSubscriptions {
	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopicSubscriptions.class);

	// Primary store: subscription key -> subscription.
	private final Map<String, TopicSubscription> data;
	// Secondary index: topic filter -> client IDs.
	private final Map<String, SerializableStringSet> topicfilterIndex;
	// Secondary index: client ID -> topic filters.
	private final Map<String, SerializableStringSet> clientidIndex;

	// Guards every multi-map modification (put / remove*).
	private final Lock modifyLock;

	/**
	 * Creates the backing maps and the modification lock through {@link ClusterDataFactory}, using
	 * fixed names prefixed with {@code TopicSubscriptions_}.
	 */
	protected TopicSubscriptions() {
		this.data = ClusterDataFactory.INSTANCE.createMap("TopicSubscriptions_data");
		this.topicfilterIndex = ClusterDataFactory.INSTANCE.createMap("TopicSubscriptions_topicfilterIndex");
		this.clientidIndex = ClusterDataFactory.INSTANCE.createMap("TopicSubscriptions_clientidIndex");

		this.modifyLock = ClusterDataFactory.INSTANCE.createLock("TopicSubscriptions_modifyLock");
	}

	/**
	 * Builds the key under which a subscription is stored: {@code topicFilter + "_" + clientId}.
	 * <p>
	 * NOTE(review): because the separator is a plain underscore, two different pairs can produce
	 * the same key when either part itself contains {@code '_'} (for example {@code ("a_b", "c")}
	 * and {@code ("a", "b_c")}). The format is kept unchanged here since stored keys depend on it.
	 *
	 * @param topicFilter the topic filter of the subscription
	 * @param clientId the ID of the subscribing client
	 * @return the composite key
	 */
	public static String key(String topicFilter, String clientId) {
		return topicFilter + "_" + clientId;
	}

	/**
	 * @return the number of stored subscriptions
	 */
	public int size() {
		return data.size();
	}

	/**
	 * @return a new {@link java.util.HashSet} holding a copy of all subscription keys
	 */
	public Set<String> keySet() {
		return Sets.newHashSet(data.keySet());
	}

	/**
	 * Stores the subscription, updates both indexes, and registers the client as a
	 * {@link TopicSubscriber} of every topic in {@code Topic.NEXUS} whose name matches the
	 * subscription's topic filter.
	 *
	 * @param topicSubscription the subscription to add; {@code null} is ignored
	 */
	public void put(TopicSubscription topicSubscription) {
		if (topicSubscription == null) { return; }

		modifyLock.lock();
		try {
			this.data.put(topicSubscription.key(), topicSubscription);

			addToIndex(topicfilterIndex, topicSubscription.topicFilter(), topicSubscription.clientId());
			addToIndex(clientidIndex, topicSubscription.clientId(), topicSubscription.topicFilter());

			Topic.NEXUS.keySet().stream()
					.filter(topicName -> TopicMatcher.match(topicSubscription.topicFilter(), topicName))
					.forEach(topicName -> TopicSubscriber.NEXUS
							.put(new TopicSubscriber(topicSubscription.clientId(), topicName)));

			if (logger.isDebugEnabled()) {
				logger.debug("TopicSubscription added [topicFilter={}, clientId={}, qos={}]",
						topicSubscription.topicFilter(), topicSubscription.clientId(), topicSubscription.qos());
				logger.debug("TopicSubscriptions Size [data={}, topicfilterIndex={}, clientidIndex={}]", data.size(),
						topicfilterIndex.size(), clientidIndex.size());
			}
		}
		finally {
			modifyLock.unlock();
		}
	}

	/**
	 * @return a new {@link java.util.HashSet} holding a copy of every topic filter that currently
	 *         has an entry in the topic filter index
	 */
	public Set<String> topicFilters() {
		return Sets.newHashSet(topicfilterIndex.keySet());
	}

	/**
	 * @param key a key as produced by {@link #key(String, String)}
	 * @return whatever the data map returns for {@code key}
	 */
	public TopicSubscription get(String key) {
		return data.get(key);
	}

	/**
	 * Looks a subscription up by its two key components.
	 *
	 * @param topicFilter the topic filter of the subscription
	 * @param clientId the ID of the subscribing client
	 * @return the result of {@link #get(String)} for the composite key
	 */
	public TopicSubscription getBy(String topicFilter, String clientId) {
		return get(key(topicFilter, clientId));
	}

	/**
	 * @param topicFilter the topic filter to look up
	 * @return a new set with the client IDs indexed under {@code topicFilter}; empty (never
	 *         {@code null}) when the index has no entry
	 */
	public Set<String> clientIdsOf(String topicFilter) {
		Set<String> ret = topicfilterIndex.get(topicFilter);

		return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
	}

	/**
	 * @param clientId the client ID to look up
	 * @return a new set with the topic filters indexed under {@code clientId}; empty (never
	 *         {@code null}) when the index has no entry
	 */
	public Set<String> topicFiltersOf(String clientId) {
		Set<String> ret = clientidIndex.get(clientId);

		return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
	}

	/**
	 * Removes the subscription identified by the given topic filter and client ID.
	 *
	 * @param topicFilter the topic filter of the subscription
	 * @param clientId the ID of the subscribing client
	 * @return the removed subscription, or {@code null} if none was stored under that key
	 */
	public TopicSubscription removeByKey(String topicFilter, String clientId) {
		return removeByKey(key(topicFilter, clientId));
	}

	/**
	 * Removes the subscription stored under {@code key} and drops it from both indexes.
	 *
	 * @param key a key as produced by {@link #key(String, String)}
	 * @return the removed subscription, or {@code null} if none was stored under that key
	 */
	private TopicSubscription removeByKey(String key) {
		modifyLock.lock();

		try {
			TopicSubscription removed = this.data.remove(key);
			if (removed == null) { return null; }

			removeFromIndex(topicfilterIndex, removed.topicFilter(), removed.clientId());
			removeFromIndex(clientidIndex, removed.clientId(), removed.topicFilter());

			logRemoved(removed);
			return removed;
		}
		finally {
			modifyLock.unlock();
		}
	}

	/**
	 * Removes every subscription of the given client from the data map and both indexes.
	 *
	 * @param clientId the ID of the client whose subscriptions are removed
	 * @return the topic filters the client was indexed under; an empty set (never {@code null})
	 *         when the client had no index entry
	 */
	public Set<String> removeByClientId(String clientId) {
		modifyLock.lock();

		try {
			SerializableStringSet topicFilters = this.clientidIndex.remove(clientId);
			if (topicFilters == null) { return Sets.newHashSet(); }

			// First pass: detach the client from every topic filter it was indexed under.
			topicFilters.forEach(topicFilter -> removeFromIndex(topicfilterIndex, topicFilter, clientId));

			// Second pass: drop the subscriptions themselves. Done after the index pass so the
			// sizes reported in the debug log already reflect the final index state.
			topicFilters.forEach(topicFilter -> {
				String key = key(topicFilter, clientId);
				TopicSubscription removed = data.remove(key);

				if (removed == null) {
					// The index listed a subscription the data map does not hold; nothing to log
					// about it, and dereferencing it would throw.
					logger.warn("TopicSubscription missing for indexed key [key={}]", key);
					return;
				}

				logRemoved(removed);
			});

			return topicFilters;
		}
		finally {
			modifyLock.unlock();
		}
	}

	/**
	 * Adds {@code member} to the set stored under {@code indexKey}, creating the set when the index
	 * has no entry yet. The set is always written back with {@code put} after being modified
	 * (presumably because the cluster map may hand out copies rather than live references;
	 * TODO confirm against the cluster Map implementation).
	 */
	private static void addToIndex(Map<String, SerializableStringSet> index, String indexKey, String member) {
		SerializableStringSet members = index.get(indexKey);
		if (members == null) {
			members = new SerializableStringSet();
		}

		members.add(member);
		index.put(indexKey, members);
	}

	/**
	 * Removes {@code member} from the set stored under {@code indexKey}; the entry is deleted once
	 * its set becomes empty, otherwise the shrunken set is written back. A missing entry is logged
	 * and otherwise ignored so that an index inconsistency cannot abort a removal half-way.
	 */
	private static void removeFromIndex(Map<String, SerializableStringSet> index, String indexKey, String member) {
		SerializableStringSet members = index.get(indexKey);
		if (members == null) {
			logger.warn("TopicSubscriptions index entry missing [indexKey={}, member={}]", indexKey, member);
			return;
		}

		members.remove(member);

		if (members.isEmpty()) {
			index.remove(indexKey);
		}
		else {
			index.put(indexKey, members);
		}
	}

	/**
	 * Emits the debug-level "removed" message for {@code removed} followed by the current sizes of
	 * the three maps. Does nothing when debug logging is disabled.
	 */
	private void logRemoved(TopicSubscription removed) {
		if (!logger.isDebugEnabled()) { return; }

		logger.debug("TopicSubscription removed [topicFilter={}, clientId={}, qos={}]", removed.topicFilter(),
				removed.clientId(), removed.qos());
		logger.debug("TopicSubscriptions Size [data={}, topicfilterIndex={}, clientidIndex={}]", data.size(),
				topicfilterIndex.size(), clientidIndex.size());
	}
}