1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package net.anyflow.lannister.message;
17
18 import java.util.Set;
19
20 import com.google.common.collect.Sets;
21
22 import net.anyflow.lannister.cluster.ClusterDataFactory;
23 import net.anyflow.lannister.cluster.Map;
24 import net.anyflow.lannister.message.InboundMessageStatus.Status;
25
26 public class Messages {
27 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Messages.class);
28
29 private final Map<String, Message> data;
30
31 protected Messages() {
32 data = ClusterDataFactory.INSTANCE.createMap("Messages_data");
33 }
34
35 public static String key(String publisherId, Integer messageId) {
36 return publisherId + "_" + Integer.toString(messageId);
37 }
38
39 public int size() {
40 return data.size();
41 }
42
43 public void dispose() {
44 data.dispose();
45 }
46
47 public Message remove(String key) {
48 Message ret = data.remove(key);
49 if (ret == null) { return null; }
50
51 if (logger.isDebugEnabled()) {
52 logger.debug("REMOVEed Message [messageId={}, publisherId={}, topicName=]", ret.id(), ret.publisherId(),
53 ret.topicName());
54 logger.debug("Messages size={}", data.size());
55 }
56
57 return ret;
58 }
59
60 public Set<String> keySet() {
61 return Sets.newHashSet(data.keySet());
62 }
63
64 public Message get(String key) {
65 return data.get(key);
66 }
67
68 public void put(String key, Message message) {
69 data.put(key, message);
70
71 InboundMessageStatus.NEXUS.put(new InboundMessageStatus(message.key(), message.publisherId(), message.id(),
72 message.topicName(), Status.RECEIVED));
73
74 if (logger.isDebugEnabled()) {
75 logger.debug("ADDed Message [messageId={}, publisherId={}, topicName=]", message.id(),
76 message.publisherId(), message.topicName());
77 logger.debug("Messages size={}", data.size());
78 }
79 }
80 }