1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.packetreceiver;
18
19 import java.util.List;
20
21 import com.google.common.base.Strings;
22 import com.google.common.collect.Lists;
23
24 import io.netty.buffer.ByteBuf;
25 import io.netty.buffer.PooledByteBufAllocator;
26 import io.netty.handler.codec.mqtt.MqttConnAckMessage;
27 import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
28 import io.netty.handler.codec.mqtt.MqttConnectMessage;
29 import io.netty.handler.codec.mqtt.MqttConnectPayload;
30 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
31 import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
32 import io.netty.handler.codec.mqtt.MqttFixedHeader;
33 import io.netty.handler.codec.mqtt.MqttMessage;
34 import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
35 import io.netty.handler.codec.mqtt.MqttMessageType;
36 import io.netty.handler.codec.mqtt.MqttPubAckMessage;
37 import io.netty.handler.codec.mqtt.MqttPublishMessage;
38 import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
39 import io.netty.handler.codec.mqtt.MqttQoS;
40 import io.netty.handler.codec.mqtt.MqttSubAckMessage;
41 import io.netty.handler.codec.mqtt.MqttSubAckPayload;
42 import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
43 import io.netty.handler.codec.mqtt.MqttSubscribePayload;
44 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
45 import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
46 import io.netty.util.CharsetUtil;
47 import net.anyflow.lannister.message.ConnectOptions;
48 import net.anyflow.lannister.plugin.IMessage;
49
50 public class MqttMessageFactory {
51 public static MqttConnectMessage connect(ConnectOptions options) {
52 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false,
53 10);
54 MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(options.version().protocolName(),
55 options.version().protocolLevel(), options.userName() != null, options.password() != null,
56 options.will() == null ? false : options.will().isRetain(),
57 options.will() == null ? 0 : options.will().qos().value(), options.will() != null,
58 options.cleanSession(), options.keepAliveTimeSeconds());
59
60 MqttConnectPayload payload = new MqttConnectPayload(Strings.nullToEmpty(options.clientId()),
61 options.will() == null ? "" : options.will().topicName(),
62 options.will() == null ? "" : new String(options.will().message(), CharsetUtil.UTF_8),
63 Strings.nullToEmpty(options.userName()), Strings.nullToEmpty(options.password()));
64
65 return new MqttConnectMessage(fixedHeader, variableHeader, payload);
66 }
67
68 public static MqttConnAckMessage connack(MqttConnectReturnCode returnCode, boolean sessionPresent) {
69 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false,
70 2);
71 MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, sessionPresent);
72
73 return new MqttConnAckMessage(fixedHeader, variableHeader);
74 }
75
76 public static MqttPublishMessage publish(IMessage message, boolean isDup) {
77 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, message.qos(),
78 message.isRetain(), 7 + message.message().length);
79
80 MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(message.topicName(), message.id());
81
82 ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(message.message().length);
83
84 return new MqttPublishMessage(fixedHeader, variableHeader, buf.writeBytes(message.message()));
85 }
86
87 public static MqttPubAckMessage puback(int messageId) {
88 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false,
89 2);
90 MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
91
92 return new MqttPubAckMessage(fixedHeader, variableHeader);
93 }
94
95 public static MqttMessage pubrec(int messageId) {
96 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false,
97 2);
98 MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
99
100 return new MqttMessage(fixedHeader, variableHeader);
101 }
102
103 public static MqttMessage pubrel(int messageId) {
104 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false,
105 2);
106 MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
107
108 return new MqttMessage(fixedHeader, variableHeader);
109 }
110
111 public static MqttMessage pubcomp(int messageId) {
112 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false,
113 2);
114 MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
115
116 return new MqttMessage(fixedHeader, variableHeader);
117 }
118
119 public static MqttMessage pingresp() {
120 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false,
121 0);
122
123 return new MqttMessage(fixedHeader);
124 }
125
126 public static MqttSubscribeMessage subscribe(int messageId, MqttTopicSubscription... topicSubscriptions) {
127 int topicNameSize = 0;
128 int topicCount = topicSubscriptions.length;
129
130 for (MqttTopicSubscription item : topicSubscriptions) {
131 topicNameSize += item.topicName().getBytes(CharsetUtil.UTF_8).length;
132 }
133
134 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE,
135 false, 2 + topicNameSize + topicCount);
136 MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
137 MqttSubscribePayload payload = new MqttSubscribePayload(Lists.newArrayList(topicSubscriptions));
138
139 return new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
140 }
141
142 public static MqttSubAckMessage suback(int messageId, List<Integer> grantedQoSLevels) {
143 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false,
144 2 + grantedQoSLevels.size());
145 MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
146 MqttSubAckPayload payload = new MqttSubAckPayload(grantedQoSLevels);
147
148 return new MqttSubAckMessage(fixedHeader, variableHeader, payload);
149 }
150
151 public static MqttUnsubAckMessage unsuback(int messageId) {
152 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false,
153 2);
154 MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
155
156 return new MqttUnsubAckMessage(fixedHeader, variableHeader);
157 }
158
159 public static MqttMessage disconnect() {
160 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE,
161 false, 2);
162
163 return new MqttMessage(fixedHeader);
164 }
165 }