package com.easysocket.connection.dispatcher; 
 | 
  
 | 
import com.easysocket.EasySocket; 
 | 
import com.easysocket.entity.OriginReadData; 
 | 
import com.easysocket.entity.SocketAddress; 
 | 
import com.easysocket.interfaces.conn.IConnectionManager; 
 | 
import com.easysocket.interfaces.conn.ISocketActionDispatch; 
 | 
import com.easysocket.interfaces.conn.ISocketActionListener; 
 | 
import com.easysocket.utils.Utils; 
 | 
  
 | 
import java.io.Serializable; 
 | 
import java.nio.charset.Charset; 
 | 
import java.util.ArrayList; 
 | 
import java.util.Iterator; 
 | 
import java.util.List; 
 | 
import java.util.concurrent.LinkedBlockingQueue; 
 | 
  
 | 
import static com.easysocket.connection.action.IOAction.ACTION_READ_COMPLETE; 
 | 
import static com.easysocket.connection.action.SocketAction.ACTION_CONN_FAIL; 
 | 
import static com.easysocket.connection.action.SocketAction.ACTION_CONN_SUCCESS; 
 | 
import static com.easysocket.connection.action.SocketAction.ACTION_DISCONNECTION; 
 | 
  
 | 
/** 
 | 
 * Author:Alex 
 | 
 * Date:2019/6/1 
 | 
 * Note:socket行为分发器 
 | 
 */ 
 | 
public class SocketActionDispatcher implements ISocketActionDispatch { 
 | 
    /** 
 | 
     * 连接地址 
 | 
     */ 
 | 
    private SocketAddress socketAddress; 
 | 
    /** 
 | 
     * 连接器 
 | 
     */ 
 | 
    private IConnectionManager connectionManager; 
 | 
    /** 
 | 
     * 回调监听集合 
 | 
     */ 
 | 
    private List<ISocketActionListener> actionListeners = new ArrayList<>(); 
 | 
    /** 
 | 
     * 处理socket行为的线程 
 | 
     */ 
 | 
    private Thread actionThread; 
 | 
    /** 
 | 
     * 是否停止分发 
 | 
     */ 
 | 
    private boolean isStop; 
 | 
  
 | 
    /** 
 | 
     * 事件消费队列 
 | 
     */ 
 | 
    private final LinkedBlockingQueue<ActionBean> socketActions = new LinkedBlockingQueue(); 
 | 
    /** 
 | 
     * 切换到UI线程 
 | 
     */ 
 | 
    private MainThreadExecutor mainThreadExecutor = new MainThreadExecutor(); 
 | 
  
 | 
  
 | 
    public SocketActionDispatcher(IConnectionManager connectionManager, SocketAddress socketAddress) { 
 | 
        this.socketAddress = socketAddress; 
 | 
        this.connectionManager = connectionManager; 
 | 
    } 
 | 
  
 | 
    public void setSocketAddress(SocketAddress info) { 
 | 
        socketAddress = info; 
 | 
    } 
 | 
  
 | 
  
 | 
    @Override 
 | 
    public void dispatchAction(String action) { 
 | 
        dispatchAction(action, null); 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void dispatchAction(String action, Serializable serializable) { 
 | 
        // 将接收到的socket行为封装入列 
 | 
        ActionBean actionBean = new ActionBean(action, serializable, this); 
 | 
        socketActions.offer(actionBean); 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void subscribe(ISocketActionListener iSocketActionListener) { 
 | 
        if (iSocketActionListener != null && !actionListeners.contains(iSocketActionListener)) { 
 | 
            actionListeners.add(iSocketActionListener); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void unsubscribe(ISocketActionListener iSocketActionListener) { 
 | 
        actionListeners.remove(iSocketActionListener); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 分发线程 
 | 
     */ 
 | 
    private class DispatchThread extends Thread { 
 | 
  
 | 
        public DispatchThread() { 
 | 
            super("dispatch thread"); 
 | 
        } 
 | 
  
 | 
        @Override 
 | 
        public void run() { 
 | 
            // 循环处理socket的行为信息 
 | 
            while (!isStop) { 
 | 
                try { 
 | 
                    ActionBean actionBean = socketActions.take(); 
 | 
                    if (actionBean != null && actionBean.mDispatcher != null) { 
 | 
                        SocketActionDispatcher actionDispatcher = actionBean.mDispatcher; 
 | 
                        List<ISocketActionListener> copyListeners = new ArrayList<>(actionDispatcher.actionListeners); 
 | 
                        Iterator<ISocketActionListener> listeners = copyListeners.iterator(); 
 | 
                        // 通知所有监听者 
 | 
                        while (listeners.hasNext()) { 
 | 
                            ISocketActionListener listener = listeners.next(); 
 | 
                            actionDispatcher.dispatchActionToListener(actionBean.mAction, actionBean.arg, listener); 
 | 
                        } 
 | 
                    } 
 | 
                } catch (InterruptedException e) { 
 | 
                    e.printStackTrace(); 
 | 
                } 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * socket行为的封装 
 | 
     */ 
 | 
    protected static class ActionBean { 
 | 
  
 | 
        public ActionBean(String action, Serializable arg, SocketActionDispatcher dispatcher) { 
 | 
            mAction = action; 
 | 
            this.arg = arg; 
 | 
            mDispatcher = dispatcher; 
 | 
        } 
 | 
  
 | 
        String mAction = ""; 
 | 
        Serializable arg; 
 | 
        SocketActionDispatcher mDispatcher; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 分发行为给监听者 
 | 
     * 
 | 
     * @param action 
 | 
     * @param content 
 | 
     * @param actionListener 
 | 
     */ 
 | 
    private void dispatchActionToListener(String action, final Serializable content, final ISocketActionListener actionListener) { 
 | 
        switch (action) { 
 | 
  
 | 
            case ACTION_CONN_SUCCESS: // 连接成功 
 | 
                mainThreadExecutor.execute(new Runnable() { 
 | 
                    @Override 
 | 
                    public void run() { 
 | 
                        actionListener.onSocketConnSuccess(socketAddress); 
 | 
                    } 
 | 
                }); 
 | 
  
 | 
                break; 
 | 
  
 | 
            case ACTION_CONN_FAIL: // 连接失败 
 | 
                mainThreadExecutor.execute(new Runnable() { 
 | 
                    @Override 
 | 
                    public void run() { 
 | 
                        actionListener.onSocketConnFail(socketAddress, ((Boolean) content).booleanValue()); 
 | 
                    } 
 | 
                }); 
 | 
  
 | 
                break; 
 | 
  
 | 
            case ACTION_DISCONNECTION: // 连接断开 
 | 
                mainThreadExecutor.execute(new Runnable() { 
 | 
                    @Override 
 | 
                    public void run() { 
 | 
                        actionListener.onSocketDisconnect(socketAddress, ((Boolean) content).booleanValue()); 
 | 
                        // 不需要重连,则释放资源 
 | 
                        if (!(Boolean) content) { 
 | 
                            stopDispatchThread(); 
 | 
                        } 
 | 
                    } 
 | 
                }); 
 | 
                break; 
 | 
  
 | 
            case ACTION_READ_COMPLETE: // 读取数据完成 
 | 
                mainThreadExecutor.execute(new Runnable() { 
 | 
                    @Override 
 | 
                    public void run() { 
 | 
                        // response有三种形式 
 | 
                        actionListener.onSocketResponse(socketAddress, (OriginReadData) content); 
 | 
                        byte[] data = Utils.concatBytes(((OriginReadData) content).getHeaderData(), ((OriginReadData) content).getBodyBytes()); 
 | 
                        actionListener.onSocketResponse(socketAddress, new String(data, Charset.forName(EasySocket.getInstance().getDefOptions().getCharsetName()))); 
 | 
                        actionListener.onSocketResponse(socketAddress, data); 
 | 
                    } 
 | 
                }); 
 | 
                break; 
 | 
        } 
 | 
    } 
 | 
  
 | 
    // 开始分发线程 
 | 
    @Override 
 | 
    public void startDispatchThread() { 
 | 
        isStop = false; 
 | 
        if (actionThread == null) { 
 | 
            actionThread = new DispatchThread(); 
 | 
            actionThread.start(); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public void stopDispatchThread() { 
 | 
        if (actionThread != null && actionThread.isAlive() && !actionThread.isInterrupted()) { 
 | 
            socketActions.clear(); 
 | 
            //actionListeners.clear(); 
 | 
            isStop = true; 
 | 
            actionThread.interrupt(); 
 | 
            actionThread = null; 
 | 
        } 
 | 
    } 
 | 
  
 | 
} 
 |