MessageSender.java
/*
* Copyright 2016 The Lannister Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.anyflow.lannister.session;
import java.util.Date;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import net.anyflow.lannister.Settings;
import net.anyflow.lannister.Statistics;
import net.anyflow.lannister.message.InboundMessageStatus;
import net.anyflow.lannister.message.Message;
import net.anyflow.lannister.message.OutboundMessageStatus;
import net.anyflow.lannister.packetreceiver.MqttMessageFactory;
import net.anyflow.lannister.topic.Topic;
/**
 * Writes MQTT packets to the channel of the client that owns a {@link Session}
 * and re-sends packets for message statuses that have not progressed within
 * the configured response timeout.
 */
public class MessageSender {
    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MessageSender.class);

    /**
     * Seconds a message status may stay unchanged before its packet is sent again;
     * read from {@code mqttserver.responseTimeoutSeconds}, falling back to 60.
     */
    private static final int RESPONSE_TIMEOUT_SECONDS = Settings.INSTANCE.getInt("mqttserver.responseTimeoutSeconds",
            60);

    private static final long MILLIS_PER_SECOND = 1000L;

    private final Session session;

    /**
     * @param session the session whose client receives every packet sent through this instance
     */
    protected MessageSender(Session session) {
        this.session = session;
    }

    /**
     * Tells whether a status last updated at {@code updatedMillis} has been waiting
     * for at least {@link #RESPONSE_TIMEOUT_SECONDS} as of {@code nowMillis}.
     *
     * @param nowMillis     current time in epoch milliseconds
     * @param updatedMillis last update time of the status in epoch milliseconds
     * @return {@code true} when the elapsed whole seconds reach the timeout
     */
    private static boolean isResponseOverdue(long nowMillis, long updatedMillis) {
        // Both timestamps are in milliseconds, so divide (not multiply) to obtain seconds.
        long elapsedSeconds = (nowMillis - updatedMillis) / MILLIS_PER_SECOND;
        return elapsedSeconds >= RESPONSE_TIMEOUT_SECONDS;
    }

    /**
     * Writes and flushes {@code message} to the channel of this session's client and
     * logs the outcome of the write.
     *
     * <p>If the session reports itself as not connected, or no channel handler context
     * is registered for the client, an error is logged and nothing is written; in that
     * case {@code completeListener} is never invoked.
     *
     * @param message          the packet to write
     * @param completeListener listener added to the write future after the logging
     *                         listener; may be {@code null}
     */
    protected void send(MqttMessage message, GenericFutureListener<? extends Future<? super Void>> completeListener) {
        if (!session.isConnected(true)) {
            logger.error("Message is not sent - Channel is inactive or out of the node. [{}]", message);
            return;
        }

        ChannelHandlerContext ctx = Session.NEXUS.channelHandlerContext(session.clientId());
        if (ctx == null) {
            // The other users of this lookup in this class treat null as "no channel here";
            // do the same instead of failing with a NullPointerException.
            logger.error("Message is not sent - Channel is inactive or out of the node. [{}]", message);
            return;
        }

        // Capture the text form before the write so the listener logs the packet as it was handed over.
        String log = message.toString();

        ChannelFuture cf = ctx.writeAndFlush(message).addListener(f -> {
            if (f.isSuccess()) {
                logger.debug("packet outgoing [{}]", log);
            }
            else {
                logger.error("packet outgoing failed [{}] {}", log, f.cause());
            }
        });

        if (completeListener != null) {
            cf.addListener(completeListener);
        }
    }

    /**
     * Sends {@code message} to the client as a PUBLISH packet with the DUP flag
     * argument set to {@code false}. After a successful write the publish-sent
     * statistic is incremented and, for QoS 1 and 2, the outbound message status is
     * moved to {@code PUBLISHED}.
     *
     * @param topic   not used by this method; kept for interface compatibility
     * @param message the message to publish
     */
    protected void sendPublish(Topic topic, Message message) {
        logger.debug("event arrived [clientId={}, message={}]", session.clientId(), message);

        // Checked here as well so that a disconnected session is skipped without the error log of send().
        if (!session.isConnected(true)) { return; }

        send(MqttMessageFactory.publish(message, false), f -> { // [MQTT-3.3.1-2]
            if (!f.isSuccess()) { return; }

            Statistics.INSTANCE.add(Statistics.Criterion.MESSAGES_PUBLISH_SENT, 1);

            switch (message.qos()) {
            case AT_MOST_ONCE:
                // Nothing to track: no acknowledgement is expected.
                break;

            case AT_LEAST_ONCE:
            case EXACTLY_ONCE:
                OutboundMessageStatus.NEXUS.update(message.id(), session.clientId(),
                        OutboundMessageStatus.Status.PUBLISHED);
                break;

            default:
                logger.error("Invalid QoS [QoS={}, clientId={}, topic={}]", message.qos(), session.clientId(),
                        message.topicName());
                break;
            }
        });
    }

    /**
     * Re-sends the packets of outbound and then inbound message statuses that have
     * been waiting longer than the response timeout.
     */
    protected void completeRemainedMessages() {
        // TODO should be executed in the middle of 'connected' state?
        completeOutboundMessageStatuses();
        completeInboundMessageStatuses();
    }

    /**
     * Submits a task to the client's channel executor that acknowledges overdue
     * inbound messages again: PUBACK for QoS 1 messages, PUBREC otherwise.
     * Does nothing when no channel handler context is registered for the client.
     */
    private void completeInboundMessageStatuses() {
        ChannelHandlerContext ctx = Session.NEXUS.channelHandlerContext(session.clientId());
        if (ctx == null) { return; }

        ctx.executor().submit(() -> {
            final long nowMillis = new Date().getTime();

            // NOTE(review): this walks every key in InboundMessageStatus.NEXUS, while the outbound
            // counterpart restricts itself to messageIdsOf(session.clientId()). Looks like statuses
            // of other clients may be processed against this session - confirm and filter if so.
            InboundMessageStatus.NEXUS.keySet().stream().map(key -> InboundMessageStatus.NEXUS.get(key))
                    // An entry can disappear between listing the keys and reading it.
                    .filter(messageStatus -> messageStatus != null)
                    .filter(messageStatus -> isResponseOverdue(nowMillis, messageStatus.updateTime().getTime()))
                    .forEach(messageStatus -> {
                        Message message = Message.NEXUS.get(messageStatus.key());
                        if (message == null) {
                            logger.error("Message not found for inbound message status [clientId={}, key={}]",
                                    session.clientId(), messageStatus.key());
                            return;
                        }

                        switch (messageStatus.status()) {
                        case RECEIVED:
                        case PUBRECED:
                            // Write failures are already logged by send(); listeners only handle success.
                            if (message.qos() == MqttQoS.AT_LEAST_ONCE) {
                                send(MqttMessageFactory.puback(message.id()), f -> { // [MQTT-2.3.1-6]
                                    if (!f.isSuccess()) { return; }

                                    InboundMessageStatus.NEXUS.removeByKey(message.id(), session.clientId());
                                    logger.debug("Inbound message status REMOVED [clientId={}, messageId={}]",
                                            session.clientId(), message.id());
                                });
                            }
                            else {
                                send(MqttMessageFactory.pubrec(message.id()), f -> { // [MQTT-2.3.1-6]
                                    if (!f.isSuccess()) { return; }

                                    InboundMessageStatus.NEXUS.update(message.id(), session.clientId(),
                                            InboundMessageStatus.Status.PUBRECED);
                                });
                            }
                            break;

                        default:
                            logger.error(
                                    "Invalid Inbound Message Status [status={}, clientId={}, topic={}, messageId={}]",
                                    messageStatus.status(), session.clientId(), message.topicName(), message.id());
                            break;
                        }
                    });
        });
    }

    /**
     * Submits a task to the client's channel executor that re-sends overdue outbound
     * packets of this session's client: PUBLISH for statuses {@code TO_PUBLISH} and
     * {@code PUBLISHED} (with the DUP argument set for the latter), PUBREL for
     * {@code PUBRECED}. Does nothing when no channel handler context is registered
     * for the client.
     */
    private void completeOutboundMessageStatuses() {
        ChannelHandlerContext ctx = Session.NEXUS.channelHandlerContext(session.clientId());
        if (ctx == null) { return; }

        ctx.executor().submit(() -> {
            final long nowMillis = new Date().getTime();

            OutboundMessageStatus.NEXUS.messageIdsOf(session.clientId()).stream()
                    .map(messageId -> OutboundMessageStatus.NEXUS.getBy(messageId, session.clientId()))
                    // An entry can disappear between listing the ids and reading it.
                    .filter(messageStatus -> messageStatus != null)
                    .filter(messageStatus -> isResponseOverdue(nowMillis, messageStatus.updateTime().getTime()))
                    .forEach(messageStatus -> {
                        Message message = Message.NEXUS.get(messageStatus.messageKey());
                        if (message == null) {
                            logger.error("Message not found for outbound message status [clientId={}, messageId={}]",
                                    session.clientId(), messageStatus.messageId());
                            return;
                        }

                        // Apply the per-subscriber delivery attributes recorded in the status.
                        message.qos(messageStatus.qos());
                        message.id(messageStatus.messageId()); // [MQTT-2.3.1-4]
                        message.setRetain(false); // [MQTT-3.3.1-9]

                        switch (messageStatus.status()) {
                        case TO_PUBLISH:
                        case PUBLISHED:
                            // DUP is requested only when the packet has been published before.
                            boolean isDup = messageStatus.status() == OutboundMessageStatus.Status.PUBLISHED;

                            // Write failures are already logged by send(); the listener only handles success.
                            send(MqttMessageFactory.publish(message, isDup), f -> { // [MQTT-3.3.1-1]
                                if (!f.isSuccess()) { return; }

                                OutboundMessageStatus.NEXUS.update(message.id(), session.clientId(),
                                        OutboundMessageStatus.Status.PUBLISHED);
                                Statistics.INSTANCE.add(Statistics.Criterion.MESSAGES_PUBLISH_SENT, 1);
                            });
                            break;

                        case PUBRECED:
                            send(MqttMessageFactory.pubrel(message.id()), null);
                            break;

                        default:
                            logger.error(
                                    "Invalid Outbound Message Status [status={}, clientId={}, topic={}, messageId={}]",
                                    messageStatus.status(), session.clientId(), message.topicName(), message.id());
                            break;
                        }
                    });
        });
    }
}