1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.message;
18
19 import java.io.IOException;
20
21 import com.fasterxml.jackson.annotation.JsonIgnore;
22 import com.fasterxml.jackson.annotation.JsonProperty;
23 import com.hazelcast.nio.ObjectDataInput;
24 import com.hazelcast.nio.ObjectDataOutput;
25
26 import io.netty.handler.codec.mqtt.MqttPublishMessage;
27 import io.netty.handler.codec.mqtt.MqttQoS;
28 import io.netty.util.CharsetUtil;
29 import io.netty.util.internal.StringUtil;
30 import net.anyflow.lannister.NettyUtil;
31 import net.anyflow.lannister.plugin.IMessage;
32 import net.anyflow.lannister.serialization.SerializableFactory;
33
34 public class Message implements com.hazelcast.nio.serialization.IdentifiedDataSerializable, IMessage, Cloneable {
35 public final static Messages NEXUS = new Messages();
36 public final static int ID = 1;
37
38 public static final int MAX_MESSAGE_ID_NUM = 0xffff;
39 public static final int MIN_MESSAGE_ID_NUM = 1;
40
41 @JsonProperty
42 private int id;
43 @JsonProperty
44 private String topicName;
45 @JsonProperty
46 private String publisherId;
47 @JsonProperty
48 private byte[] message;
49 @JsonProperty
50 private MqttQoS qos;
51 @JsonProperty
52 private boolean isRetain;
53
54 public Message() {
55 }
56
57 public Message(ObjectDataInput in) throws IOException {
58 readData(in);
59 }
60
61 public Message(int id, String topicName, String publisherId, byte[] message, MqttQoS qos, boolean isRetain) {
62 this.id = id;
63 this.topicName = topicName;
64 this.publisherId = publisherId;
65 this.message = message != null ? message : new byte[] {};
66 this.qos = qos;
67 this.isRetain = isRetain;
68 }
69
70 public String key() {
71 return Messages.key(publisherId, id);
72 }
73
74
75
76
77
78
79 @Override
80 public int id() {
81 return id;
82 }
83
84 public void id(int id) {
85 this.id = id;
86 }
87
88
89
90
91
92
93 @Override
94 public String topicName() {
95 return topicName;
96 }
97
98 @Override
99 public String publisherId() {
100 return publisherId;
101 }
102
103 public void publisherId(String publisherId) {
104 this.publisherId = publisherId;
105 }
106
107
108
109
110
111
112 @Override
113 public byte[] message() {
114 return message;
115 }
116
117 public void setMessage(byte[] message) {
118 this.message = message != null ? message : new byte[] {};
119 }
120
121
122
123
124
125
126 @Override
127 public MqttQoS qos() {
128 return qos;
129 }
130
131 public void qos(MqttQoS qos) {
132 this.qos = qos;
133 }
134
135
136
137
138
139
140 @Override
141 public boolean isRetain() {
142 return isRetain;
143 }
144
145 public void setRetain(boolean isRetain) {
146 this.isRetain = isRetain;
147 }
148
149 @Override
150 public String toString() {
151 return new StringBuilder(StringUtil.simpleClassName(this)).append('[').append("id=").append(id)
152 .append(", topeName=").append(topicName).append(", message=")
153 .append(new String(message, CharsetUtil.UTF_8)).append(", QoS=").append(qos).append(", retain=")
154 .append(isRetain).append(']').toString();
155 }
156
157 @Override
158 public Message clone() {
159 return new Message(id, topicName, publisherId, message, qos, isRetain);
160 }
161
162 @JsonIgnore
163 @Override
164 public int getFactoryId() {
165 return SerializableFactory.ID;
166 }
167
168 @JsonIgnore
169 @Override
170 public int getId() {
171 return ID;
172 }
173
174 @Override
175 public void writeData(ObjectDataOutput out) throws IOException {
176 out.writeInt(id);
177 out.writeUTF(topicName);
178 out.writeUTF(publisherId);
179 out.writeByteArray(message);
180 out.writeInt(qos != null ? qos.value() : Byte.MIN_VALUE);
181 out.writeBoolean(isRetain);
182 }
183
184 @Override
185 public void readData(ObjectDataInput in) throws IOException {
186 id = in.readInt();
187 topicName = in.readUTF();
188 publisherId = in.readUTF();
189 message = in.readByteArray();
190
191 int rawInt = in.readInt();
192 qos = rawInt != Byte.MIN_VALUE ? MqttQoS.valueOf(rawInt) : null;
193
194 isRetain = in.readBoolean();
195 }
196
197 public static Message newMessage(String clientId, MqttPublishMessage published) {
198 return new Message(published.variableHeader().messageId(), published.variableHeader().topicName(), clientId,
199 NettyUtil.copy(published.payload()), published.fixedHeader().qosLevel(),
200 published.fixedHeader().isRetain());
201 }
202 }