OutboundMessageStatuses.java

/*
 * Copyright 2016 The Lannister Project
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package net.anyflow.lannister.message;

import java.util.Set;
import java.util.concurrent.locks.Lock;

import com.google.common.collect.Sets;

import net.anyflow.lannister.cluster.ClusterDataFactory;
import net.anyflow.lannister.cluster.Map;
import net.anyflow.lannister.cluster.SerializableIntegerSet;
import net.anyflow.lannister.cluster.SerializableStringSet;

/**
 * Registry of {@link OutboundMessageStatus} entries together with two lookup
 * indices.
 * <ul>
 * <li>{@code data}: status keyed by {@link #key(Integer, String)}.</li>
 * <li>{@code messageidIndex}: message id to the set of client ids that have a
 * status for it.</li>
 * <li>{@code clientidIndex}: client id to the set of message ids that have a
 * status for it.</li>
 * </ul>
 * All three maps and the lock are obtained from {@link ClusterDataFactory}.
 * Every method that modifies the maps holds {@code modifyLock} so the three
 * maps are changed together; plain lookups do not take the lock.
 */
public class OutboundMessageStatuses {
	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutboundMessageStatuses.class);

	private final Map<String, OutboundMessageStatus> data;
	private final Map<Integer, SerializableStringSet> messageidIndex;
	private final Map<String, SerializableIntegerSet> clientidIndex;

	private final Lock modifyLock;

	/**
	 * Creates the backing maps and the modification lock through
	 * {@link ClusterDataFactory}, using fixed names.
	 */
	protected OutboundMessageStatuses() {
		this.data = ClusterDataFactory.INSTANCE.createMap("OutboundMessageStatuses_data");
		this.messageidIndex = ClusterDataFactory.INSTANCE.createMap("OutboundMessageStatuses_messageidIndex");
		this.clientidIndex = ClusterDataFactory.INSTANCE.createMap("OutboundMessageStatuses_clientidIndex");

		this.modifyLock = ClusterDataFactory.INSTANCE.createLock("OutboundMessageStatuses_modifyLock");
	}

	/**
	 * Builds the key used in the {@code data} map.
	 *
	 * @param messageId
	 *            message id; must not be {@code null}
	 * @param clientId
	 *            client id
	 * @return {@code clientId + "_" + messageId}
	 */
	public static String key(Integer messageId, String clientId) {
		return clientId + "_" + Integer.toString(messageId);
	}

	/**
	 * @return the number of stored statuses
	 */
	public int size() {
		return data.size();
	}

	/**
	 * @return a new set holding a copy of the keys currently stored
	 */
	public Set<String> keySet() {
		return Sets.newHashSet(data.keySet());
	}

	/**
	 * Stores the given status under its own {@code key()}, registers it in both
	 * indices and retains the reference count of its message key.
	 * <p>
	 * NOTE(review): the reference count is retained on every call, including
	 * when an entry with the same key is already stored, while a removal
	 * releases only once - confirm callers never put the same key twice.
	 *
	 * @param outboundMessageStatus
	 *            status to store; {@code null} is ignored
	 */
	public void put(OutboundMessageStatus outboundMessageStatus) {
		if (outboundMessageStatus == null) { return; }

		modifyLock.lock();
		try {
			this.data.put(outboundMessageStatus.key(), outboundMessageStatus);

			// The set is written back after mutation so the map stores the updated value.
			SerializableStringSet clientIds = this.messageidIndex.get(outboundMessageStatus.messageId());
			if (clientIds == null) {
				clientIds = new SerializableStringSet();
			}
			clientIds.add(outboundMessageStatus.clientId());
			this.messageidIndex.put(outboundMessageStatus.messageId(), clientIds);

			SerializableIntegerSet messageIds = this.clientidIndex.get(outboundMessageStatus.clientId());
			if (messageIds == null) {
				messageIds = new SerializableIntegerSet();
			}
			messageIds.add(outboundMessageStatus.messageId());
			this.clientidIndex.put(outboundMessageStatus.clientId(), messageIds);

			MessageReferenceCounts.INSTANCE.retain(outboundMessageStatus.messageKey());

			if (logger.isDebugEnabled()) {
				logger.debug("OutboundMessageStatus added [messageId={}, clientId={}, status={}]",
						outboundMessageStatus.messageId(), outboundMessageStatus.clientId(),
						outboundMessageStatus.status());
				logSizes();
			}
		}
		finally {
			modifyLock.unlock();
		}
	}

	/**
	 * @param key
	 *            key as produced by {@link #key(Integer, String)}
	 * @return the value the {@code data} map holds for {@code key}
	 */
	public OutboundMessageStatus get(String key) {
		return data.get(key);
	}

	/**
	 * @param messageId
	 *            message id; must not be {@code null}
	 * @param clientId
	 *            client id
	 * @return the value the {@code data} map holds for the combined key
	 */
	public OutboundMessageStatus getBy(Integer messageId, String clientId) {
		return data.get(key(messageId, clientId));
	}

	/**
	 * @param clientId
	 *            client id to look up in the client index
	 * @return a new set with a copy of the indexed message ids; empty when the
	 *         client has no index entry
	 */
	public Set<Integer> messageIdsOf(String clientId) {
		Set<Integer> ret = clientidIndex.get(clientId);

		return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
	}

	/**
	 * Removes the status identified by the given message and client id.
	 *
	 * @param messageId
	 *            message id; must not be {@code null}
	 * @param clientId
	 *            client id
	 * @return the removed status, or {@code null} when nothing was stored
	 */
	public OutboundMessageStatus removeByKey(Integer messageId, String clientId) {
		return removeByKey(key(messageId, clientId));
	}

	/**
	 * Removes the status stored under {@code key}, drops it from both indices
	 * and releases the reference count of its message key.
	 *
	 * @return the removed status, or {@code null} when nothing was stored
	 */
	private OutboundMessageStatus removeByKey(String key) {
		modifyLock.lock();

		try {
			OutboundMessageStatus removed = this.data.remove(key);
			if (removed == null) { return null; }

			removeClientFromMessageIndex(removed.messageId(), removed.clientId());
			removeMessageFromClientIndex(removed.clientId(), removed.messageId());

			MessageReferenceCounts.INSTANCE.release(removed.messageKey());

			logRemoved(removed);

			return removed;
		}
		finally {
			modifyLock.unlock();
		}
	}

	/**
	 * Removes every status indexed for the given client, updates the message
	 * index accordingly and releases the reference count of each removed
	 * status' message key.
	 *
	 * @param clientId
	 *            client whose statuses are removed
	 * @return the message ids that were indexed for the client; an empty set
	 *         when the client had no index entry
	 */
	public Set<Integer> removeByClientId(String clientId) {
		modifyLock.lock();

		try {
			SerializableIntegerSet messageIds = this.clientidIndex.remove(clientId);
			if (messageIds == null) { return Sets.newHashSet(); }

			for (Integer messageId : messageIds) {
				removeClientFromMessageIndex(messageId, clientId);

				OutboundMessageStatus removed = data.remove(key(messageId, clientId));
				// An index entry without a stored status must not abort the
				// cleanup of the remaining message ids.
				if (removed == null) {
					continue;
				}

				MessageReferenceCounts.INSTANCE.release(removed.messageKey());

				logRemoved(removed);
			}

			return messageIds;
		}
		finally {
			modifyLock.unlock();
		}
	}

	/**
	 * @param messageId
	 *            message id; must not be {@code null}
	 * @param clientId
	 *            client id
	 * @return whether the {@code data} map contains the combined key
	 */
	public boolean containsKey(Integer messageId, String clientId) {
		return data.containsKey(key(messageId, clientId));
	}

	/**
	 * Sets the status value of a stored entry and writes the entry back to the
	 * {@code data} map. Does nothing when no entry is stored for the key.
	 * <p>
	 * The read-modify-write runs under {@code modifyLock} so that a removal
	 * happening in between cannot be undone by the write-back.
	 *
	 * @param messageId
	 *            message id; must not be {@code null}
	 * @param clientId
	 *            client id
	 * @param targetStatus
	 *            status value to set
	 */
	public void update(Integer messageId, String clientId, OutboundMessageStatus.Status targetStatus) {
		String key = key(messageId, clientId);

		modifyLock.lock();
		try {
			OutboundMessageStatus status = data.get(key);
			if (status == null) { return; }

			status.status(targetStatus);

			data.put(key, status);

			if (logger.isDebugEnabled()) {
				logger.debug("OutboundMessageStatus updated [messageId={}, clientId={}, status={}]",
						status.messageId(), status.clientId(), status.status());
			}
		}
		finally {
			modifyLock.unlock();
		}
	}

	/**
	 * Drops {@code clientId} from the message index entry of {@code messageId},
	 * removing the entry once it is empty. A missing entry is ignored. Callers
	 * hold {@code modifyLock}.
	 */
	private void removeClientFromMessageIndex(Integer messageId, String clientId) {
		SerializableStringSet clientIds = messageidIndex.get(messageId);
		if (clientIds == null) { return; }

		clientIds.remove(clientId);
		if (clientIds.size() <= 0) {
			messageidIndex.remove(messageId);
		}
		else {
			messageidIndex.put(messageId, clientIds);
		}
	}

	/**
	 * Drops {@code messageId} from the client index entry of {@code clientId},
	 * removing the entry once it is empty. A missing entry is ignored. Callers
	 * hold {@code modifyLock}.
	 */
	private void removeMessageFromClientIndex(String clientId, Integer messageId) {
		SerializableIntegerSet messageIds = clientidIndex.get(clientId);
		if (messageIds == null) { return; }

		messageIds.remove(messageId);
		if (messageIds.size() <= 0) {
			clientidIndex.remove(clientId);
		}
		else {
			clientidIndex.put(clientId, messageIds);
		}
	}

	/**
	 * Writes the debug log lines for a removed status; no map is queried unless
	 * debug logging is enabled.
	 */
	private void logRemoved(OutboundMessageStatus removed) {
		if (!logger.isDebugEnabled()) { return; }

		logger.debug("OutboundMessageStatus removed [messageId={}, clientId={}, status={}]", removed.messageId(),
				removed.clientId(), removed.status());
		logSizes();
	}

	/**
	 * Logs the current sizes of the three maps at debug level.
	 */
	private void logSizes() {
		logger.debug("OutboundMessageStatuses Size [data={}, messageidIndex={}, clientidIndex={}]", data.size(),
				messageidIndex.size(), clientidIndex.size());
	}
}