View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.anyflow.lannister.session;
18  
19  import java.io.IOException;
20  import java.util.Date;
21  import java.util.concurrent.locks.Lock;
22  
23  import com.fasterxml.jackson.annotation.JsonFormat;
24  import com.fasterxml.jackson.annotation.JsonIgnore;
25  import com.fasterxml.jackson.annotation.JsonProperty;
26  import com.fasterxml.jackson.databind.annotation.JsonSerialize;
27  import com.hazelcast.nio.ObjectDataInput;
28  import com.hazelcast.nio.ObjectDataOutput;
29  
30  import io.netty.channel.ChannelFutureListener;
31  import io.netty.channel.ChannelHandlerContext;
32  import io.netty.channel.ChannelId;
33  import io.netty.handler.codec.mqtt.MqttMessage;
34  import io.netty.util.concurrent.EventExecutor;
35  import io.netty.util.concurrent.Future;
36  import io.netty.util.concurrent.GenericFutureListener;
37  import io.netty.util.concurrent.GlobalEventExecutor;
38  import net.anyflow.lannister.Literals;
39  import net.anyflow.lannister.cluster.ClusterDataDisposer;
40  import net.anyflow.lannister.cluster.ClusterDataFactory;
41  import net.anyflow.lannister.message.Message;
42  import net.anyflow.lannister.message.OutboundMessageStatus;
43  import net.anyflow.lannister.plugin.DisconnectEventArgs;
44  import net.anyflow.lannister.plugin.DisconnectEventListener;
45  import net.anyflow.lannister.plugin.Plugins;
46  import net.anyflow.lannister.serialization.ChannelIdSerializer;
47  import net.anyflow.lannister.serialization.SerializableFactory;
48  import net.anyflow.lannister.topic.Topic;
49  import net.anyflow.lannister.topic.TopicMatcher;
50  import net.anyflow.lannister.topic.TopicSubscriber;
51  import net.anyflow.lannister.topic.TopicSubscription;
52  
53  public class Session implements com.hazelcast.nio.serialization.IdentifiedDataSerializable {
54  
55  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Session.class);
56  
57  	public static final Sessions NEXUS = new Sessions();
58  	public static final int ID = 4;
59  
60  	@JsonProperty
61  	private String clientId;
62  	@JsonProperty
63  	private String ip;
64  	@JsonProperty
65  	private int port;
66  	@JsonProperty
67  	private boolean isConnected;
68  	@JsonProperty
69  	private int currentMessageId;
70  	@JsonProperty
71  	private Message will;
72  	@JsonProperty
73  	private boolean cleanSession;
74  	@JsonProperty
75  	private int keepAliveSeconds;
76  	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = Literals.DATE_DEFAULT_FORMAT, timezone = Literals.DATE_DEFAULT_TIMEZONE)
77  	@JsonProperty
78  	private Date createTime;
79  	@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = Literals.DATE_DEFAULT_FORMAT, timezone = Literals.DATE_DEFAULT_TIMEZONE)
80  	@JsonProperty
81  	private Date lastIncomingTime;
82  
83  	private MessageSender messageSender;
84  
85  	private Lock disposeLock;
86  
87  	public Session() { // just for serialization
88  	}
89  
90  	public Session(String clientId, String ip, int port, int keepAliveSeconds, boolean cleanSession, Message will) {
91  		this.clientId = clientId;
92  		this.ip = ip;
93  		this.port = port;
94  		this.isConnected = true;
95  		this.createTime = new Date();
96  		this.currentMessageId = 0;
97  		this.keepAliveSeconds = keepAliveSeconds;
98  		this.lastIncomingTime = new Date();
99  		this.cleanSession = cleanSession;
100 		this.will = will; // [MQTT-3.1.2-9]
101 		this.disposeLock = ClusterDataFactory.INSTANCE.createLock("Session_disposeLock_" + clientId);
102 
103 		this.messageSender = new MessageSender(this);
104 	}
105 
106 	@JsonSerialize(using = ChannelIdSerializer.class)
107 	@JsonProperty
108 	public ChannelId channelId() {
109 		ChannelHandlerContext ctx = NEXUS.channelHandlerContext(clientId);
110 		if (ctx == null) { return null; }
111 
112 		return ctx.channel().id();
113 	}
114 
115 	public boolean isConnected(boolean checkOwnership) {
116 		if (!isConnected) { return false; }
117 		if (!checkOwnership) { return isConnected; }
118 
119 		ChannelHandlerContext ctx = NEXUS.channelHandlerContext(clientId);
120 		if (ctx == null) { return false; }
121 
122 		return ctx.channel().isActive();
123 	}
124 
125 	public void setConnected(boolean isConnected) {
126 		this.isConnected = isConnected;
127 
128 		Session.NEXUS.persist(this);
129 	}
130 
131 	public String clientId() {
132 		return clientId;
133 	}
134 
135 	public Message will() {
136 		return will;
137 	}
138 
139 	public void will(Message will) {
140 		this.will = will;
141 
142 		Session.NEXUS.persist(this);
143 	}
144 
145 	public boolean cleanSession() {
146 		return cleanSession;
147 	}
148 
149 	public boolean isExpired() {
150 		if (keepAliveSeconds == 0) { return false; }
151 
152 		return (new Date().getTime() - lastIncomingTime.getTime()) > keepAliveSeconds * 1.5 * 1000;
153 	}
154 
155 	public void setLastIncomingTime(Date lastIncomingTime) {
156 		this.lastIncomingTime = lastIncomingTime;
157 
158 		Session.NEXUS.persist(this);
159 	}
160 
161 	public TopicSubscription matches(String topicName) {
162 		return TopicSubscription.NEXUS.topicFiltersOf(clientId).stream()
163 				.filter(topicFilter -> TopicMatcher.match(topicFilter, topicName))
164 				.map(topicFilter -> TopicSubscription.NEXUS.getBy(topicFilter, clientId))
165 				.max((p1, p2) -> p1.qos().compareTo(p2.qos())).orElse(null); // [MQTT-3.3.5-1]
166 	}
167 
168 	public void send(MqttMessage message, GenericFutureListener<? extends Future<? super Void>> completeListener) {
169 		messageSender.send(message, completeListener);
170 	}
171 
172 	protected void sendPublish(Topic topic, Message message) {
173 		messageSender.sendPublish(topic, message);
174 	}
175 
176 	public void completeRemainedMessages() {
177 		messageSender.completeRemainedMessages();
178 	}
179 
180 	public int nextMessageId() {
181 		currentMessageId = currentMessageId + 1;
182 
183 		if (currentMessageId > Message.MAX_MESSAGE_ID_NUM) {
184 			currentMessageId = Message.MIN_MESSAGE_ID_NUM;
185 		}
186 
187 		Session.NEXUS.persist(this);
188 
189 		return currentMessageId;
190 	}
191 
192 	public void dispose(boolean sendWill) {
193 		setConnected(false);
194 
195 		if (sendWill && will != null) { // [MQTT-3.1.2-12]
196 			Topic topic = Topic.NEXUS.prepare(will);
197 			topic.publish(will);
198 
199 			will(null); // [MQTT-3.1.2-10]
200 		}
201 
202 		ChannelId channelId = null;
203 		ChannelHandlerContext ctx = NEXUS.channelHandlerContext(clientId);
204 		if (ctx != null) {
205 			ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE);
206 			channelId = ctx.channel().id();
207 		}
208 
209 		logger.debug("Session disposed [clientId={}, channelId={}]", clientId, ctx == null ? "null" : channelId);
210 
211 		EventExecutor executor = ctx != null ? ctx.channel().eventLoop() : GlobalEventExecutor.INSTANCE;
212 		executor.execute(
213 				() -> Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new DisconnectEventArgs() {
214 					@Override
215 					public String clientId() {
216 						return clientId;
217 					}
218 
219 					@Override
220 					public Boolean cleanSession() {
221 						return cleanSession;
222 					}
223 
224 					@Override
225 					public Boolean byDisconnectMessage() {
226 						return !sendWill;
227 					}
228 				}));
229 
230 		// TODO WHY => Current thread is not owner of the lock! -> <not-locked>
231 		// disposeLock.lock();
232 		// try {
233 		if (cleanSession) {
234 			TopicSubscriber.NEXUS.removeByClientId(clientId);
235 			TopicSubscription.NEXUS.removeByClientId(clientId);
236 			OutboundMessageStatus.NEXUS.removeByClientId(clientId);
237 		}
238 
239 		NEXUS.remove(this);
240 		// }
241 		// finally {
242 		// disposeLock.unlock();
243 		// }
244 
245 		ClusterDataDisposer.INSTANCE.disposeLock(disposeLock);
246 
247 	}
248 
249 	@JsonIgnore
250 	@Override
251 	public int getFactoryId() {
252 		return SerializableFactory.ID;
253 	}
254 
255 	@JsonIgnore
256 	@Override
257 	public int getId() {
258 		return ID;
259 	}
260 
261 	@Override
262 	public void writeData(ObjectDataOutput out) throws IOException {
263 		out.writeUTF(clientId);
264 		out.writeUTF(ip);
265 		out.writeInt(port);
266 		out.writeBoolean(isConnected);
267 		out.writeInt(currentMessageId);
268 
269 		out.writeBoolean(will != null);
270 		if (will != null) {
271 			will.writeData(out);
272 		}
273 
274 		out.writeBoolean(cleanSession);
275 		out.writeInt(keepAliveSeconds);
276 		out.writeLong(createTime != null ? createTime.getTime() : Long.MIN_VALUE);
277 		out.writeLong(lastIncomingTime != null ? lastIncomingTime.getTime() : Long.MIN_VALUE);
278 	}
279 
280 	@Override
281 	public void readData(ObjectDataInput in) throws IOException {
282 		clientId = in.readUTF();
283 		ip = in.readUTF();
284 		port = in.readInt();
285 		isConnected = in.readBoolean();
286 		currentMessageId = in.readInt();
287 
288 		if (in.readBoolean()) {
289 			will = new Message(in);
290 		}
291 
292 		cleanSession = in.readBoolean();
293 		keepAliveSeconds = in.readInt();
294 
295 		long rawLong = in.readLong();
296 		createTime = rawLong != Long.MIN_VALUE ? new Date(rawLong) : null;
297 
298 		rawLong = in.readLong();
299 		lastIncomingTime = rawLong != Long.MIN_VALUE ? new Date(rawLong) : null;
300 
301 		disposeLock = ClusterDataFactory.INSTANCE.createLock("Session_disposeLock_" + clientId);
302 		messageSender = new MessageSender(this);
303 	}
304 }