1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister;
18
19 import java.lang.management.ManagementFactory;
20 import java.text.DecimalFormat;
21 import java.text.NumberFormat;
22 import java.util.Date;
23 import java.util.concurrent.locks.Lock;
24
25 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
26 import com.google.common.collect.Maps;
27
28 import net.anyflow.lannister.cluster.ClusterDataFactory;
29 import net.anyflow.lannister.cluster.Map;
30 import net.anyflow.lannister.message.InboundMessageStatus;
31 import net.anyflow.lannister.message.Message;
32 import net.anyflow.lannister.message.MessageReferenceCounts;
33 import net.anyflow.lannister.message.OutboundMessageStatus;
34 import net.anyflow.lannister.serialization.SysValueSerializer;
35 import net.anyflow.lannister.session.Session;
36 import net.anyflow.lannister.topic.Topic;
37 import net.anyflow.lannister.topic.TopicSubscriber;
38 import net.anyflow.lannister.topic.TopicSubscription;
39
40 public class Statistics {
41
42 public static final Statistics INSTANCE = new Statistics();
43 private static final int CENT = 100;
44
45 private java.util.Map<String, SysValue> data;
46 private Map<Criterion, Long> criterions;
47 private final Runtime runtime;
48 private final DecimalFormat decimalFormatter;
49 private final NumberFormat defaultFormatter;
50 private final com.sun.management.OperatingSystemMXBean osBean;
51
52 @FunctionalInterface
53 @JsonSerialize(using = SysValueSerializer.class)
54 public interface SysValue {
55 String value();
56 }
57
58 private class RawSysValue implements SysValue {
59 private Criterion criterion;
60
61 private RawSysValue(Criterion criterion) {
62 this.criterion = criterion;
63 }
64
65 @Override
66 public String value() {
67 return NumberFormat.getNumberInstance().format(criterions.get(criterion));
68 }
69 }
70
71 public enum Criterion {
72 BROKER_START_TIME,
73 BYTE_RECEIVED,
74 BYTE_SENT,
75 CLIENTS_MAXIMUM,
76 MESSAGES_RECEIVED,
77 MESSAGES_SENT,
78 MESSAGES_PUBLISH_DROPPED,
79 MESSAGES_PUBLISH_RECEIVED,
80 MESSAGES_PUBLISH_SENT;
81 }
82
83 private Statistics() {
84 this.data = Maps.newHashMap();
85 this.criterions = ClusterDataFactory.INSTANCE.createMap("statistics");
86 this.runtime = Runtime.getRuntime();
87 this.osBean = (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
88 this.decimalFormatter = new DecimalFormat("#,###.0");
89 this.defaultFormatter = NumberFormat.getNumberInstance();
90
91 initializeCriterions();
92
93 initializeTopics();
94 }
95
96 public java.util.Map<String, SysValue> data() {
97 return data;
98 }
99
100 public void add(Criterion criterion, long size) {
101 Lock lock = ClusterDataFactory.INSTANCE.createLock(criterion.toString());
102
103 lock.lock();
104 try {
105 Long val = criterions.get(criterion);
106 val += size;
107
108 criterions.put(criterion, val);
109 }
110 finally {
111 lock.unlock();
112 }
113 }
114
115 public void setMaxActiveClients(long current) {
116 Lock lock = ClusterDataFactory.INSTANCE.createLock(Criterion.CLIENTS_MAXIMUM.toString());
117
118 lock.lock();
119 try {
120 Long prev = criterions.get(Criterion.CLIENTS_MAXIMUM);
121
122 if (prev < current) {
123 criterions.put(Criterion.CLIENTS_MAXIMUM, current);
124 }
125 }
126 finally {
127 lock.unlock();
128 }
129 }
130
131 private void initializeCriterions() {
132 if (criterions.get(Criterion.BROKER_START_TIME) == null) {
133 criterions.put(Criterion.BROKER_START_TIME, new Date().getTime());
134 }
135
136 initialize(Criterion.BYTE_RECEIVED);
137 initialize(Criterion.BYTE_SENT);
138 initialize(Criterion.CLIENTS_MAXIMUM);
139 initialize(Criterion.MESSAGES_RECEIVED);
140 initialize(Criterion.MESSAGES_SENT);
141 initialize(Criterion.MESSAGES_PUBLISH_DROPPED);
142 initialize(Criterion.MESSAGES_PUBLISH_RECEIVED);
143 initialize(Criterion.MESSAGES_PUBLISH_SENT);
144 }
145
146 private void initialize(Criterion criterion) {
147 if (criterions.get(criterion) != null) { return; }
148
149 criterions.put(criterion, 0l);
150 }
151
152 private void initializeTopics() {
153
154 data.put("$SYS/broker/messages/received", new RawSysValue(Criterion.MESSAGES_RECEIVED));
155 data.put("$SYS/broker/messages/sent", new RawSysValue(Criterion.MESSAGES_SENT));
156 data.put("$SYS/broker/messages/publish/dropped", new RawSysValue(Criterion.MESSAGES_PUBLISH_DROPPED));
157 data.put("$SYS/broker/messages/publish/received", new RawSysValue(Criterion.MESSAGES_PUBLISH_RECEIVED));
158 data.put("$SYS/broker/messages/publish/sent", new RawSysValue(Criterion.MESSAGES_PUBLISH_SENT));
159 data.put("$SYS/broker/messages/retained/count", () -> defaultFormatter.format(
160 Topic.NEXUS.keySet().stream().filter(t -> Topic.NEXUS.get(t).retainedMessage() != null).count()));
161
162
163 data.put("$SYS/broker/clients/maximum", new RawSysValue(Criterion.CLIENTS_MAXIMUM));
164 data.put("$SYS/broker/clients/connected", () -> {
165 long current = Session.NEXUS.keySet().stream().filter(s -> Session.NEXUS.get(s).isConnected(false)).count();
166
167 setMaxActiveClients(current);
168
169 return defaultFormatter.format(current);
170 });
171 data.put("$SYS/broker/clients/disconnected", () -> defaultFormatter
172 .format(Session.NEXUS.keySet().stream().filter(s -> !Session.NEXUS.get(s).isConnected(false)).count()));
173 data.put("$SYS/broker/clients/total", () -> defaultFormatter.format(Session.NEXUS.keySet().size()));
174
175
176 data.put("$SYS/broker/version", () -> Settings.INSTANCE.version());
177 data.put("$SYS/broker/timestamp", () -> Settings.INSTANCE.buildTime());
178 data.put("$SYS/broker/changeset",
179 () -> Settings.INSTANCE.commitId() + " | " + Settings.INSTANCE.commitIdDescribe());
180
181
182 data.put("$SYS/broker/load/bytes/received", new RawSysValue(Criterion.BYTE_RECEIVED));
183 data.put("$SYS/broker/load/bytes/sent", new RawSysValue(Criterion.BYTE_SENT));
184 data.put("$SYS/broker/subscriptions/count", () -> defaultFormatter.format(TopicSubscription.NEXUS.size()));
185 data.put("$SYS/broker/time", () -> new Date().toString());
186 data.put("$SYS/broker/uptime", () -> defaultFormatter
187 .format((double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME)) / (double) 1000));
188
189
190 data.put("$SYS/broker/messages/received/inSecond", () -> {
191 Long numerator = criterions.get(Criterion.MESSAGES_RECEIVED);
192 Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
193 / (double) 1000;
194
195 return decimalFormatter.format((double) numerator / denominator);
196 });
197 data.put("$SYS/broker/messages/sent/inSecond", () -> {
198 Long numerator = criterions.get(Criterion.MESSAGES_SENT);
199 Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
200 / (double) 1000;
201
202 return decimalFormatter.format((double) numerator / denominator);
203 });
204 data.put("$SYS/broker/load/bytes/received/inSecond", () -> {
205 Long numerator = criterions.get(Criterion.BYTE_RECEIVED);
206 Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
207 / (double) 1000;
208
209 return decimalFormatter.format((double) numerator / denominator);
210 });
211 data.put("$SYS/broker/load/bytes/sent/inSecond", () -> {
212 Long numerator = criterions.get(Criterion.BYTE_SENT);
213 Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
214 / (double) 1000;
215
216 return decimalFormatter.format((double) numerator / denominator);
217 });
218 data.put("$SYS/broker/messages/publish/dropped/inSecond", () -> {
219 Long numerator = criterions.get(Criterion.MESSAGES_PUBLISH_DROPPED);
220 Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
221 / (double) 1000;
222
223 return decimalFormatter.format((double) numerator / denominator);
224 });
225 data.put("$SYS/broker/messages/publish/received/inSecond", () -> {
226 Long numerator = criterions.get(Criterion.MESSAGES_PUBLISH_RECEIVED);
227 Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
228 / (double) 1000;
229
230 return decimalFormatter.format((double) numerator / denominator);
231 });
232 data.put("$SYS/broker/messages/publish/sent/inSecond", () -> {
233 Long numerator = criterions.get(Criterion.MESSAGES_PUBLISH_SENT);
234 Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
235 / (double) 1000;
236
237 return decimalFormatter.format((double) numerator / denominator);
238 });
239 data.put("$SYS/broker/messages/publish/sent/inSecond", () -> {
240 Long numerator = criterions.get(Criterion.MESSAGES_PUBLISH_SENT);
241 Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
242 / (double) 1000;
243
244 return decimalFormatter.format((double) numerator / denominator);
245 });
246
247
248 data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/system/processor/count",
249 () -> defaultFormatter.format(Runtime.getRuntime().availableProcessors()));
250 data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/cpu/system/percent",
251 () -> decimalFormatter.format(osBean.getSystemCpuLoad() * CENT));
252 data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/cpu/jvm/percent",
253 () -> decimalFormatter.format(osBean.getProcessCpuLoad() * CENT));
254 data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/thread/active/count",
255 () -> defaultFormatter.format(java.lang.Thread.activeCount()));
256 data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/memory/max/byte",
257 () -> defaultFormatter.format(runtime.maxMemory()));
258 data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/memory/total/byte",
259 () -> defaultFormatter.format(runtime.totalMemory()));
260 data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/memory/free/byte",
261 () -> defaultFormatter.format(runtime.freeMemory()));
262
263
264 data.put("$SYS/broker/topic/count", () -> defaultFormatter.format(Topic.NEXUS.keySet().size()));
265 data.put("$SYS/broker/topic/subscriber/count", () -> defaultFormatter.format(TopicSubscriber.NEXUS.size()));
266 data.put("$SYS/broker/topic/subscription/count", () -> defaultFormatter.format(TopicSubscription.NEXUS.size()));
267 data.put("$SYS/broker/topic/filter/count",
268 () -> defaultFormatter.format(TopicSubscription.NEXUS.topicFilters().size()));
269 data.put("$SYS/broker/messages/persisted/count", () -> defaultFormatter.format(Message.NEXUS.size()));
270 data.put("$SYS/broker/messages/persisted/inboundStatus/count",
271 () -> defaultFormatter.format(InboundMessageStatus.NEXUS.size()));
272 data.put("$SYS/broker/messages/persisted/outboundStatus/count",
273 () -> defaultFormatter.format(OutboundMessageStatus.NEXUS.size()));
274 data.put("$SYS/broker/messages/persisted/referenceCounter/count",
275 () -> defaultFormatter.format(MessageReferenceCounts.INSTANCE.size()));
276 }
277 }