1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.anyflow.lannister.message;
17
18 import java.util.Set;
19 import java.util.concurrent.locks.Lock;
20
21 import com.google.common.collect.Sets;
22
23 import net.anyflow.lannister.cluster.ClusterDataFactory;
24 import net.anyflow.lannister.cluster.Map;
25 import net.anyflow.lannister.cluster.SerializableIntegerSet;
26 import net.anyflow.lannister.cluster.SerializableStringSet;
27
28 public class OutboundMessageStatuses {
29 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutboundMessageStatuses.class);
30
31 private final Map<String, OutboundMessageStatus> data;
32 private final Map<Integer, SerializableStringSet> messageidIndex;
33 private final Map<String, SerializableIntegerSet> clientidIndex;
34
35 private final Lock modifyLock;
36
37 protected OutboundMessageStatuses() {
38 this.data = ClusterDataFactory.INSTANCE.createMap("OutboundMessageStatuses_data");
39 this.messageidIndex = ClusterDataFactory.INSTANCE.createMap("OutboundMessageStatuses_messageidIndex");
40 this.clientidIndex = ClusterDataFactory.INSTANCE.createMap("OutboundMessageStatuses_clientidIndex");
41
42 this.modifyLock = ClusterDataFactory.INSTANCE.createLock("OutboundMessageStatuses_modifyLock");
43 }
44
45 public static String key(Integer messageId, String clientId) {
46 return clientId + "_" + Integer.toString(messageId);
47 }
48
49 public int size() {
50 return data.size();
51 }
52
53 public Set<String> keySet() {
54 return Sets.newHashSet(data.keySet());
55 }
56
57 public void put(OutboundMessageStatus outboundMessageStatus) {
58 if (outboundMessageStatus == null) { return; }
59
60 modifyLock.lock();
61 try {
62 this.data.put(outboundMessageStatus.key(), outboundMessageStatus);
63
64 SerializableStringSet clientIds = this.messageidIndex.get(outboundMessageStatus.messageId());
65 if (clientIds == null) {
66 clientIds = new SerializableStringSet();
67 }
68 clientIds.add(outboundMessageStatus.clientId());
69 this.messageidIndex.put(outboundMessageStatus.messageId(), clientIds);
70
71 SerializableIntegerSet messageIds = this.clientidIndex.get(outboundMessageStatus.clientId());
72 if (messageIds == null) {
73 messageIds = new SerializableIntegerSet();
74 }
75 messageIds.add(outboundMessageStatus.messageId());
76 this.clientidIndex.put(outboundMessageStatus.clientId(), messageIds);
77
78 MessageReferenceCounts.INSTANCE.retain(outboundMessageStatus.messageKey());
79
80 if (logger.isDebugEnabled()) {
81 logger.debug("OutboundMessageStatus added [messageId={}, clientId={}, status=]",
82 outboundMessageStatus.messageId(), outboundMessageStatus.clientId(),
83 outboundMessageStatus.status());
84 logger.debug("OutboundMessageStatuses Size [data={}, messageidIndex={}, clientidIndex={}]", data.size(),
85 messageidIndex.size(), clientidIndex.size());
86 }
87 }
88 finally {
89 modifyLock.unlock();
90 }
91 }
92
93 public OutboundMessageStatus get(String key) {
94 return data.get(key);
95 }
96
97 public OutboundMessageStatus getBy(Integer messageId, String clientId) {
98 return data.get(key(messageId, clientId));
99 }
100
101 public Set<Integer> messageIdsOf(String clientId) {
102 Set<Integer> ret = clientidIndex.get(clientId);
103
104 return ret == null ? Sets.newHashSet() : Sets.newHashSet(ret);
105 }
106
107 public OutboundMessageStatus removeByKey(Integer messageId, String clientId) {
108 return removeByKey(key(messageId, clientId));
109 }
110
111 private OutboundMessageStatus removeByKey(String key) {
112 modifyLock.lock();
113
114 try {
115 OutboundMessageStatus removed = this.data.remove(key);
116 if (removed == null) { return null; }
117
118 SerializableStringSet clientIds = messageidIndex.get(removed.messageId());
119 clientIds.remove(removed.clientId());
120 if (clientIds.size() <= 0) {
121 messageidIndex.remove(removed.messageId());
122 }
123 else {
124 messageidIndex.put(removed.messageId(), clientIds);
125 }
126
127 SerializableIntegerSet messageIds = clientidIndex.get(removed.clientId());
128 messageIds.remove(removed.messageId());
129 if (messageIds.size() <= 0) {
130 clientidIndex.remove(removed.clientId());
131 }
132 else {
133 clientidIndex.put(removed.clientId(), messageIds);
134 }
135
136 MessageReferenceCounts.INSTANCE.release(removed.messageKey());
137
138 if (logger.isDebugEnabled()) {
139 logger.debug("OutboundMessageStatus removed [messageId={}, clientId={}, status=]", removed.messageId(),
140 removed.clientId(), removed.status());
141 logger.debug("OutboundMessageStatuses Size [data={}, messageidIndex={}, clientidIndex={}]", data.size(),
142 messageidIndex.size(), clientidIndex.size());
143 }
144
145 return removed;
146 }
147 finally {
148 modifyLock.unlock();
149 }
150 }
151
152 public Set<Integer> removeByClientId(String clientId) {
153 modifyLock.lock();
154
155 try {
156 SerializableIntegerSet messageIds = this.clientidIndex.remove(clientId);
157 if (messageIds == null) { return Sets.newHashSet(); }
158
159 messageIds.forEach(messageId -> {
160 SerializableStringSet clientIds = messageidIndex.get(messageId);
161 clientIds.remove(clientId);
162 if (clientIds.size() <= 0) {
163 messageidIndex.remove(messageId);
164 }
165 else {
166 messageidIndex.put(messageId, clientIds);
167 }
168 });
169 messageIds.stream().map(messageId -> key(messageId, clientId)).forEach(key -> {
170 OutboundMessageStatus removed = data.remove(key);
171
172 MessageReferenceCounts.INSTANCE.release(removed.messageKey());
173
174 if (logger.isDebugEnabled()) {
175 logger.debug("OutboundMessageStatus removed [messageId={}, clientId={}, status=]",
176 removed.messageId(), removed.clientId(), removed.status());
177 logger.debug("OutboundMessageStatuses Size [data={}, messageidIndex={}, clientidIndex={}]",
178 data.size(), messageidIndex.size(), clientidIndex.size());
179 }
180 });
181
182 return messageIds;
183 }
184 finally {
185 modifyLock.unlock();
186 }
187 }
188
189 public boolean containsKey(Integer messageId, String clientId) {
190 return data.containsKey(key(messageId, clientId));
191 }
192
193 public void update(Integer messageId, String clientId, OutboundMessageStatus.Status targetStatus) {
194 String key = key(messageId, clientId);
195
196 OutboundMessageStatus status = data.get(key);
197 if (status == null) { return; }
198
199 status.status(targetStatus);
200
201 data.put(key, status);
202
203 if (logger.isDebugEnabled()) {
204 logger.debug("OutboundMessageStatus updated [messageId={}, clientId={}, status=]", status.messageId(),
205 status.clientId(), status.status());
206 }
207 }
208 }