Session.java
/*
* Copyright 2016 The Lannister Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.anyflow.lannister.session;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.locks.Lock;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import net.anyflow.lannister.Literals;
import net.anyflow.lannister.cluster.ClusterDataDisposer;
import net.anyflow.lannister.cluster.ClusterDataFactory;
import net.anyflow.lannister.message.Message;
import net.anyflow.lannister.message.OutboundMessageStatus;
import net.anyflow.lannister.plugin.DisconnectEventArgs;
import net.anyflow.lannister.plugin.DisconnectEventListener;
import net.anyflow.lannister.plugin.Plugins;
import net.anyflow.lannister.serialization.ChannelIdSerializer;
import net.anyflow.lannister.serialization.SerializableFactory;
import net.anyflow.lannister.topic.Topic;
import net.anyflow.lannister.topic.TopicMatcher;
import net.anyflow.lannister.topic.TopicSubscriber;
import net.anyflow.lannister.topic.TopicSubscription;
/**
 * State of one MQTT client session.
 *
 * <p>
 * The session keeps the client's identity, connection flag, message id counter, will message and
 * keep-alive bookkeeping. Every mutator persists the session through {@link #NEXUS}. The class is
 * serialized field by field in {@link #writeData(ObjectDataOutput)} and restored in
 * {@link #readData(ObjectDataInput)}; {@code messageSender} and {@code disposeLock} are not part of
 * the serialized form and are re-created after construction or deserialization.
 */
public class Session implements com.hazelcast.nio.serialization.IdentifiedDataSerializable {
    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Session.class);

    /** Registry used to look up channel contexts and to persist/remove sessions. */
    public static final Sessions NEXUS = new Sessions();

    /** Type id reported by {@link #getId()}. */
    public static final int ID = 4;

    /** Prefix of the name under which the per-client dispose lock is created. */
    private static final String DISPOSE_LOCK_PREFIX = "Session_disposeLock_";

    @JsonProperty
    private String clientId;
    @JsonProperty
    private String ip;
    @JsonProperty
    private int port;
    @JsonProperty
    private boolean isConnected;
    @JsonProperty
    private int currentMessageId;
    @JsonProperty
    private Message will;
    @JsonProperty
    private boolean cleanSession;
    @JsonProperty
    private int keepAliveSeconds;
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = Literals.DATE_DEFAULT_FORMAT, timezone = Literals.DATE_DEFAULT_TIMEZONE)
    @JsonProperty
    private Date createTime;
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = Literals.DATE_DEFAULT_FORMAT, timezone = Literals.DATE_DEFAULT_TIMEZONE)
    @JsonProperty
    private Date lastIncomingTime;

    // Not serialized; rebuilt by initTransientState().
    private MessageSender messageSender;
    private Lock disposeLock;

    /**
     * Creates an empty instance. Exists only for serialization; fields are filled in by
     * {@link #readData(ObjectDataInput)}.
     */
    public Session() { // just for serialization
    }

    /**
     * Creates a connected session whose creation time and last-incoming time are both "now" and
     * whose message id counter starts at 0.
     *
     * @param clientId         client identifier; also used to name the dispose lock
     * @param ip               client address
     * @param port             client port
     * @param keepAliveSeconds keep-alive interval in seconds; 0 disables expiry (see
     *                         {@link #isExpired()})
     * @param cleanSession     whether per-client data is removed on {@link #dispose(boolean)}
     * @param will             will message, or {@code null} when the client has none
     */
    public Session(String clientId, String ip, int port, int keepAliveSeconds, boolean cleanSession, Message will) {
        this.clientId = clientId;
        this.ip = ip;
        this.port = port;
        this.isConnected = true;
        this.createTime = new Date();
        this.currentMessageId = 0;
        this.keepAliveSeconds = keepAliveSeconds;
        this.lastIncomingTime = new Date();
        this.cleanSession = cleanSession;
        this.will = will; // [MQTT-3.1.2-9]

        initTransientState();
    }

    /**
     * (Re)creates the members that are not part of the serialized form: the dispose lock, named
     * after {@code clientId}, and the message sender bound to this session. Must run after
     * {@code clientId} has been assigned.
     */
    private void initTransientState() {
        this.disposeLock = ClusterDataFactory.INSTANCE.createLock(DISPOSE_LOCK_PREFIX + clientId);
        this.messageSender = new MessageSender(this);
    }

    /**
     * Returns the id of the channel currently registered for this client in {@link #NEXUS}.
     *
     * @return the channel id, or {@code null} when no channel context is registered
     */
    @JsonSerialize(using = ChannelIdSerializer.class)
    @JsonProperty
    public ChannelId channelId() {
        ChannelHandlerContext ctx = NEXUS.channelHandlerContext(clientId);
        if (ctx == null) { return null; }

        return ctx.channel().id();
    }

    /**
     * Tells whether the session is connected.
     *
     * @param checkOwnership when {@code false} only the stored flag is consulted; when {@code true}
     *                       the session must additionally have a channel context registered in
     *                       {@link #NEXUS} whose channel is active
     * @return {@code true} when the session is considered connected under the requested check
     */
    public boolean isConnected(boolean checkOwnership) {
        if (!isConnected) { return false; }
        if (!checkOwnership) { return isConnected; }

        ChannelHandlerContext ctx = NEXUS.channelHandlerContext(clientId);
        if (ctx == null) { return false; }

        return ctx.channel().isActive();
    }

    /**
     * Updates the connected flag and persists the session.
     *
     * @param isConnected new value of the flag
     */
    public void setConnected(boolean isConnected) {
        this.isConnected = isConnected;

        Session.NEXUS.persist(this);
    }

    /** @return the client identifier of this session */
    public String clientId() {
        return clientId;
    }

    /** @return the will message, or {@code null} when none is set */
    public Message will() {
        return will;
    }

    /**
     * Replaces the will message and persists the session.
     *
     * @param will new will message; {@code null} clears it
     */
    public void will(Message will) {
        this.will = will;

        Session.NEXUS.persist(this);
    }

    /** @return the clean-session flag this session was created with */
    public boolean cleanSession() {
        return cleanSession;
    }

    /**
     * Tells whether more than 1.5 times the keep-alive interval has elapsed since the last
     * incoming time.
     *
     * @return {@code false} when keep-alive is 0 or when no last-incoming time is recorded (it can
     *         be {@code null} after deserialization or via {@link #setLastIncomingTime(Date)});
     *         otherwise whether the idle time exceeds {@code keepAliveSeconds * 1.5} seconds
     */
    public boolean isExpired() {
        if (keepAliveSeconds == 0) { return false; }

        // Read the field once so the null check and the dereference see the same value.
        Date lastIncoming = lastIncomingTime;
        if (lastIncoming == null) { return false; }

        long idleMillis = System.currentTimeMillis() - lastIncoming.getTime();
        return idleMillis > keepAliveSeconds * 1.5 * 1000;
    }

    /**
     * Records the time of the latest incoming traffic and persists the session.
     *
     * @param lastIncomingTime new timestamp
     */
    public void setLastIncomingTime(Date lastIncomingTime) {
        this.lastIncomingTime = lastIncomingTime;

        Session.NEXUS.persist(this);
    }

    /**
     * Finds the subscription of this client that applies to a topic name.
     *
     * @param topicName concrete topic name to match against this client's topic filters
     * @return among the subscriptions whose filter matches, the one with the greatest QoS; or
     *         {@code null} when no filter matches
     */
    public TopicSubscription matches(String topicName) {
        return TopicSubscription.NEXUS.topicFiltersOf(clientId).stream()
                .filter(topicFilter -> TopicMatcher.match(topicFilter, topicName))
                .map(topicFilter -> TopicSubscription.NEXUS.getBy(topicFilter, clientId))
                .max((p1, p2) -> p1.qos().compareTo(p2.qos())).orElse(null); // [MQTT-3.3.5-1]
    }

    /**
     * Sends an MQTT message through this session's message sender.
     *
     * @param message          message to send
     * @param completeListener listener handed to the sender together with the message
     */
    public void send(MqttMessage message, GenericFutureListener<? extends Future<? super Void>> completeListener) {
        messageSender.send(message, completeListener);
    }

    /**
     * Delegates a publish of {@code message} on {@code topic} to this session's message sender.
     *
     * @param topic   topic the message belongs to
     * @param message message to publish
     */
    protected void sendPublish(Topic topic, Message message) {
        messageSender.sendPublish(topic, message);
    }

    /** Delegates to the message sender's {@code completeRemainedMessages()}. */
    public void completeRemainedMessages() {
        messageSender.completeRemainedMessages();
    }

    /**
     * Advances the message id counter, wrapping to {@link Message#MIN_MESSAGE_ID_NUM} once it
     * exceeds {@link Message#MAX_MESSAGE_ID_NUM}, and persists the session.
     *
     * @return the new current message id
     */
    public int nextMessageId() {
        currentMessageId = currentMessageId + 1;

        if (currentMessageId > Message.MAX_MESSAGE_ID_NUM) {
            currentMessageId = Message.MIN_MESSAGE_ID_NUM;
        }

        Session.NEXUS.persist(this);

        return currentMessageId;
    }

    /**
     * Tears the session down.
     *
     * <p>
     * In order: marks the session disconnected; publishes and then clears the will when requested
     * and present; disconnects the registered channel (closing it once the disconnect completes);
     * schedules a notification of the {@link DisconnectEventListener} plugin; removes per-client
     * subscription and outbound-status data when this is a clean session; removes the session from
     * {@link #NEXUS}; and finally disposes the dispose lock.
     *
     * @param sendWill whether the will message should be published; the plugin is told the
     *                 disconnect was by DISCONNECT message exactly when this is {@code false}
     */
    public void dispose(boolean sendWill) {
        setConnected(false);

        if (sendWill && will != null) { // [MQTT-3.1.2-12]
            Topic topic = Topic.NEXUS.prepare(will);
            topic.publish(will);

            will(null); // [MQTT-3.1.2-10]
        }

        ChannelId channelId = null;
        ChannelHandlerContext ctx = NEXUS.channelHandlerContext(clientId);
        if (ctx != null) {
            ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE);
            channelId = ctx.channel().id();
        }

        // channelId stays null when no context is registered; the logger renders it as "null".
        logger.debug("Session disposed [clientId={}, channelId={}]", clientId, channelId);

        // Notify on the channel's event loop when there is one, otherwise on the global executor.
        EventExecutor executor = ctx != null ? ctx.channel().eventLoop() : GlobalEventExecutor.INSTANCE;
        executor.execute(
                () -> Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new DisconnectEventArgs() {
                    @Override
                    public String clientId() {
                        return clientId;
                    }

                    @Override
                    public Boolean cleanSession() {
                        return cleanSession;
                    }

                    @Override
                    public Boolean byDisconnectMessage() {
                        return !sendWill;
                    }
                }));

        // TODO WHY => Current thread is not owner of the lock! -> <not-locked>
        // The cleanup below was meant to run under disposeLock; locking is disabled until the
        // ownership error above is understood.
        // disposeLock.lock();
        // try {
        if (cleanSession) {
            TopicSubscriber.NEXUS.removeByClientId(clientId);
            TopicSubscription.NEXUS.removeByClientId(clientId);
            OutboundMessageStatus.NEXUS.removeByClientId(clientId);
        }

        NEXUS.remove(this);
        // }
        // finally {
        // disposeLock.unlock();
        // }

        ClusterDataDisposer.INSTANCE.disposeLock(disposeLock);
    }

    /** @return the factory id, {@link SerializableFactory#ID} */
    @JsonIgnore
    @Override
    public int getFactoryId() {
        return SerializableFactory.ID;
    }

    /** @return the type id of this class, {@link #ID} */
    @JsonIgnore
    @Override
    public int getId() {
        return ID;
    }

    /**
     * Writes the session fields in a fixed order that {@link #readData(ObjectDataInput)} mirrors.
     * The will is preceded by a presence flag; a {@code null} date is encoded as
     * {@link Long#MIN_VALUE}.
     *
     * @param out destination stream
     * @throws IOException when writing to {@code out} fails
     */
    @Override
    public void writeData(ObjectDataOutput out) throws IOException {
        out.writeUTF(clientId);
        out.writeUTF(ip);
        out.writeInt(port);
        out.writeBoolean(isConnected);
        out.writeInt(currentMessageId);

        out.writeBoolean(will != null);
        if (will != null) {
            will.writeData(out);
        }

        out.writeBoolean(cleanSession);
        out.writeInt(keepAliveSeconds);
        out.writeLong(createTime != null ? createTime.getTime() : Long.MIN_VALUE);
        out.writeLong(lastIncomingTime != null ? lastIncomingTime.getTime() : Long.MIN_VALUE);
    }

    /**
     * Restores the fields written by {@link #writeData(ObjectDataOutput)} and then re-creates the
     * dispose lock and message sender, which are not part of the stream.
     *
     * @param in source stream
     * @throws IOException when reading from {@code in} fails
     */
    @Override
    public void readData(ObjectDataInput in) throws IOException {
        clientId = in.readUTF();
        ip = in.readUTF();
        port = in.readInt();
        isConnected = in.readBoolean();
        currentMessageId = in.readInt();

        // Assign explicitly in both cases so no earlier will survives when the stream has none.
        if (in.readBoolean()) {
            will = new Message(in);
        }
        else {
            will = null;
        }

        cleanSession = in.readBoolean();
        keepAliveSeconds = in.readInt();

        long rawLong = in.readLong();
        createTime = rawLong != Long.MIN_VALUE ? new Date(rawLong) : null;

        rawLong = in.readLong();
        lastIncomingTime = rawLong != Long.MIN_VALUE ? new Date(rawLong) : null;

        initTransientState();
    }
}