View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.anyflow.lannister.message;
18  
19  import java.io.IOException;
20  
21  import com.fasterxml.jackson.annotation.JsonIgnore;
22  import com.fasterxml.jackson.annotation.JsonProperty;
23  import com.hazelcast.nio.ObjectDataInput;
24  import com.hazelcast.nio.ObjectDataOutput;
25  
26  import io.netty.handler.codec.mqtt.MqttPublishMessage;
27  import io.netty.handler.codec.mqtt.MqttQoS;
28  import io.netty.util.CharsetUtil;
29  import io.netty.util.internal.StringUtil;
30  import net.anyflow.lannister.NettyUtil;
31  import net.anyflow.lannister.plugin.IMessage;
32  import net.anyflow.lannister.serialization.SerializableFactory;
33  
34  public class Message implements com.hazelcast.nio.serialization.IdentifiedDataSerializable, IMessage, Cloneable {
35  	public final static Messages NEXUS = new Messages();
36  	public final static int ID = 1;
37  
38  	public static final int MAX_MESSAGE_ID_NUM = 0xffff;
39  	public static final int MIN_MESSAGE_ID_NUM = 1;
40  
41  	@JsonProperty
42  	private int id;
43  	@JsonProperty
44  	private String topicName;
45  	@JsonProperty
46  	private String publisherId;
47  	@JsonProperty
48  	private byte[] message;
49  	@JsonProperty
50  	private MqttQoS qos;
51  	@JsonProperty
52  	private boolean isRetain;
53  
54  	public Message() {
55  	}
56  
57  	public Message(ObjectDataInput in) throws IOException {
58  		readData(in);
59  	}
60  
61  	public Message(int id, String topicName, String publisherId, byte[] message, MqttQoS qos, boolean isRetain) {
62  		this.id = id;
63  		this.topicName = topicName;
64  		this.publisherId = publisherId;
65  		this.message = message != null ? message : new byte[] {};
66  		this.qos = qos;
67  		this.isRetain = isRetain;
68  	}
69  
70  	public String key() {
71  		return Messages.key(publisherId, id);
72  	}
73  
74  	/*
75  	 * (non-Javadoc)
76  	 * 
77  	 * @see net.anyflow.lannister.message.IMessage#id()
78  	 */
79  	@Override
80  	public int id() {
81  		return id;
82  	}
83  
84  	public void id(int id) {
85  		this.id = id;
86  	}
87  
88  	/*
89  	 * (non-Javadoc)
90  	 * 
91  	 * @see net.anyflow.lannister.message.IMessage#topicName()
92  	 */
93  	@Override
94  	public String topicName() {
95  		return topicName;
96  	}
97  
98  	@Override
99  	public String publisherId() {
100 		return publisherId;
101 	}
102 
103 	public void publisherId(String publisherId) {
104 		this.publisherId = publisherId;
105 	}
106 
107 	/*
108 	 * (non-Javadoc)
109 	 * 
110 	 * @see net.anyflow.lannister.message.IMessage#message()
111 	 */
112 	@Override
113 	public byte[] message() {
114 		return message;
115 	}
116 
117 	public void setMessage(byte[] message) {
118 		this.message = message != null ? message : new byte[] {};
119 	}
120 
121 	/*
122 	 * (non-Javadoc)
123 	 * 
124 	 * @see net.anyflow.lannister.message.IMessage#qos()
125 	 */
126 	@Override
127 	public MqttQoS qos() {
128 		return qos;
129 	}
130 
131 	public void qos(MqttQoS qos) {
132 		this.qos = qos;
133 	}
134 
135 	/*
136 	 * (non-Javadoc)
137 	 * 
138 	 * @see net.anyflow.lannister.message.IMessage#isRetain()
139 	 */
140 	@Override
141 	public boolean isRetain() {
142 		return isRetain;
143 	}
144 
145 	public void setRetain(boolean isRetain) {
146 		this.isRetain = isRetain;
147 	}
148 
149 	@Override
150 	public String toString() {
151 		return new StringBuilder(StringUtil.simpleClassName(this)).append('[').append("id=").append(id)
152 				.append(", topeName=").append(topicName).append(", message=")
153 				.append(new String(message, CharsetUtil.UTF_8)).append(", QoS=").append(qos).append(", retain=")
154 				.append(isRetain).append(']').toString();
155 	}
156 
157 	@Override
158 	public Message clone() {
159 		return new Message(id, topicName, publisherId, message, qos, isRetain);
160 	}
161 
162 	@JsonIgnore
163 	@Override
164 	public int getFactoryId() {
165 		return SerializableFactory.ID;
166 	}
167 
168 	@JsonIgnore
169 	@Override
170 	public int getId() {
171 		return ID;
172 	}
173 
174 	@Override
175 	public void writeData(ObjectDataOutput out) throws IOException {
176 		out.writeInt(id);
177 		out.writeUTF(topicName);
178 		out.writeUTF(publisherId);
179 		out.writeByteArray(message);
180 		out.writeInt(qos != null ? qos.value() : Byte.MIN_VALUE);
181 		out.writeBoolean(isRetain);
182 	}
183 
184 	@Override
185 	public void readData(ObjectDataInput in) throws IOException {
186 		id = in.readInt();
187 		topicName = in.readUTF();
188 		publisherId = in.readUTF();
189 		message = in.readByteArray();
190 
191 		int rawInt = in.readInt();
192 		qos = rawInt != Byte.MIN_VALUE ? MqttQoS.valueOf(rawInt) : null;
193 
194 		isRetain = in.readBoolean();
195 	}
196 
197 	public static Message newMessage(String clientId, MqttPublishMessage published) {
198 		return new Message(published.variableHeader().messageId(), published.variableHeader().topicName(), clientId,
199 				NettyUtil.copy(published.payload()), published.fixedHeader().qosLevel(),
200 				published.fixedHeader().isRetain());
201 	}
202 }