MqttMessageFactory.java
/*
* Copyright 2016 The Lannister Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.anyflow.lannister.packetreceiver;
import java.util.List;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.util.CharsetUtil;
import net.anyflow.lannister.message.ConnectOptions;
import net.anyflow.lannister.plugin.IMessage;
public class MqttMessageFactory {
public static MqttConnectMessage connect(ConnectOptions options) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false,
10);
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(options.version().protocolName(),
options.version().protocolLevel(), options.userName() != null, options.password() != null,
options.will() == null ? false : options.will().isRetain(),
options.will() == null ? 0 : options.will().qos().value(), options.will() != null,
options.cleanSession(), options.keepAliveTimeSeconds());
MqttConnectPayload payload = new MqttConnectPayload(Strings.nullToEmpty(options.clientId()),
options.will() == null ? "" : options.will().topicName(),
options.will() == null ? "" : new String(options.will().message(), CharsetUtil.UTF_8),
Strings.nullToEmpty(options.userName()), Strings.nullToEmpty(options.password()));
return new MqttConnectMessage(fixedHeader, variableHeader, payload);
}
public static MqttConnAckMessage connack(MqttConnectReturnCode returnCode, boolean sessionPresent) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, sessionPresent);
return new MqttConnAckMessage(fixedHeader, variableHeader);
}
public static MqttPublishMessage publish(IMessage message, boolean isDup) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, message.qos(),
message.isRetain(), 7 + message.message().length);
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(message.topicName(), message.id());
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(message.message().length);
return new MqttPublishMessage(fixedHeader, variableHeader, buf.writeBytes(message.message()));
}
public static MqttPubAckMessage puback(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttPubAckMessage(fixedHeader, variableHeader);
}
public static MqttMessage pubrec(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttMessage(fixedHeader, variableHeader);
}
public static MqttMessage pubrel(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttMessage(fixedHeader, variableHeader);
}
public static MqttMessage pubcomp(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttMessage(fixedHeader, variableHeader);
}
public static MqttMessage pingresp() {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false,
0);
return new MqttMessage(fixedHeader);
}
public static MqttSubscribeMessage subscribe(int messageId, MqttTopicSubscription... topicSubscriptions) {
int topicNameSize = 0;
int topicCount = topicSubscriptions.length;
for (MqttTopicSubscription item : topicSubscriptions) {
topicNameSize += item.topicName().getBytes(CharsetUtil.UTF_8).length;
}
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE,
false, 2 + topicNameSize + topicCount);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttSubscribePayload payload = new MqttSubscribePayload(Lists.newArrayList(topicSubscriptions));
return new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
}
public static MqttSubAckMessage suback(int messageId, List<Integer> grantedQoSLevels) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false,
2 + grantedQoSLevels.size());
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttSubAckPayload payload = new MqttSubAckPayload(grantedQoSLevels);
return new MqttSubAckMessage(fixedHeader, variableHeader, payload);
}
public static MqttUnsubAckMessage unsuback(int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false,
2);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
return new MqttUnsubAckMessage(fixedHeader, variableHeader);
}
public static MqttMessage disconnect() {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE,
false, 2);
return new MqttMessage(fixedHeader);
}
}