View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.anyflow.lannister.packetreceiver;
18  
19  import java.util.List;
20  
21  import com.google.common.base.Strings;
22  import com.google.common.collect.Lists;
23  
24  import io.netty.buffer.ByteBuf;
25  import io.netty.buffer.PooledByteBufAllocator;
26  import io.netty.handler.codec.mqtt.MqttConnAckMessage;
27  import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
28  import io.netty.handler.codec.mqtt.MqttConnectMessage;
29  import io.netty.handler.codec.mqtt.MqttConnectPayload;
30  import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
31  import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
32  import io.netty.handler.codec.mqtt.MqttFixedHeader;
33  import io.netty.handler.codec.mqtt.MqttMessage;
34  import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
35  import io.netty.handler.codec.mqtt.MqttMessageType;
36  import io.netty.handler.codec.mqtt.MqttPubAckMessage;
37  import io.netty.handler.codec.mqtt.MqttPublishMessage;
38  import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
39  import io.netty.handler.codec.mqtt.MqttQoS;
40  import io.netty.handler.codec.mqtt.MqttSubAckMessage;
41  import io.netty.handler.codec.mqtt.MqttSubAckPayload;
42  import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
43  import io.netty.handler.codec.mqtt.MqttSubscribePayload;
44  import io.netty.handler.codec.mqtt.MqttTopicSubscription;
45  import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
46  import io.netty.util.CharsetUtil;
47  import net.anyflow.lannister.message.ConnectOptions;
48  import net.anyflow.lannister.plugin.IMessage;
49  
50  public class MqttMessageFactory {
51  	public static MqttConnectMessage connect(ConnectOptions options) {
52  		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false,
53  				10);
54  		MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(options.version().protocolName(),
55  				options.version().protocolLevel(), options.userName() != null, options.password() != null,
56  				options.will() == null ? false : options.will().isRetain(),
57  				options.will() == null ? 0 : options.will().qos().value(), options.will() != null,
58  				options.cleanSession(), options.keepAliveTimeSeconds());
59  
60  		MqttConnectPayload payload = new MqttConnectPayload(Strings.nullToEmpty(options.clientId()),
61  				options.will() == null ? "" : options.will().topicName(),
62  				options.will() == null ? "" : new String(options.will().message(), CharsetUtil.UTF_8),
63  				Strings.nullToEmpty(options.userName()), Strings.nullToEmpty(options.password()));
64  
65  		return new MqttConnectMessage(fixedHeader, variableHeader, payload);
66  	}
67  
68  	public static MqttConnAckMessage connack(MqttConnectReturnCode returnCode, boolean sessionPresent) {
69  		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false,
70  				2);
71  		MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, sessionPresent);
72  
73  		return new MqttConnAckMessage(fixedHeader, variableHeader);
74  	}
75  
76  	public static MqttPublishMessage publish(IMessage message, boolean isDup) {
77  		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, message.qos(),
78  				message.isRetain(), 7 + message.message().length);
79  
80  		MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(message.topicName(), message.id());
81  
82  		ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(message.message().length);
83  
84  		return new MqttPublishMessage(fixedHeader, variableHeader, buf.writeBytes(message.message()));
85  	}
86  
87  	public static MqttPubAckMessage puback(int messageId) {
88  		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false,
89  				2);
90  		MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
91  
92  		return new MqttPubAckMessage(fixedHeader, variableHeader);
93  	}
94  
95  	public static MqttMessage pubrec(int messageId) {
96  		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false,
97  				2);
98  		MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
99  
100 		return new MqttMessage(fixedHeader, variableHeader);
101 	}
102 
103 	public static MqttMessage pubrel(int messageId) {
104 		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false,
105 				2);
106 		MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
107 
108 		return new MqttMessage(fixedHeader, variableHeader);
109 	}
110 
111 	public static MqttMessage pubcomp(int messageId) {
112 		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false,
113 				2);
114 		MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
115 
116 		return new MqttMessage(fixedHeader, variableHeader);
117 	}
118 
119 	public static MqttMessage pingresp() {
120 		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false,
121 				0);
122 
123 		return new MqttMessage(fixedHeader);
124 	}
125 
126 	public static MqttSubscribeMessage subscribe(int messageId, MqttTopicSubscription... topicSubscriptions) {
127 		int topicNameSize = 0;
128 		int topicCount = topicSubscriptions.length;
129 
130 		for (MqttTopicSubscription item : topicSubscriptions) {
131 			topicNameSize += item.topicName().getBytes(CharsetUtil.UTF_8).length;
132 		}
133 
134 		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE,
135 				false, 2 + topicNameSize + topicCount);
136 		MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
137 		MqttSubscribePayload payload = new MqttSubscribePayload(Lists.newArrayList(topicSubscriptions));
138 
139 		return new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
140 	}
141 
142 	public static MqttSubAckMessage suback(int messageId, List<Integer> grantedQoSLevels) {
143 		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false,
144 				2 + grantedQoSLevels.size());
145 		MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
146 		MqttSubAckPayload payload = new MqttSubAckPayload(grantedQoSLevels);
147 
148 		return new MqttSubAckMessage(fixedHeader, variableHeader, payload);
149 	}
150 
151 	public static MqttUnsubAckMessage unsuback(int messageId) {
152 		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false,
153 				2);
154 		MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
155 
156 		return new MqttUnsubAckMessage(fixedHeader, variableHeader);
157 	}
158 
159 	public static MqttMessage disconnect() {
160 		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE,
161 				false, 2);
162 
163 		return new MqttMessage(fixedHeader);
164 	}
165 }