View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.anyflow.lannister;
18  
19  import java.lang.management.ManagementFactory;
20  import java.text.DecimalFormat;
21  import java.text.NumberFormat;
22  import java.util.Date;
23  import java.util.concurrent.locks.Lock;
24  
25  import com.fasterxml.jackson.databind.annotation.JsonSerialize;
26  import com.google.common.collect.Maps;
27  
28  import net.anyflow.lannister.cluster.ClusterDataFactory;
29  import net.anyflow.lannister.cluster.Map;
30  import net.anyflow.lannister.message.InboundMessageStatus;
31  import net.anyflow.lannister.message.Message;
32  import net.anyflow.lannister.message.MessageReferenceCounts;
33  import net.anyflow.lannister.message.OutboundMessageStatus;
34  import net.anyflow.lannister.serialization.SysValueSerializer;
35  import net.anyflow.lannister.session.Session;
36  import net.anyflow.lannister.topic.Topic;
37  import net.anyflow.lannister.topic.TopicSubscriber;
38  import net.anyflow.lannister.topic.TopicSubscription;
39  
40  public class Statistics {
41  
42  	public static final Statistics INSTANCE = new Statistics();
43  	private static final int CENT = 100;
44  
45  	private java.util.Map<String, SysValue> data; // key : topic name
46  	private Map<Criterion, Long> criterions;
47  	private final Runtime runtime;
48  	private final DecimalFormat decimalFormatter;
49  	private final NumberFormat defaultFormatter;
50  	private final com.sun.management.OperatingSystemMXBean osBean;
51  
52  	@FunctionalInterface
53  	@JsonSerialize(using = SysValueSerializer.class)
54  	public interface SysValue {
55  		String value();
56  	}
57  
58  	private class RawSysValue implements SysValue {
59  		private Criterion criterion;
60  
61  		private RawSysValue(Criterion criterion) {
62  			this.criterion = criterion;
63  		}
64  
65  		@Override
66  		public String value() {
67  			return NumberFormat.getNumberInstance().format(criterions.get(criterion));
68  		}
69  	}
70  
71  	public enum Criterion {
72  		BROKER_START_TIME,
73  		BYTE_RECEIVED,
74  		BYTE_SENT,
75  		CLIENTS_MAXIMUM,
76  		MESSAGES_RECEIVED,
77  		MESSAGES_SENT,
78  		MESSAGES_PUBLISH_DROPPED,
79  		MESSAGES_PUBLISH_RECEIVED,
80  		MESSAGES_PUBLISH_SENT;
81  	}
82  
83  	private Statistics() {
84  		this.data = Maps.newHashMap();
85  		this.criterions = ClusterDataFactory.INSTANCE.createMap("statistics");
86  		this.runtime = Runtime.getRuntime();
87  		this.osBean = (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
88  		this.decimalFormatter = new DecimalFormat("#,###.0");
89  		this.defaultFormatter = NumberFormat.getNumberInstance();
90  
91  		initializeCriterions();
92  
93  		initializeTopics();
94  	}
95  
96  	public java.util.Map<String, SysValue> data() {
97  		return data;
98  	}
99  
100 	public void add(Criterion criterion, long size) {
101 		Lock lock = ClusterDataFactory.INSTANCE.createLock(criterion.toString());
102 
103 		lock.lock();
104 		try {
105 			Long val = criterions.get(criterion);
106 			val += size;
107 
108 			criterions.put(criterion, val);
109 		}
110 		finally {
111 			lock.unlock();
112 		}
113 	}
114 
115 	public void setMaxActiveClients(long current) {
116 		Lock lock = ClusterDataFactory.INSTANCE.createLock(Criterion.CLIENTS_MAXIMUM.toString());
117 
118 		lock.lock();
119 		try {
120 			Long prev = criterions.get(Criterion.CLIENTS_MAXIMUM);
121 
122 			if (prev < current) {
123 				criterions.put(Criterion.CLIENTS_MAXIMUM, current);
124 			}
125 		}
126 		finally {
127 			lock.unlock();
128 		}
129 	}
130 
131 	private void initializeCriterions() {
132 		if (criterions.get(Criterion.BROKER_START_TIME) == null) {
133 			criterions.put(Criterion.BROKER_START_TIME, new Date().getTime());
134 		}
135 
136 		initialize(Criterion.BYTE_RECEIVED);
137 		initialize(Criterion.BYTE_SENT);
138 		initialize(Criterion.CLIENTS_MAXIMUM);
139 		initialize(Criterion.MESSAGES_RECEIVED);
140 		initialize(Criterion.MESSAGES_SENT);
141 		initialize(Criterion.MESSAGES_PUBLISH_DROPPED);
142 		initialize(Criterion.MESSAGES_PUBLISH_RECEIVED);
143 		initialize(Criterion.MESSAGES_PUBLISH_SENT);
144 	}
145 
146 	private void initialize(Criterion criterion) {
147 		if (criterions.get(criterion) != null) { return; }
148 
149 		criterions.put(criterion, 0l);
150 	}
151 
152 	private void initializeTopics() {
153 		// MESSAGE
154 		data.put("$SYS/broker/messages/received", new RawSysValue(Criterion.MESSAGES_RECEIVED));
155 		data.put("$SYS/broker/messages/sent", new RawSysValue(Criterion.MESSAGES_SENT));
156 		data.put("$SYS/broker/messages/publish/dropped", new RawSysValue(Criterion.MESSAGES_PUBLISH_DROPPED));
157 		data.put("$SYS/broker/messages/publish/received", new RawSysValue(Criterion.MESSAGES_PUBLISH_RECEIVED));
158 		data.put("$SYS/broker/messages/publish/sent", new RawSysValue(Criterion.MESSAGES_PUBLISH_SENT));
159 		data.put("$SYS/broker/messages/retained/count", () -> defaultFormatter.format(
160 				Topic.NEXUS.keySet().stream().filter(t -> Topic.NEXUS.get(t).retainedMessage() != null).count()));
161 
162 		// CLIENT
163 		data.put("$SYS/broker/clients/maximum", new RawSysValue(Criterion.CLIENTS_MAXIMUM));
164 		data.put("$SYS/broker/clients/connected", () -> {
165 			long current = Session.NEXUS.keySet().stream().filter(s -> Session.NEXUS.get(s).isConnected(false)).count();
166 
167 			setMaxActiveClients(current);
168 
169 			return defaultFormatter.format(current);
170 		});
171 		data.put("$SYS/broker/clients/disconnected", () -> defaultFormatter
172 				.format(Session.NEXUS.keySet().stream().filter(s -> !Session.NEXUS.get(s).isConnected(false)).count()));
173 		data.put("$SYS/broker/clients/total", () -> defaultFormatter.format(Session.NEXUS.keySet().size()));
174 
175 		// STATIC
176 		data.put("$SYS/broker/version", () -> Settings.INSTANCE.version());
177 		data.put("$SYS/broker/timestamp", () -> Settings.INSTANCE.buildTime());
178 		data.put("$SYS/broker/changeset",
179 				() -> Settings.INSTANCE.commitId() + " | " + Settings.INSTANCE.commitIdDescribe());
180 
181 		// ETC
182 		data.put("$SYS/broker/load/bytes/received", new RawSysValue(Criterion.BYTE_RECEIVED));
183 		data.put("$SYS/broker/load/bytes/sent", new RawSysValue(Criterion.BYTE_SENT));
184 		data.put("$SYS/broker/subscriptions/count", () -> defaultFormatter.format(TopicSubscription.NEXUS.size()));
185 		data.put("$SYS/broker/time", () -> new Date().toString());
186 		data.put("$SYS/broker/uptime", () -> defaultFormatter
187 				.format((double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME)) / (double) 1000));
188 
189 		// RATE
190 		data.put("$SYS/broker/messages/received/inSecond", () -> {
191 			Long numerator = criterions.get(Criterion.MESSAGES_RECEIVED);
192 			Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
193 					/ (double) 1000;
194 
195 			return decimalFormatter.format((double) numerator / denominator);
196 		});
197 		data.put("$SYS/broker/messages/sent/inSecond", () -> {
198 			Long numerator = criterions.get(Criterion.MESSAGES_SENT);
199 			Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
200 					/ (double) 1000;
201 
202 			return decimalFormatter.format((double) numerator / denominator);
203 		});
204 		data.put("$SYS/broker/load/bytes/received/inSecond", () -> {
205 			Long numerator = criterions.get(Criterion.BYTE_RECEIVED);
206 			Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
207 					/ (double) 1000;
208 
209 			return decimalFormatter.format((double) numerator / denominator);
210 		});
211 		data.put("$SYS/broker/load/bytes/sent/inSecond", () -> {
212 			Long numerator = criterions.get(Criterion.BYTE_SENT);
213 			Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
214 					/ (double) 1000;
215 
216 			return decimalFormatter.format((double) numerator / denominator);
217 		});
218 		data.put("$SYS/broker/messages/publish/dropped/inSecond", () -> {
219 			Long numerator = criterions.get(Criterion.MESSAGES_PUBLISH_DROPPED);
220 			Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
221 					/ (double) 1000;
222 
223 			return decimalFormatter.format((double) numerator / denominator);
224 		});
225 		data.put("$SYS/broker/messages/publish/received/inSecond", () -> {
226 			Long numerator = criterions.get(Criterion.MESSAGES_PUBLISH_RECEIVED);
227 			Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
228 					/ (double) 1000;
229 
230 			return decimalFormatter.format((double) numerator / denominator);
231 		});
232 		data.put("$SYS/broker/messages/publish/sent/inSecond", () -> {
233 			Long numerator = criterions.get(Criterion.MESSAGES_PUBLISH_SENT);
234 			Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
235 					/ (double) 1000;
236 
237 			return decimalFormatter.format((double) numerator / denominator);
238 		});
239 		data.put("$SYS/broker/messages/publish/sent/inSecond", () -> {
240 			Long numerator = criterions.get(Criterion.MESSAGES_PUBLISH_SENT);
241 			Double denominator = (double) (new Date().getTime() - criterions.get(Criterion.BROKER_START_TIME))
242 					/ (double) 1000;
243 
244 			return decimalFormatter.format((double) numerator / denominator);
245 		});
246 
247 		// CUSTOM : SYSTEM
248 		data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/system/processor/count",
249 				() -> defaultFormatter.format(Runtime.getRuntime().availableProcessors()));
250 		data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/cpu/system/percent",
251 				() -> decimalFormatter.format(osBean.getSystemCpuLoad() * CENT));
252 		data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/cpu/jvm/percent",
253 				() -> decimalFormatter.format(osBean.getProcessCpuLoad() * CENT));
254 		data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/thread/active/count",
255 				() -> defaultFormatter.format(java.lang.Thread.activeCount()));
256 		data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/memory/max/byte",
257 				() -> defaultFormatter.format(runtime.maxMemory()));
258 		data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/memory/total/byte",
259 				() -> defaultFormatter.format(runtime.totalMemory()));
260 		data.put("$SYS/broker/node/" + ClusterDataFactory.INSTANCE.currentId() + "/load/memory/free/byte",
261 				() -> defaultFormatter.format(runtime.freeMemory()));
262 
263 		// CUSTOM : DATA STRUCTURE STATISTICS
264 		data.put("$SYS/broker/topic/count", () -> defaultFormatter.format(Topic.NEXUS.keySet().size()));
265 		data.put("$SYS/broker/topic/subscriber/count", () -> defaultFormatter.format(TopicSubscriber.NEXUS.size()));
266 		data.put("$SYS/broker/topic/subscription/count", () -> defaultFormatter.format(TopicSubscription.NEXUS.size()));
267 		data.put("$SYS/broker/topic/filter/count",
268 				() -> defaultFormatter.format(TopicSubscription.NEXUS.topicFilters().size()));
269 		data.put("$SYS/broker/messages/persisted/count", () -> defaultFormatter.format(Message.NEXUS.size()));
270 		data.put("$SYS/broker/messages/persisted/inboundStatus/count",
271 				() -> defaultFormatter.format(InboundMessageStatus.NEXUS.size()));
272 		data.put("$SYS/broker/messages/persisted/outboundStatus/count",
273 				() -> defaultFormatter.format(OutboundMessageStatus.NEXUS.size()));
274 		data.put("$SYS/broker/messages/persisted/referenceCounter/count",
275 				() -> defaultFormatter.format(MessageReferenceCounts.INSTANCE.size()));
276 	}
277 }