Message.java

/*
 * Copyright 2016 The Lannister Project
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.anyflow.lannister.message;

import java.io.IOException;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;

import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.StringUtil;
import net.anyflow.lannister.NettyUtil;
import net.anyflow.lannister.plugin.IMessage;
import net.anyflow.lannister.serialization.SerializableFactory;

public class Message implements com.hazelcast.nio.serialization.IdentifiedDataSerializable, IMessage, Cloneable {
	public final static Messages NEXUS = new Messages();
	public final static int ID = 1;

	public static final int MAX_MESSAGE_ID_NUM = 0xffff;
	public static final int MIN_MESSAGE_ID_NUM = 1;

	@JsonProperty
	private int id;
	@JsonProperty
	private String topicName;
	@JsonProperty
	private String publisherId;
	@JsonProperty
	private byte[] message;
	@JsonProperty
	private MqttQoS qos;
	@JsonProperty
	private boolean isRetain;

	public Message() {
	}

	public Message(ObjectDataInput in) throws IOException {
		readData(in);
	}

	public Message(int id, String topicName, String publisherId, byte[] message, MqttQoS qos, boolean isRetain) {
		this.id = id;
		this.topicName = topicName;
		this.publisherId = publisherId;
		this.message = message != null ? message : new byte[] {};
		this.qos = qos;
		this.isRetain = isRetain;
	}

	public String key() {
		return Messages.key(publisherId, id);
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see net.anyflow.lannister.message.IMessage#id()
	 */
	@Override
	public int id() {
		return id;
	}

	public void id(int id) {
		this.id = id;
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see net.anyflow.lannister.message.IMessage#topicName()
	 */
	@Override
	public String topicName() {
		return topicName;
	}

	@Override
	public String publisherId() {
		return publisherId;
	}

	public void publisherId(String publisherId) {
		this.publisherId = publisherId;
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see net.anyflow.lannister.message.IMessage#message()
	 */
	@Override
	public byte[] message() {
		return message;
	}

	public void setMessage(byte[] message) {
		this.message = message != null ? message : new byte[] {};
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see net.anyflow.lannister.message.IMessage#qos()
	 */
	@Override
	public MqttQoS qos() {
		return qos;
	}

	public void qos(MqttQoS qos) {
		this.qos = qos;
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see net.anyflow.lannister.message.IMessage#isRetain()
	 */
	@Override
	public boolean isRetain() {
		return isRetain;
	}

	public void setRetain(boolean isRetain) {
		this.isRetain = isRetain;
	}

	@Override
	public String toString() {
		return new StringBuilder(StringUtil.simpleClassName(this)).append('[').append("id=").append(id)
				.append(", topeName=").append(topicName).append(", message=")
				.append(new String(message, CharsetUtil.UTF_8)).append(", QoS=").append(qos).append(", retain=")
				.append(isRetain).append(']').toString();
	}

	@Override
	public Message clone() {
		return new Message(id, topicName, publisherId, message, qos, isRetain);
	}

	@JsonIgnore
	@Override
	public int getFactoryId() {
		return SerializableFactory.ID;
	}

	@JsonIgnore
	@Override
	public int getId() {
		return ID;
	}

	@Override
	public void writeData(ObjectDataOutput out) throws IOException {
		out.writeInt(id);
		out.writeUTF(topicName);
		out.writeUTF(publisherId);
		out.writeByteArray(message);
		out.writeInt(qos != null ? qos.value() : Byte.MIN_VALUE);
		out.writeBoolean(isRetain);
	}

	@Override
	public void readData(ObjectDataInput in) throws IOException {
		id = in.readInt();
		topicName = in.readUTF();
		publisherId = in.readUTF();
		message = in.readByteArray();

		int rawInt = in.readInt();
		qos = rawInt != Byte.MIN_VALUE ? MqttQoS.valueOf(rawInt) : null;

		isRetain = in.readBoolean();
	}

	public static Message newMessage(String clientId, MqttPublishMessage published) {
		return new Message(published.variableHeader().messageId(), published.variableHeader().topicName(), clientId,
				NettyUtil.copy(published.payload()), published.fixedHeader().qosLevel(),
				published.fixedHeader().isRetain());
	}
}