1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.session;
18
19 import java.io.IOException;
20 import java.util.Date;
21 import java.util.concurrent.locks.Lock;
22
23 import com.fasterxml.jackson.annotation.JsonFormat;
24 import com.fasterxml.jackson.annotation.JsonIgnore;
25 import com.fasterxml.jackson.annotation.JsonProperty;
26 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
27 import com.hazelcast.nio.ObjectDataInput;
28 import com.hazelcast.nio.ObjectDataOutput;
29
30 import io.netty.channel.ChannelFutureListener;
31 import io.netty.channel.ChannelHandlerContext;
32 import io.netty.channel.ChannelId;
33 import io.netty.handler.codec.mqtt.MqttMessage;
34 import io.netty.util.concurrent.EventExecutor;
35 import io.netty.util.concurrent.Future;
36 import io.netty.util.concurrent.GenericFutureListener;
37 import io.netty.util.concurrent.GlobalEventExecutor;
38 import net.anyflow.lannister.Literals;
39 import net.anyflow.lannister.cluster.ClusterDataDisposer;
40 import net.anyflow.lannister.cluster.ClusterDataFactory;
41 import net.anyflow.lannister.message.Message;
42 import net.anyflow.lannister.message.OutboundMessageStatus;
43 import net.anyflow.lannister.plugin.DisconnectEventArgs;
44 import net.anyflow.lannister.plugin.DisconnectEventListener;
45 import net.anyflow.lannister.plugin.Plugins;
46 import net.anyflow.lannister.serialization.ChannelIdSerializer;
47 import net.anyflow.lannister.serialization.SerializableFactory;
48 import net.anyflow.lannister.topic.Topic;
49 import net.anyflow.lannister.topic.TopicMatcher;
50 import net.anyflow.lannister.topic.TopicSubscriber;
51 import net.anyflow.lannister.topic.TopicSubscription;
52
53 public class Session implements com.hazelcast.nio.serialization.IdentifiedDataSerializable {
54
55 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Session.class);
56
57 public static final Sessions NEXUS = new Sessions();
58 public static final int ID = 4;
59
60 @JsonProperty
61 private String clientId;
62 @JsonProperty
63 private String ip;
64 @JsonProperty
65 private int port;
66 @JsonProperty
67 private boolean isConnected;
68 @JsonProperty
69 private int currentMessageId;
70 @JsonProperty
71 private Message will;
72 @JsonProperty
73 private boolean cleanSession;
74 @JsonProperty
75 private int keepAliveSeconds;
76 @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = Literals.DATE_DEFAULT_FORMAT, timezone = Literals.DATE_DEFAULT_TIMEZONE)
77 @JsonProperty
78 private Date createTime;
79 @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = Literals.DATE_DEFAULT_FORMAT, timezone = Literals.DATE_DEFAULT_TIMEZONE)
80 @JsonProperty
81 private Date lastIncomingTime;
82
83 private MessageSender messageSender;
84
85 private Lock disposeLock;
86
87 public Session() {
88 }
89
90 public Session(String clientId, String ip, int port, int keepAliveSeconds, boolean cleanSession, Message will) {
91 this.clientId = clientId;
92 this.ip = ip;
93 this.port = port;
94 this.isConnected = true;
95 this.createTime = new Date();
96 this.currentMessageId = 0;
97 this.keepAliveSeconds = keepAliveSeconds;
98 this.lastIncomingTime = new Date();
99 this.cleanSession = cleanSession;
100 this.will = will;
101 this.disposeLock = ClusterDataFactory.INSTANCE.createLock("Session_disposeLock_" + clientId);
102
103 this.messageSender = new MessageSender(this);
104 }
105
106 @JsonSerialize(using = ChannelIdSerializer.class)
107 @JsonProperty
108 public ChannelId channelId() {
109 ChannelHandlerContext ctx = NEXUS.channelHandlerContext(clientId);
110 if (ctx == null) { return null; }
111
112 return ctx.channel().id();
113 }
114
115 public boolean isConnected(boolean checkOwnership) {
116 if (!isConnected) { return false; }
117 if (!checkOwnership) { return isConnected; }
118
119 ChannelHandlerContext ctx = NEXUS.channelHandlerContext(clientId);
120 if (ctx == null) { return false; }
121
122 return ctx.channel().isActive();
123 }
124
125 public void setConnected(boolean isConnected) {
126 this.isConnected = isConnected;
127
128 Session.NEXUS.persist(this);
129 }
130
131 public String clientId() {
132 return clientId;
133 }
134
135 public Message will() {
136 return will;
137 }
138
139 public void will(Message will) {
140 this.will = will;
141
142 Session.NEXUS.persist(this);
143 }
144
145 public boolean cleanSession() {
146 return cleanSession;
147 }
148
149 public boolean isExpired() {
150 if (keepAliveSeconds == 0) { return false; }
151
152 return (new Date().getTime() - lastIncomingTime.getTime()) > keepAliveSeconds * 1.5 * 1000;
153 }
154
155 public void setLastIncomingTime(Date lastIncomingTime) {
156 this.lastIncomingTime = lastIncomingTime;
157
158 Session.NEXUS.persist(this);
159 }
160
161 public TopicSubscription matches(String topicName) {
162 return TopicSubscription.NEXUS.topicFiltersOf(clientId).stream()
163 .filter(topicFilter -> TopicMatcher.match(topicFilter, topicName))
164 .map(topicFilter -> TopicSubscription.NEXUS.getBy(topicFilter, clientId))
165 .max((p1, p2) -> p1.qos().compareTo(p2.qos())).orElse(null);
166 }
167
168 public void send(MqttMessage message, GenericFutureListener<? extends Future<? super Void>> completeListener) {
169 messageSender.send(message, completeListener);
170 }
171
172 protected void sendPublish(Topic topic, Message message) {
173 messageSender.sendPublish(topic, message);
174 }
175
176 public void completeRemainedMessages() {
177 messageSender.completeRemainedMessages();
178 }
179
180 public int nextMessageId() {
181 currentMessageId = currentMessageId + 1;
182
183 if (currentMessageId > Message.MAX_MESSAGE_ID_NUM) {
184 currentMessageId = Message.MIN_MESSAGE_ID_NUM;
185 }
186
187 Session.NEXUS.persist(this);
188
189 return currentMessageId;
190 }
191
192 public void dispose(boolean sendWill) {
193 setConnected(false);
194
195 if (sendWill && will != null) {
196 Topic topic = Topic.NEXUS.prepare(will);
197 topic.publish(will);
198
199 will(null);
200 }
201
202 ChannelId channelId = null;
203 ChannelHandlerContext ctx = NEXUS.channelHandlerContext(clientId);
204 if (ctx != null) {
205 ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE);
206 channelId = ctx.channel().id();
207 }
208
209 logger.debug("Session disposed [clientId={}, channelId={}]", clientId, ctx == null ? "null" : channelId);
210
211 EventExecutor executor = ctx != null ? ctx.channel().eventLoop() : GlobalEventExecutor.INSTANCE;
212 executor.execute(
213 () -> Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new DisconnectEventArgs() {
214 @Override
215 public String clientId() {
216 return clientId;
217 }
218
219 @Override
220 public Boolean cleanSession() {
221 return cleanSession;
222 }
223
224 @Override
225 public Boolean byDisconnectMessage() {
226 return !sendWill;
227 }
228 }));
229
230
231
232
233 if (cleanSession) {
234 TopicSubscriber.NEXUS.removeByClientId(clientId);
235 TopicSubscription.NEXUS.removeByClientId(clientId);
236 OutboundMessageStatus.NEXUS.removeByClientId(clientId);
237 }
238
239 NEXUS.remove(this);
240
241
242
243
244
245 ClusterDataDisposer.INSTANCE.disposeLock(disposeLock);
246
247 }
248
249 @JsonIgnore
250 @Override
251 public int getFactoryId() {
252 return SerializableFactory.ID;
253 }
254
255 @JsonIgnore
256 @Override
257 public int getId() {
258 return ID;
259 }
260
261 @Override
262 public void writeData(ObjectDataOutput out) throws IOException {
263 out.writeUTF(clientId);
264 out.writeUTF(ip);
265 out.writeInt(port);
266 out.writeBoolean(isConnected);
267 out.writeInt(currentMessageId);
268
269 out.writeBoolean(will != null);
270 if (will != null) {
271 will.writeData(out);
272 }
273
274 out.writeBoolean(cleanSession);
275 out.writeInt(keepAliveSeconds);
276 out.writeLong(createTime != null ? createTime.getTime() : Long.MIN_VALUE);
277 out.writeLong(lastIncomingTime != null ? lastIncomingTime.getTime() : Long.MIN_VALUE);
278 }
279
280 @Override
281 public void readData(ObjectDataInput in) throws IOException {
282 clientId = in.readUTF();
283 ip = in.readUTF();
284 port = in.readInt();
285 isConnected = in.readBoolean();
286 currentMessageId = in.readInt();
287
288 if (in.readBoolean()) {
289 will = new Message(in);
290 }
291
292 cleanSession = in.readBoolean();
293 keepAliveSeconds = in.readInt();
294
295 long rawLong = in.readLong();
296 createTime = rawLong != Long.MIN_VALUE ? new Date(rawLong) : null;
297
298 rawLong = in.readLong();
299 lastIncomingTime = rawLong != Long.MIN_VALUE ? new Date(rawLong) : null;
300
301 disposeLock = ClusterDataFactory.INSTANCE.createLock("Session_disposeLock_" + clientId);
302 messageSender = new MessageSender(this);
303 }
304 }