1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.server;
18
19 import java.util.List;
20 import java.util.concurrent.TimeUnit;
21
22 import com.google.common.collect.Lists;
23
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelInboundHandlerAdapter;
26 import io.netty.handler.codec.mqtt.MqttQoS;
27 import io.netty.util.CharsetUtil;
28 import net.anyflow.lannister.Settings;
29 import net.anyflow.lannister.Statistics;
30 import net.anyflow.lannister.cluster.ClusterDataFactory;
31 import net.anyflow.lannister.message.Message;
32 import net.anyflow.lannister.session.Session;
33 import net.anyflow.lannister.topic.Topic;
34 import net.anyflow.lannister.topic.TopicSubscriber;
35
36 public class ScheduledExecutor extends ChannelInboundHandlerAdapter {
37 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScheduledExecutor.class);
38
39 @Override
40 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
41 int $sysPublisherInterval = Settings.INSTANCE.getInt("mqttserver.sys.intervalSeconds", 2);
42 ctx.executor().scheduleAtFixedRate(new $SysPublisher(), 0, $sysPublisherInterval, TimeUnit.SECONDS);
43
44 int sessionExpiratorInterval = Settings.INSTANCE
45 .getInt("mqttserver.sessionExpirationHandlerExecutionIntervalSeconds", 0);
46 ctx.executor().scheduleAtFixedRate(new SessionExpirator(), 0, sessionExpiratorInterval, TimeUnit.SECONDS);
47 }
48
49 static class $SysPublisher implements Runnable {
50 @Override
51 public void run() {
52 Statistics.INSTANCE.data().entrySet().stream().forEach(e -> {
53 if (TopicSubscriber.NEXUS.clientIdsOf(e.getKey()).size() <= 0) { return; }
54
55 byte[] msg = e.getValue().value().getBytes(CharsetUtil.UTF_8);
56
57 Message message = new Message(-1, e.getKey(), ClusterDataFactory.INSTANCE.currentId(), msg,
58 MqttQoS.AT_MOST_ONCE, false);
59
60 Topic.NEXUS.prepare(message).publish(message);
61 });
62 }
63 }
64
65 static class SessionExpirator implements Runnable {
66 @Override
67 public void run() {
68 List<Session> disposes = Lists.newArrayList();
69
70 Session.NEXUS.ctxs().keySet().stream().filter(id -> {
71 Session s = Session.NEXUS.get(id);
72 return s.isExpired();
73 }).forEach(id -> disposes.add(Session.NEXUS.get(id)));
74
75 disposes.stream().forEach(s -> s.dispose(true));
76
77 if (disposes.size() > 0) {
78 logger.debug("SessionExpirationHandler executed [dispose count={}]", disposes.size());
79 }
80 }
81 }
82 }