View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package net.anyflow.lannister.message;
17  
18  import java.util.Set;
19  import java.util.concurrent.locks.Lock;
20  
21  import com.google.common.collect.Sets;
22  
23  import net.anyflow.lannister.cluster.ClusterDataFactory;
24  import net.anyflow.lannister.cluster.Map;
25  import net.anyflow.lannister.cluster.SerializableIntegerSet;
26  import net.anyflow.lannister.cluster.SerializableStringSet;
27  
28  public class OutboundMessageStatuses {
29  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutboundMessageStatuses.class);
30  
31  	private final Map<String, OutboundMessageStatus> data;
32  	private final Map<Integer, SerializableStringSet> messageidIndex;
33  	private final Map<String, SerializableIntegerSet> clientidIndex;
34  
35  	private final Lock modifyLock;
36  
37  	protected OutboundMessageStatuses() {
38  		this.data = ClusterDataFactory.INSTANCE.createMap("OutboundMessageStatuses_data");
39  		this.messageidIndex = ClusterDataFactory.INSTANCE.createMap("OutboundMessageStatuses_messageidIndex");
40  		this.clientidIndex = ClusterDataFactory.INSTANCE.createMap("OutboundMessageStatuses_clientidIndex");
41  
42  		this.modifyLock = ClusterDataFactory.INSTANCE.createLock("OutboundMessageStatuses_modifyLock");
43  	}
44  
45  	public static String key(Integer messageId, String clientId) {
46  		return clientId + "_" + Integer.toString(messageId);
47  	}
48  
49  	public int size() {
50  		return data.size();
51  	}
52  
53  	public Set<String> keySet() {
54  		return Sets.newHashSet(data.keySet());
55  	}
56  
57  	public void put(OutboundMessageStatus outboundMessageStatus) {
58  		if (outboundMessageStatus == null) { return; }
59  
60  		modifyLock.lock();
61  		try {
62  			this.data.put(outboundMessageStatus.key(), outboundMessageStatus);
63  
64  			SerializableStringSet clientIds = this.messageidIndex.get(outboundMessageStatus.messageId());
65  			if (clientIds == null) {
66  				clientIds = new SerializableStringSet();
67  			}
68  			clientIds.add(outboundMessageStatus.clientId());
69  			this.messageidIndex.put(outboundMessageStatus.messageId(), clientIds);
70  
71  			SerializableIntegerSet messageIds = this.clientidIndex.get(outboundMessageStatus.clientId());
72  			if (messageIds == null) {
73  				messageIds = new SerializableIntegerSet();
74  			}
75  			messageIds.add(outboundMessageStatus.messageId());
76  			this.clientidIndex.put(outboundMessageStatus.clientId(), messageIds);
77  
78  			MessageReferenceCounts.INSTANCE.retain(outboundMessageStatus.messageKey());
79  
80  			if (logger.isDebugEnabled()) {
81  				logger.debug("OutboundMessageStatus added [messageId={}, clientId={}, status=]",
82  						outboundMessageStatus.messageId(), outboundMessageStatus.clientId(),
83  						outboundMessageStatus.status());
84  				logger.debug("OutboundMessageStatuses Size [data={}, messageidIndex={}, clientidIndex={}]", data.size(),
85  						messageidIndex.size(), clientidIndex.size());
86  			}
87  		}
88  		finally {
89  			modifyLock.unlock();
90  		}
91  	}
92  
93  	public OutboundMessageStatus get(String key) {
94  		return data.get(key);
95  	}
96  
97  	public OutboundMessageStatus getBy(Integer messageId, String clientId) {
98  		return data.get(key(messageId, clientId));
99  	}
100 
101 	public Set<Integer> messageIdsOf(String clientId) {
102 		Set<Integer> ret = clientidIndex.get(clientId);
103 
104 		return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
105 	}
106 
107 	public OutboundMessageStatus removeByKey(Integer messageId, String clientId) {
108 		return removeByKey(key(messageId, clientId));
109 	}
110 
111 	private OutboundMessageStatus removeByKey(String key) {
112 		modifyLock.lock();
113 
114 		try {
115 			OutboundMessageStatus removed = this.data.remove(key);
116 			if (removed == null) { return null; }
117 
118 			SerializableStringSet clientIds = messageidIndex.get(removed.messageId());
119 			clientIds.remove(removed.clientId());
120 			if (clientIds.size() <= 0) {
121 				messageidIndex.remove(removed.messageId());
122 			}
123 			else {
124 				messageidIndex.put(removed.messageId(), clientIds);
125 			}
126 
127 			SerializableIntegerSet messageIds = clientidIndex.get(removed.clientId());
128 			messageIds.remove(removed.messageId());
129 			if (messageIds.size() <= 0) {
130 				clientidIndex.remove(removed.clientId());
131 			}
132 			else {
133 				clientidIndex.put(removed.clientId(), messageIds);
134 			}
135 
136 			MessageReferenceCounts.INSTANCE.release(removed.messageKey());
137 
138 			if (logger.isDebugEnabled()) {
139 				logger.debug("OutboundMessageStatus removed [messageId={}, clientId={}, status=]", removed.messageId(),
140 						removed.clientId(), removed.status());
141 				logger.debug("OutboundMessageStatuses Size [data={}, messageidIndex={}, clientidIndex={}]", data.size(),
142 						messageidIndex.size(), clientidIndex.size());
143 			}
144 
145 			return removed;
146 		}
147 		finally {
148 			modifyLock.unlock();
149 		}
150 	}
151 
152 	public Set<Integer> removeByClientId(String clientId) {
153 		modifyLock.lock();
154 
155 		try {
156 			SerializableIntegerSet messageIds = this.clientidIndex.remove(clientId);
157 			if (messageIds == null) { return Sets.newHashSet(); }
158 
159 			messageIds.forEach(messageId -> {
160 				SerializableStringSet clientIds = messageidIndex.get(messageId);
161 				clientIds.remove(clientId);
162 				if (clientIds.size() <= 0) {
163 					messageidIndex.remove(messageId);
164 				}
165 				else {
166 					messageidIndex.put(messageId, clientIds);
167 				}
168 			});
169 			messageIds.stream().map(messageId -> key(messageId, clientId)).forEach(key -> {
170 				OutboundMessageStatus removed = data.remove(key);
171 
172 				MessageReferenceCounts.INSTANCE.release(removed.messageKey());
173 
174 				if (logger.isDebugEnabled()) {
175 					logger.debug("OutboundMessageStatus removed [messageId={}, clientId={}, status=]",
176 							removed.messageId(), removed.clientId(), removed.status());
177 					logger.debug("OutboundMessageStatuses Size [data={}, messageidIndex={}, clientidIndex={}]",
178 							data.size(), messageidIndex.size(), clientidIndex.size());
179 				}
180 			});
181 
182 			return messageIds;
183 		}
184 		finally {
185 			modifyLock.unlock();
186 		}
187 	}
188 
189 	public boolean containsKey(Integer messageId, String clientId) {
190 		return data.containsKey(key(messageId, clientId));
191 	}
192 
193 	public void update(Integer messageId, String clientId, OutboundMessageStatus.Status targetStatus) {
194 		String key = key(messageId, clientId);
195 
196 		OutboundMessageStatus status = data.get(key);
197 		if (status == null) { return; }
198 
199 		status.status(targetStatus);
200 
201 		data.put(key, status);
202 
203 		if (logger.isDebugEnabled()) {
204 			logger.debug("OutboundMessageStatus updated [messageId={}, clientId={}, status=]", status.messageId(),
205 					status.clientId(), status.status());
206 		}
207 	}
208 }