View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package net.anyflow.lannister.message;
17  
18  import java.util.Set;
19  import java.util.concurrent.locks.Lock;
20  
21  import com.google.common.collect.Sets;
22  
23  import net.anyflow.lannister.cluster.ClusterDataFactory;
24  import net.anyflow.lannister.cluster.Map;
25  import net.anyflow.lannister.cluster.SerializableIntegerSet;
26  import net.anyflow.lannister.cluster.SerializableStringSet;
27  
28  public class InboundMessageStatuses {
29  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InboundMessageStatuses.class);
30  
31  	private final Map<String, InboundMessageStatus> data;
32  	private final Map<Integer, SerializableStringSet> messageidIndex;
33  	private final Map<String, SerializableIntegerSet> clientidIndex;
34  
35  	private final Lock modifyLock;
36  
37  	protected InboundMessageStatuses() {
38  		this.data = ClusterDataFactory.INSTANCE.createMap("InboundMessageStatuses_data");
39  		this.messageidIndex = ClusterDataFactory.INSTANCE.createMap("InboundMessageStatuses_messageidIndex");
40  		this.clientidIndex = ClusterDataFactory.INSTANCE.createMap("InboundMessageStatuses_clientidIndex");
41  
42  		this.modifyLock = ClusterDataFactory.INSTANCE.createLock("InboundMessageStatuses_modifyLock");
43  	}
44  
45  	public static String key(Integer messageId, String clientId) {
46  		return clientId + "_" + Integer.toString(messageId);
47  	}
48  
49  	public int size() {
50  		return data.size();
51  	}
52  
53  	public Set<String> keySet() {
54  		return Sets.newHashSet(data.keySet());
55  	}
56  
57  	public void put(InboundMessageStatus inboundMessageStatus) {
58  		if (inboundMessageStatus == null) { return; }
59  
60  		modifyLock.lock();
61  		try {
62  			this.data.put(inboundMessageStatus.key(), inboundMessageStatus);
63  
64  			SerializableStringSet clientIds = this.messageidIndex.get(inboundMessageStatus.messageId());
65  			if (clientIds == null) {
66  				clientIds = new SerializableStringSet();
67  			}
68  			clientIds.add(inboundMessageStatus.clientId());
69  			this.messageidIndex.put(inboundMessageStatus.messageId(), clientIds);
70  
71  			SerializableIntegerSet messageIds = this.clientidIndex.get(inboundMessageStatus.clientId());
72  			if (messageIds == null) {
73  				messageIds = new SerializableIntegerSet();
74  			}
75  			messageIds.add(inboundMessageStatus.messageId());
76  			this.clientidIndex.put(inboundMessageStatus.clientId(), messageIds);
77  
78  			MessageReferenceCounts.INSTANCE.retain(inboundMessageStatus.messageKey());
79  
80  			if (logger.isDebugEnabled()) {
81  				logger.debug("InboundMessageStatus removed [messageId={}, clientId={}, status=]",
82  						inboundMessageStatus.messageId(), inboundMessageStatus.clientId(),
83  						inboundMessageStatus.status());
84  				logger.debug("InboundMessageStatuses Size [data={}, messageidIndex={}, clientidIndex={}]", data.size(),
85  						messageidIndex.size(), clientidIndex.size());
86  			}
87  		}
88  		finally {
89  			modifyLock.unlock();
90  		}
91  	}
92  
93  	public InboundMessageStatus getBy(Integer messageId, String clientId) {
94  		return get(key(messageId, clientId));
95  	}
96  
97  	public InboundMessageStatus get(String key) {
98  		return data.get(key);
99  	}
100 
101 	public InboundMessageStatus removeByKey(Integer messageId, String clientId) {
102 		return removeByKey(key(messageId, clientId));
103 	}
104 
105 	private InboundMessageStatus removeByKey(String key) {
106 		modifyLock.lock();
107 
108 		try {
109 			InboundMessageStatus removed = this.data.remove(key);
110 			if (removed == null) { return null; }
111 
112 			SerializableStringSet clientIds = messageidIndex.get(removed.messageId());
113 			clientIds.remove(removed.clientId());
114 			if (clientIds.size() <= 0) {
115 				messageidIndex.remove(removed.messageId());
116 			}
117 			else {
118 				messageidIndex.put(removed.messageId(), clientIds);
119 			}
120 
121 			SerializableIntegerSet messageIds = clientidIndex.get(removed.clientId());
122 			messageIds.remove(removed.messageId());
123 			if (messageIds.size() <= 0) {
124 				clientidIndex.remove(removed.clientId());
125 			}
126 			else {
127 				clientidIndex.put(removed.clientId(), messageIds);
128 			}
129 
130 			MessageReferenceCounts.INSTANCE.release(removed.messageKey());
131 
132 			if (logger.isDebugEnabled()) {
133 				logger.debug("InboundMessageStatus removed [messageId={}, clientId={}, status=]", removed.messageId(),
134 						removed.clientId(), removed.status());
135 				logger.debug("InboundMessageStatuses Size [data={}, messageidIndex={}, clientidIndex={}]", data.size(),
136 						messageidIndex.size(), clientidIndex.size());
137 			}
138 
139 			return removed;
140 		}
141 		finally {
142 			modifyLock.unlock();
143 		}
144 	}
145 
146 	public void update(Integer messageId, String clientId, InboundMessageStatus.Status targetStatus) {
147 		String key = key(messageId, clientId);
148 
149 		InboundMessageStatus status = data.get(key);
150 		if (status == null) { return; }
151 
152 		status.status(targetStatus);
153 
154 		data.put(key, status);
155 
156 		if (logger.isDebugEnabled()) {
157 			logger.debug("InboundMessageStatus removed [messageId={}, clientId={}, status=]", status.messageId(),
158 					status.clientId(), status.status());
159 		}
160 	}
161 }