1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.session;
18
19 import java.util.Set;
20
21 import com.google.common.base.Strings;
22 import com.google.common.collect.Maps;
23 import com.hazelcast.core.Message;
24 import com.hazelcast.core.MessageListener;
25
26 import io.netty.channel.ChannelHandlerContext;
27 import io.netty.channel.ChannelId;
28 import net.anyflow.lannister.cluster.ClusterDataFactory;
29 import net.anyflow.lannister.cluster.Map;
30 import net.anyflow.lannister.topic.Notification;
31
32 public class Sessions implements MessageListener<Notification> {
33 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Sessions.class);
34
35 private final Map<String, Session> sessions;
36 private final java.util.Map<ChannelId, String> clientIds;
37 private final java.util.Map<String, ChannelHandlerContext> ctxs;
38
39 protected Sessions() {
40 sessions = ClusterDataFactory.INSTANCE.createMap("sessions");
41 clientIds = Maps.newHashMap();
42 ctxs = Maps.newHashMap();
43 }
44
45 public void put(Session session, ChannelHandlerContext ctx) {
46 synchronized (this) {
47 session.setConnected(true);
48 sessions.put(session.clientId(), session);
49 clientIds.put(ctx.channel().id(), session.clientId());
50 ctxs.put(session.clientId(), ctx);
51 }
52
53 if (logger.isDebugEnabled()) {
54 logger.debug("session added [clientId={}, sessionsSize={}, clientIdsSize={}, ctxsSize={}]",
55 session.clientId(), sessions.size(), clientIds.size(), ctxs.size());
56 }
57 }
58
59 public void persist(Session session) {
60 sessions.put(session.clientId(), session);
61 }
62
63 public ChannelHandlerContext channelHandlerContext(String clientId) {
64 return ctxs.get(clientId);
65 }
66
67 public Session get(ChannelId channelId) {
68 String clientId = clientIds.get(channelId);
69
70 if (Strings.isNullOrEmpty(clientId)) { return null; }
71
72 return sessions.get(clientId);
73 }
74
75 public Session get(String clientId) {
76 return sessions.get(clientId);
77 }
78
79 public java.util.Map<String, ChannelHandlerContext> ctxs() {
80 return ctxs;
81 }
82
83 protected void remove(Session session) {
84 if (session == null) { return; }
85
86 synchronized (this) {
87 try {
88 if (session.cleanSession()) {
89 sessions.remove(session.clientId());
90 }
91
92 ChannelId channelId = session.channelId();
93 if (channelId == null) { return; }
94
95 clientIds.remove(channelId);
96 ctxs.remove(session.clientId());
97 }
98 finally {
99 if (logger.isDebugEnabled()) {
100 logger.debug("session removed [clientId={}, sessionsSize={}, clientIdsSize={}, ctxsSize={}]",
101 session.clientId(), sessions.size(), clientIds.size(), ctxs.size());
102 }
103 }
104 }
105 }
106
107 public Set<String> keySet() {
108 return sessions.keySet();
109 }
110
111 @Override
112 public void onMessage(Message<Notification> message) {
113 Notification notified = message.getMessageObject();
114
115 Session session = get(notified.clientId());
116 if (session == null || !session.isConnected(true)) { return; }
117
118 session.sendPublish(notified.topic(), notified.message());
119 }
120 }