1 package net.anyflow.lannister.server;
2
3 import io.netty.channel.ChannelInitializer;
4 import io.netty.channel.socket.SocketChannel;
5 import io.netty.handler.codec.http.HttpContentCompressor;
6 import io.netty.handler.codec.http.HttpObjectAggregator;
7 import io.netty.handler.codec.http.HttpServerCodec;
8 import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
9 import io.netty.handler.codec.mqtt.MqttDecoder;
10 import io.netty.handler.codec.mqtt.MqttEncoder;
11 import io.netty.handler.logging.LogLevel;
12 import io.netty.handler.logging.LoggingHandler;
13 import io.netty.handler.ssl.SslContext;
14 import io.netty.handler.ssl.SslContextBuilder;
15 import net.anyflow.lannister.Settings;
16 import net.anyflow.lannister.packetreceiver.ConnectReceiver;
17 import net.anyflow.lannister.packetreceiver.GenericReceiver;
18 import net.anyflow.lannister.packetreceiver.PubAckReceiver;
19 import net.anyflow.lannister.packetreceiver.PublishReceiver;
20 import net.anyflow.lannister.packetreceiver.SubscribeReceiver;
21 import net.anyflow.lannister.packetreceiver.UnsubscribeReceiver;
22
23 public class MqttChannelInitializer extends ChannelInitializer<SocketChannel> {
24
25 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MqttChannelInitializer.class);
26
27 private final boolean useWebSocket;
28 private final boolean useSsl;
29
30 public MqttChannelInitializer(boolean useWebSocket, boolean useSsl) {
31 this.useWebSocket = useWebSocket;
32 this.useSsl = useSsl;
33 }
34
35 @Override
36 protected void initChannel(SocketChannel ch) throws Exception {
37 logger.debug("Initializaing channels...");
38
39 ch.pipeline().addLast(ByteCounterCodec.class.getName(), new ByteCounterCodec());
40
41 if ("true".equalsIgnoreCase(Settings.INSTANCE.getProperty("netty.logger"))) {
42 ch.pipeline().addLast(LoggingHandler.class.getName(), new LoggingHandler(LogLevel.DEBUG));
43 }
44
45 if (useSsl) {
46 SslContext sslCtx = SslContextBuilder
47 .forServer(Settings.INSTANCE.certChainFile(), Settings.INSTANCE.privateKeyFile()).build();
48
49 logger.debug("SSL Provider : {}", SslContext.defaultServerProvider());
50
51 ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()));
52 }
53
54 if (useWebSocket) {
55 String websocketPath = Settings.INSTANCE.getProperty("mqttserver.websocket.path", "/");
56
57 ch.pipeline().addLast(HttpServerCodec.class.getName(), new HttpServerCodec());
58 ch.pipeline().addLast(HttpObjectAggregator.class.getName(), new HttpObjectAggregator(1048576));
59 ch.pipeline().addLast(HttpContentCompressor.class.getName(), new HttpContentCompressor());
60 ch.pipeline().addLast(WebSocketServerProtocolHandler.class.getName(),
61 new WebSocketServerProtocolHandler(websocketPath, "mqtt,mqttv3.1,mqttv3.1.1", true, 65536));
62 ch.pipeline().addLast(new MqttWebSocketCodec());
63 }
64
65 int maxBytesInMessage = Settings.INSTANCE.getInt("mqttserver.maxBytesInMessage", 8092);
66
67 ch.pipeline().addLast(MqttDecoder.class.getName(), new MqttDecoder(maxBytesInMessage));
68 ch.pipeline().addLast(MqttEncoder.class.getName(), MqttEncoder.INSTANCE);
69
70 ch.pipeline().addLast(ConnectReceiver.class.getName(), ConnectReceiver.INSTANCE);
71 ch.pipeline().addLast(PubAckReceiver.class.getName(), PubAckReceiver.INSTANCE);
72 ch.pipeline().addLast(PublishReceiver.class.getName(), PublishReceiver.INSTANCE);
73 ch.pipeline().addLast(SubscribeReceiver.class.getName(), SubscribeReceiver.INSTANCE);
74 ch.pipeline().addLast(UnsubscribeReceiver.class.getName(), UnsubscribeReceiver.INSTANCE);
75 ch.pipeline().addLast(GenericReceiver.class.getName(), GenericReceiver.INSTANCE);
76 }
77 }