Statistics.java
/*
* Copyright 2016 The Lannister Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.anyflow.lannister;
import java.lang.management.ManagementFactory;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.Date;
import java.util.concurrent.locks.Lock;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.collect.Maps;
import net.anyflow.lannister.cluster.ClusterDataFactory;
import net.anyflow.lannister.cluster.Map;
import net.anyflow.lannister.message.InboundMessageStatus;
import net.anyflow.lannister.message.Message;
import net.anyflow.lannister.message.MessageReferenceCounts;
import net.anyflow.lannister.message.OutboundMessageStatus;
import net.anyflow.lannister.serialization.SysValueSerializer;
import net.anyflow.lannister.session.Session;
import net.anyflow.lannister.topic.Topic;
import net.anyflow.lannister.topic.TopicSubscriber;
import net.anyflow.lannister.topic.TopicSubscription;
public class Statistics {
public static final Statistics INSTANCE = new Statistics();
private static final int CENT = 100;
private java.util.Map<String, SysValue> data; // key : topic name
private Map<Criterion, Long> criterions;
private final Runtime runtime;
private final DecimalFormat decimalFormatter;
private final NumberFormat defaultFormatter;
private final com.sun.management.OperatingSystemMXBean osBean;
@FunctionalInterface
@JsonSerialize(using = SysValueSerializer.class)
public interface SysValue {
String value();
}
private class RawSysValue implements SysValue {
private Criterion criterion;
private RawSysValue(Criterion criterion) {
this.criterion = criterion;
}
@Override
public String value() {
return NumberFormat.getNumberInstance().format(criterions.get(criterion));
}
}
public enum Criterion {
BROKER_START_TIME,
BYTE_RECEIVED,
BYTE_SENT,
CLIENTS_MAXIMUM,
MESSAGES_RECEIVED,
MESSAGES_SENT,
MESSAGES_PUBLISH_DROPPED,
MESSAGES_PUBLISH_RECEIVED,
MESSAGES_PUBLISH_SENT;
}
private Statistics() {
this.data = Maps.newHashMap();
this.criterions = ClusterDataFactory.INSTANCE.createMap("statistics");
this.runtime = Runtime.getRuntime();
this.osBean = (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
this.decimalFormatter = new DecimalFormat("#,###.0");
this.defaultFormatter = NumberFormat.getNumberInstance();
initializeCriterions();
initializeTopics();
}
public java.util.Map<String, SysValue> data() {
return data;
}
public void add(Criterion criterion, long size) {
Lock lock = ClusterDataFactory.INSTANCE.createLock(criterion.toString());
lock.lock();
try {
Long val = criterions.get(criterion);
val += size;
criterions.put(criterion, val);
}
finally {
lock.unlock();
}
}
public void setMaxActiveClients(long current) {
Lock lock = ClusterDataFactory.INSTANCE.createLock(Criterion.CLIENTS_MAXIMUM.toString());
lock.lock();
try {
Long prev = criterions.get(Criterion.CLIENTS_MAXIMUM);
if (prev < current) {
criterions.put(Criterion.CLIENTS_MAXIMUM, current);
}
}
finally {
lock.unlock();
}
}
private void initializeCriterions() {
if (criterions.get(Criterion.BROKER_START_TIME) == null) {
criterions.put(Criterion.BROKER_START_TIME, new Date().getTime());
}
initialize(Criterion.BYTE_RECEIVED);
initialize(Criterion.BYTE_SENT);
initialize(Criterion.CLIENTS_MAXIMUM);
initialize(Criterion.MESSAGES_RECEIVED);
initialize(Criterion.MESSAGES_SENT);
initialize(Criterion.MESSAGES_PUBLISH_DROPPED);
initialize(Criterion.MESSAGES_PUBLISH_RECEIVED);
initialize(Criterion.MESSAGES_PUBLISH_SENT);
}
private void initialize(Criterion criterion) {
if (criterions.get(criterion) != null) { return; }
criterions.put(criterion, 0l);
}
private void initializeTopics() {
// MESSAGE
data.put("$SYS/broker/messages/received", new RawSysValue(Criterion.MESSAGES_RECEIVED));
data.put("$SYS/broker/messages/sent", new RawSysValue(Criterion.MESSAGES_SENT));
data.put("$SYS/broker/messages/publish/dropped", new RawSysValue(Criterion.MESSAGES_PUBLISH_DROPPED));
data.put("$SYS/broker/messages/publish/received", new RawSysValue(Criterion.MESSAGES_PUBLISH_RECEIVED));
data.put("$SYS/broker/messages/publish/sent", new RawSysValue(Criterion.MESSAGES_PUBLISH_SENT));
data.put("$SYS/broker/messages/retained/count", () -> defaultFormatter.format(
Topic.NEXUS.keySet().stream().filter(t -> Topic.NEXUS.get(t).retainedMessage() != null).count()));
// CLIENT
data.put("$SYS/broker/clients/maximum", new RawSysValue(Criterion.CLIENTS_MAXIMUM));
data.put("$SYS/broker/clients/connected", () -> {
long current = Session.NEXUS.keySet().stream().filter(s -> Session.NEXUS.get(s).isConnected(false)).count();
setMaxActiveClients(current);
return defaultFormatter.format(current);
});
data.put("$SYS/broker/clients/disconnected", () -> defaultFormatter
.format(Session.NEXUS.keySet().stream().filter(s -> !Session.NEXUS.get(s).isConnected(false)).count()));
data.put("$SYS/broker/clients/total", () -> defaultFormatter.format(Session.NEXUS.keySet().size()));
// STATIC
data.put("$SYS/broker/version", () -> Settings.INSTANCE.version());
data.put("$SYS/broker/timestamp", () -> Settings.INSTANCE.buildTime());
data.put("$SYS/broker/changeset",
() -> Settings.INSTANCE.commitId() + " | " + Settings.INSTANCE.commitIdDescribe());
// ETC
data.put("$SYS/broker/load/bytes/received", new RawSysValue(Criterion.BYTE_RECEIVED));
data.put("$SYS/broker/load/bytes/sent", new RawSysValue(Criterion.BYTE_SENT));
data.put("$SYS/broker/subscriptions/count", () -> defaultFormatter.format(TopicSubscription.NEXUS.size()));
data.put("$SYS/broker/time", () -> new Date().toString());
data.put("$SYS/broker/uptime", () -> defaultFormatter
.format((double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME)) / (double) 1000));
// RATE
data.put("$SYS/broker/messages/received/inSecond", () -> {
Long numerator = criterions.get(Criterion.MESSAGES_RECEIVED);
Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
/ (double) 1000;
return decimalFormatter.format((double) numerator / denominator);
});
data.put("$SYS/broker/messages/sent/inSecond", () -> {
Long numerator = criterions.get(Criterion.MESSAGES_SENT);
Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
/ (double) 1000;
return decimalFormatter.format((double) numerator / denominator);
});
data.put("$SYS/broker/load/bytes/received/inSecond", () -> {
Long numerator = criterions.get(Criterion.BYTE_RECEIVED);
Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
/ (double) 1000;
return decimalFormatter.format((double) numerator / denominator);
});
data.put("$SYS/broker/load/bytes/sent/inSecond", () -> {
Long numerator = criterions.get(Criterion.BYTE_SENT);
Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
/ (double) 1000;
return decimalFormatter.format((double) numerator / denominator);
});
data.put("$SYS/broker/messages/publish/dropped/inSecond", () -> {
Long numerator = criterions.get(Criterion.MESSAGES_PUBLISH_DROPPED);
Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
/ (double) 1000;
return decimalFormatter.format((double) numerator / denominator);
});
data.put("$SYS/broker/messages/publish/received/inSecond", () -> {
Long numerator = criterions.get(Criterion.MESSAGES_PUBLISH_RECEIVED);
Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
/ (double) 1000;
return decimalFormatter.format((double) numerator / denominator);
});
data.put("$SYS/broker/messages/publish/sent/inSecond", () -> {
Long numerator = criterions.get(Criterion.MESSAGES_PUBLISH_SENT);
Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
/ (double) 1000;
return decimalFormatter.format((double) numerator / denominator);
});
data.put("$SYS/broker/messages/publish/sent/inSecond", () -> {
Long numerator = criterions.get(Criterion.MESSAGES_PUBLISH_SENT);
Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
/ (double) 1000;
return decimalFormatter.format((double) numerator / denominator);
});
// CUSTOM : SYSTEM
data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/system/processor/count",
() -> defaultFormatter.format(Runtime.getRuntime().availableProcessors()));
data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/cpu/system/percent",
() -> decimalFormatter.format(osBean.getSystemCpuLoad() * CENT));
data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/cpu/jvm/percent",
() -> decimalFormatter.format(osBean.getProcessCpuLoad() * CENT));
data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/thread/active/count",
() -> defaultFormatter.format(java.lang.Thread.activeCount()));
data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/memory/max/byte",
() -> defaultFormatter.format(runtime.maxMemory()));
data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/memory/total/byte",
() -> defaultFormatter.format(runtime.totalMemory()));
data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/memory/free/byte",
() -> defaultFormatter.format(runtime.freeMemory()));
// CUSTOM : DATA STRUCTURE STATISTICS
data.put("$SYS/broker/topic/count", () -> defaultFormatter.format(Topic.NEXUS.keySet().size()));
data.put("$SYS/broker/topic/subscriber/count", () -> defaultFormatter.format(TopicSubscriber.NEXUS.size()));
data.put("$SYS/broker/topic/subscription/count", () -> defaultFormatter.format(TopicSubscription.NEXUS.size()));
data.put("$SYS/broker/topic/filter/count",
() -> defaultFormatter.format(TopicSubscription.NEXUS.topicFilters().size()));
data.put("$SYS/broker/messages/persisted/count", () -> defaultFormatter.format(Message.NEXUS.size()));
data.put("$SYS/broker/messages/persisted/inboundStatus/count",
() -> defaultFormatter.format(InboundMessageStatus.NEXUS.size()));
data.put("$SYS/broker/messages/persisted/outboundStatus/count",
() -> defaultFormatter.format(OutboundMessageStatus.NEXUS.size()));
data.put("$SYS/broker/messages/persisted/referenceCounter/count",
() -> defaultFormatter.format(MessageReferenceCounts.INSTANCE.size()));
}
}