View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.anyflow.lannister.server;
18  
19  import java.util.List;
20  import java.util.concurrent.TimeUnit;
21  
22  import com.google.common.collect.Lists;
23  
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelInboundHandlerAdapter;
26  import io.netty.handler.codec.mqtt.MqttQoS;
27  import io.netty.util.CharsetUtil;
28  import net.anyflow.lannister.Settings;
29  import net.anyflow.lannister.Statistics;
30  import net.anyflow.lannister.cluster.ClusterDataFactory;
31  import net.anyflow.lannister.message.Message;
32  import net.anyflow.lannister.session.Session;
33  import net.anyflow.lannister.topic.Topic;
34  import net.anyflow.lannister.topic.TopicSubscriber;
35  
36  public class ScheduledExecutor extends ChannelInboundHandlerAdapter {
37  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScheduledExecutor.class);
38  
39  	@Override
40  	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
41  		int $sysPublisherInterval = Settings.INSTANCE.getInt("mqttserver.sys.intervalSeconds", 2);
42  		ctx.executor().scheduleAtFixedRate(new $SysPublisher(), 0, $sysPublisherInterval, TimeUnit.SECONDS);
43  
44  		int sessionExpiratorInterval = Settings.INSTANCE
45  				.getInt("mqttserver.sessionExpirationHandlerExecutionIntervalSeconds", 0);
46  		ctx.executor().scheduleAtFixedRate(new SessionExpirator(), 0, sessionExpiratorInterval, TimeUnit.SECONDS);
47  	}
48  
49  	static class $SysPublisher implements Runnable {
50  		@Override
51  		public void run() {
52  			Statistics.INSTANCE.data().entrySet().stream().forEach(e -> {
53  				if (TopicSubscriber.NEXUS.clientIdsOf(e.getKey()).size() <= 0) { return; }
54  
55  				byte[] msg = e.getValue().value().getBytes(CharsetUtil.UTF_8);
56  
57  				Message message = new Message(-1, e.getKey(), ClusterDataFactory.INSTANCE.currentId(), msg,
58  						MqttQoS.AT_MOST_ONCE, false);
59  
60  				Topic.NEXUS.prepare(message).publish(message);
61  			});
62  		}
63  	}
64  
65  	static class SessionExpirator implements Runnable {
66  		@Override
67  		public void run() {
68  			List<Session> disposes = Lists.newArrayList();
69  
70  			Session.NEXUS.ctxs().keySet().stream().filter(id -> {
71  				Session s = Session.NEXUS.get(id);
72  				return s.isExpired();
73  			}).forEach(id -> disposes.add(Session.NEXUS.get(id)));
74  
75  			disposes.stream().forEach(s -> s.dispose(true)); // [MQTT-3.1.2-24]
76  
77  			if (disposes.size() > 0) {
78  				logger.debug("SessionExpirationHandler executed [dispose count={}]", disposes.size());
79  			}
80  		}
81  	}
82  }