View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.anyflow.lannister.session;
18  
19  import java.util.Set;
20  
21  import com.google.common.base.Strings;
22  import com.google.common.collect.Maps;
23  import com.hazelcast.core.Message;
24  import com.hazelcast.core.MessageListener;
25  
26  import io.netty.channel.ChannelHandlerContext;
27  import io.netty.channel.ChannelId;
28  import net.anyflow.lannister.cluster.ClusterDataFactory;
29  import net.anyflow.lannister.cluster.Map;
30  import net.anyflow.lannister.topic.Notification;
31  
32  public class Sessions implements MessageListener<Notification> {
33  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Sessions.class);
34  
35  	private final Map<String, Session> sessions;
36  	private final java.util.Map<ChannelId, String> clientIds; // KEY:channelId
37  	private final java.util.Map<String, ChannelHandlerContext> ctxs; // KEY:clientlId
38  
39  	protected Sessions() {
40  		sessions = ClusterDataFactory.INSTANCE.createMap("sessions");
41  		clientIds = Maps.newHashMap();
42  		ctxs = Maps.newHashMap();
43  	}
44  
45  	public void put(Session session, ChannelHandlerContext ctx) {
46  		synchronized (this) {
47  			session.setConnected(true);
48  			sessions.put(session.clientId(), session); // [MQTT-3.1.2-4]
49  			clientIds.put(ctx.channel().id(), session.clientId());
50  			ctxs.put(session.clientId(), ctx);
51  		}
52  
53  		if (logger.isDebugEnabled()) {
54  			logger.debug("session added [clientId={}, sessionsSize={}, clientIdsSize={}, ctxsSize={}]",
55  					session.clientId(), sessions.size(), clientIds.size(), ctxs.size());
56  		}
57  	}
58  
59  	public void persist(Session session) {
60  		sessions.put(session.clientId(), session);
61  	}
62  
63  	public ChannelHandlerContext channelHandlerContext(String clientId) {
64  		return ctxs.get(clientId);
65  	}
66  
67  	public Session get(ChannelId channelId) {
68  		String clientId = clientIds.get(channelId);
69  
70  		if (Strings.isNullOrEmpty(clientId)) { return null; }
71  
72  		return sessions.get(clientId);
73  	}
74  
75  	public Session get(String clientId) {
76  		return sessions.get(clientId);
77  	}
78  
79  	public java.util.Map<String, ChannelHandlerContext> ctxs() {
80  		return ctxs;
81  	}
82  
83  	protected void remove(Session session) {
84  		if (session == null) { return; }
85  
86  		synchronized (this) {
87  			try {
88  				if (session.cleanSession()) { // [MQTT-3.1.2-5]
89  					sessions.remove(session.clientId());
90  				}
91  
92  				ChannelId channelId = session.channelId();
93  				if (channelId == null) { return; }
94  
95  				clientIds.remove(channelId);
96  				ctxs.remove(session.clientId());
97  			}
98  			finally {
99  				if (logger.isDebugEnabled()) {
100 					logger.debug("session removed [clientId={}, sessionsSize={}, clientIdsSize={}, ctxsSize={}]",
101 							session.clientId(), sessions.size(), clientIds.size(), ctxs.size());
102 				}
103 			}
104 		}
105 	}
106 
107 	public Set<String> keySet() {
108 		return sessions.keySet();
109 	}
110 
111 	@Override
112 	public void onMessage(Message<Notification> message) {
113 		Notification notified = message.getMessageObject();
114 
115 		Session session = get(notified.clientId());
116 		if (session == null || !session.isConnected(true)) { return; }
117 
118 		session.sendPublish(notified.topic(), notified.message());// [MQTT-3.3.1-8],[MQTT-3.3.1-9]
119 	}
120 }