package com.ruoyi.netty.communication; import com.ruoyi.common.utils.netty.ApplicationContextRegister; import com.ruoyi.common.utils.netty.ConcurrentHashMapBean; import com.ruoyi.common.utils.netty.ConvertCode; import com.ruoyi.common.utils.netty.NettyTool; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.AttributeKey; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; /** * @author 86175 */ @Slf4j public class OptometerServerHandler extends SimpleChannelInboundHandler { ApplicationContext act = ApplicationContextRegister.getApplicationContext(); //用于保存所有Channel对象 public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); //表示连接建立 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //chanel可以理解成Connection Channel channel = ctx.channel(); //广播消息给所有的客户端 channelGroup.add(channel); } // 数据发送和接受 @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception { String receiveStr = ConvertCode.bytes2Str(bytes); log.info("info 得到的数据:" + receiveStr); // 添加 CRC16校验 添加了 高位 和低位 //建立通讯通道 Channel channel = channelHandlerContext.channel(); //报文原始码 // String mangedata=receiveStr.substring(26,receiveStr.length()-6); // //序列号 // String number=receiveStr.substring(8,12); channelGroup.forEach(ch -> { //表示当前连接 就是当前硬件客户端与服务器进行的连接 if (channel == ch) { if ( !receiveStr.startsWith("aa") && receiveStr.substring(46,48).equals("85")){ //获取识别符 String Identifier = receiveStr.substring(46, 48); // 获取设备编号 String stcd = receiveStr.substring(40, 46); log.info("连接的Id为:" + channel.id().asLongText()); online(ch, stcd); ConcurrentHashMapBean.channelMap.put(stcd, channel); log.info("设备: " + stcd + "与channel已经注册成功"); log.info("map的字节数" + ConcurrentHashMapBean.channelMap.size()); } // ConnectTOClient.write2Client("7e7e01020304050607", channel, "心跳"); DecisionIdentifier.ScreenIdentifier(receiveStr,act); } }); channel.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { /*ctx.close();*/ log.warn("exceptionCaught发生异常:"); cause.printStackTrace(); } //连接处于活动状态 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); log.info(channel.remoteAddress() + " 上线了"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { try { Channel channel = ctx.channel(); //根据 channel 找到 map的 key 然后删掉 做到数据同步 String stcd = NettyTool.getKeyByLoop(ConcurrentHashMapBean.channelMap, channel); log.info(" channelInactive---通道的设备号: " + NettyTool.getKeyByLoop(ConcurrentHashMapBean.channelMap, channel) + "下线"); ConcurrentHashMapBean.channelMap.remove(stcd); log.info(channel.remoteAddress() + " 下线了"); } catch (Exception e) { } } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { try { Channel channel = ctx.channel(); log.info(" 【硬件设备】 -" + channel.remoteAddress() + " 离开\n"); //验证一下每次客户端断开连接,连接自动地从channelGroup中删除调。 // System.out.println(channelGroup.size()); //当客户端和服务端断开连接的时候,下面的那段代码netty会自动调用,所以不需要人为的去调用它 channelGroup.remove(channel); //下线的设备编号 String stcd = NettyTool.getKeyByLoop(ConcurrentHashMapBean.channelMap, channel); log.info("通道的设备号: " + NettyTool.getKeyByLoop(ConcurrentHashMapBean.channelMap, channel) + "下线"); //根据stcd 删除 连接通道 ConcurrentHashMapBean.channelMap.remove(stcd); } catch (Exception e) { } } /** * 上线一个用户 * * @param channel * @param userId */ public void online(Channel channel, String userId) { //先判断用户是否在web系统中登录? //这部分代码个人实现,参考上面redis中的验证 ConcurrentHashMapBean.channelMap.put(userId, channel); AttributeKey key = AttributeKey.valueOf("user"); channel.attr(key).set(userId); } }