1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.anyflow.lannister.message;
17
18 import java.util.Set;
19 import java.util.concurrent.locks.Lock;
20
21 import com.google.common.collect.Sets;
22
23 import net.anyflow.lannister.cluster.ClusterDataFactory;
24 import net.anyflow.lannister.cluster.Map;
25 import net.anyflow.lannister.cluster.SerializableIntegerSet;
26 import net.anyflow.lannister.cluster.SerializableStringSet;
27
28 public class InboundMessageStatuses {
29 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InboundMessageStatuses.class);
30
31 private final Map<String, InboundMessageStatus> data;
32 private final Map<Integer, SerializableStringSet> messageidIndex;
33 private final Map<String, SerializableIntegerSet> clientidIndex;
34
35 private final Lock modifyLock;
36
37 protected InboundMessageStatuses() {
38 this.data = ClusterDataFactory.INSTANCE.createMap("InboundMessageStatuses_data");
39 this.messageidIndex = ClusterDataFactory.INSTANCE.createMap("InboundMessageStatuses_messageidIndex");
40 this.clientidIndex = ClusterDataFactory.INSTANCE.createMap("InboundMessageStatuses_clientidIndex");
41
42 this.modifyLock = ClusterDataFactory.INSTANCE.createLock("InboundMessageStatuses_modifyLock");
43 }
44
45 public static String key(Integer messageId, String clientId) {
46 return clientId + "_" + Integer.toString(messageId);
47 }
48
49 public int size() {
50 return data.size();
51 }
52
53 public Set<String> keySet() {
54 return Sets.newHashSet(data.keySet());
55 }
56
57 public void put(InboundMessageStatus inboundMessageStatus) {
58 if (inboundMessageStatus == null) { return; }
59
60 modifyLock.lock();
61 try {
62 this.data.put(inboundMessageStatus.key(), inboundMessageStatus);
63
64 SerializableStringSet clientIds = this.messageidIndex.get(inboundMessageStatus.messageId());
65 if (clientIds == null) {
66 clientIds = new SerializableStringSet();
67 }
68 clientIds.add(inboundMessageStatus.clientId());
69 this.messageidIndex.put(inboundMessageStatus.messageId(), clientIds);
70
71 SerializableIntegerSet messageIds = this.clientidIndex.get(inboundMessageStatus.clientId());
72 if (messageIds == null) {
73 messageIds = new SerializableIntegerSet();
74 }
75 messageIds.add(inboundMessageStatus.messageId());
76 this.clientidIndex.put(inboundMessageStatus.clientId(), messageIds);
77
78 MessageReferenceCounts.INSTANCE.retain(inboundMessageStatus.messageKey());
79
80 if (logger.isDebugEnabled()) {
81 logger.debug("InboundMessageStatus removed [messageId={}, clientId={}, status=]",
82 inboundMessageStatus.messageId(), inboundMessageStatus.clientId(),
83 inboundMessageStatus.status());
84 logger.debug("InboundMessageStatuses Size [data={}, messageidIndex={}, clientidIndex={}]", data.size(),
85 messageidIndex.size(), clientidIndex.size());
86 }
87 }
88 finally {
89 modifyLock.unlock();
90 }
91 }
92
93 public InboundMessageStatus getBy(Integer messageId, String clientId) {
94 return get(key(messageId, clientId));
95 }
96
97 public InboundMessageStatus get(String key) {
98 return data.get(key);
99 }
100
101 public InboundMessageStatus removeByKey(Integer messageId, String clientId) {
102 return removeByKey(key(messageId, clientId));
103 }
104
105 private InboundMessageStatus removeByKey(String key) {
106 modifyLock.lock();
107
108 try {
109 InboundMessageStatus removed = this.data.remove(key);
110 if (removed == null) { return null; }
111
112 SerializableStringSet clientIds = messageidIndex.get(removed.messageId());
113 clientIds.remove(removed.clientId());
114 if (clientIds.size() <= 0) {
115 messageidIndex.remove(removed.messageId());
116 }
117 else {
118 messageidIndex.put(removed.messageId(), clientIds);
119 }
120
121 SerializableIntegerSet messageIds = clientidIndex.get(removed.clientId());
122 messageIds.remove(removed.messageId());
123 if (messageIds.size() <= 0) {
124 clientidIndex.remove(removed.clientId());
125 }
126 else {
127 clientidIndex.put(removed.clientId(), messageIds);
128 }
129
130 MessageReferenceCounts.INSTANCE.release(removed.messageKey());
131
132 if (logger.isDebugEnabled()) {
133 logger.debug("InboundMessageStatus removed [messageId={}, clientId={}, status=]", removed.messageId(),
134 removed.clientId(), removed.status());
135 logger.debug("InboundMessageStatuses Size [data={}, messageidIndex={}, clientidIndex={}]", data.size(),
136 messageidIndex.size(), clientidIndex.size());
137 }
138
139 return removed;
140 }
141 finally {
142 modifyLock.unlock();
143 }
144 }
145
146 public void update(Integer messageId, String clientId, InboundMessageStatus.Status targetStatus) {
147 String key = key(messageId, clientId);
148
149 InboundMessageStatus status = data.get(key);
150 if (status == null) { return; }
151
152 status.status(targetStatus);
153
154 data.put(key, status);
155
156 if (logger.isDebugEnabled()) {
157 logger.debug("InboundMessageStatus removed [messageId={}, clientId={}, status=]", status.messageId(),
158 status.clientId(), status.status());
159 }
160 }
161 }