日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
JavaSocket通信技術(shù)收發(fā)線程互斥的解決方法

Java Socket通信技術(shù)在很長的時間里都在使用,在不少的程序員眼中都有很多高的評價。那么下面我們就看看如何才能掌握這門復(fù)雜的編程語言,希望大家在今后的Java Socket通信技術(shù)使用中有所收獲。

創(chuàng)新互聯(lián)專注于企業(yè)成都營銷網(wǎng)站建設(shè)、網(wǎng)站重做改版、富錦網(wǎng)站定制設(shè)計、自適應(yīng)品牌網(wǎng)站建設(shè)、H5頁面制作商城網(wǎng)站定制開發(fā)、集團公司官網(wǎng)建設(shè)、外貿(mào)網(wǎng)站制作、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計等建站業(yè)務(wù),價格優(yōu)惠性價比高,為富錦等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。

下面就是Java Socket通信技術(shù)在解決收發(fā)線程互斥的代碼介紹。

 
 
 
  1. package com.bill99.svr; 
  2. import java.io.IOException; 
  3. import java.io.InputStream; 
  4. import java.io.OutputStream; 
  5. import java.net.InetSocketAddress; 
  6. import java.net.Socket; 
  7. import java.net.SocketException; 
  8. import java.net.SocketTimeoutException; 
  9. import java.text.SimpleDateFormat; 
  10. import java.util.Date; 
  11. import java.util.Properties; 
  12. import java.util.Timer; 
  13. import java.util.TimerTask; 
  14. import java.util.concurrent.ConcurrentHashMap; 
  15. import java.util.concurrent.TimeUnit; 
  16. import java.util.concurrent.locks.Condition; 
  17. import java.util.concurrent.locks.ReentrantLock; 
  18. import org.apache.log4j.Logger; 
  19. /** 
  20. *

    title: socket通信包裝類

     
  21. *

    Description: 

     
  22. *

    CopyRight: CopyRight (c) 2009

     
  23. *

    Company: 99bill.com

     
  24. *

    Create date: 2009-10-14

     
  25. *author sunnylocus 
  26.  * v0.10 2009-10-14 初類 
  27. * v0.11 2009-11-12 對命令收發(fā)邏輯及收發(fā)線程互斥機制進行了優(yōu)化,
    處理命令速度由原來8~16個/秒提高到25~32個/秒 
  28. */ public class SocketConnection { 
  29. private volatile Socket socket; 
  30. private int timeout = 1000*10; //超時時間,初始值10秒 
  31. private boolean isLaunchHeartcheck = false;//是否已啟動心跳檢測 
  32. private boolean isNetworkConnect = false; //網(wǎng)絡(luò)是否已連接 
  33. private static String host = ""; 
  34. private static int port; 
  35. static InputStream inStream = null; 
  36. static OutputStream outStream = null; 
  37. private static Logger log =Logger.getLogger
    (SocketConnection.class); 
  38. private static SocketConnection socketConnection = null; 
  39. private static java.util.Timer heartTimer=null;   
  40. //private final Map recMsgMap= Collections.
    synchronizedMap(new HashMap()); 
  41. private final ConcurrentHashMap recMsgMap 
    = new ConcurrentHashMap(); 
  42. private static Thread receiveThread = null; 
  43. private final ReentrantLock lock = new ReentrantLock(); 
  44. private SocketConnection(){ 
  45. Properties conf = new Properties(); 
  46. try { 
  47. conf.load(SocketConnection.class.getResourceAsStream
    ("test.conf")); 
  48. this.timeout = Integer.valueOf(conf.getProperty("timeout")); 
  49. init(conf.getProperty("ip"),Integer.valueOf
    (conf.getProperty("port"))); 
  50. } catch(IOException e) { 
  51. log.fatal("socket初始化異常!",e); 
  52. throw new RuntimeException("socket初始化異常,請檢查配置參數(shù)"); 
  53. /** 
  54. * 單態(tài)模式 
  55. */ 
  56. public static SocketConnection getInstance() { 
  57. if(socketConnection==null) { 
  58. synchronized(SocketConnection.class) { 
  59. if(socketConnection==null) { 
  60. socketConnection = new SocketConnection(); 
  61. return socketConnection; 
  62. return socketConnection; 
  63. private void init(String host,int port) throws IOException { 
  64. InetSocketAddress addr = new InetSocketAddress(host,port); 
  65. socket = new Socket(); 
  66. synchronized (this) { 
  67. log.info("【準備與"+addr+"建立連接】"); 
  68. socket.connect(addr, timeout); 
  69. log.info("【與"+addr+"連接已建立】"); 
  70. inStream = socket.getInputStream(); 
  71. outStream = socket.getOutputStream(); 
  72. socket.setTcpNoDelay(true);//數(shù)據(jù)不作緩沖,立即發(fā)送 
  73. socket.setSoLinger(true, 0);//socket關(guān)閉時,立即釋放資源 
  74. socket.setKeepAlive(true); 
  75. socket.setTrafficClass(0x04|0x10);//高可靠性和最小延遲傳輸 
  76. isNetworkConnect=true; 
  77. receiveThread = new Thread(new ReceiveWorker()); 
  78. receiveThread.start(); 
  79. SocketConnection.host=host; 
  80. SocketConnection.port=port; 
  81. if(!isLaunchHeartcheck) 
  82. launchHeartcheck(); 
  83. /** 
  84. * 心跳包檢測 
  85. */ 
  86. private void launchHeartcheck() { 
  87. if(socket == null) 
  88. throw new IllegalStateException("socket is not 
    established!"); 
  89. heartTimer = new Timer(); 
  90. isLaunchHeartcheck = true; 
  91. heartTimer.schedule(new TimerTask() { 
  92. public void run() { 
  93. String msgStreamNo = StreamNoGenerator.getStreamNo("kq"); 
  94. int mstType =9999;//999-心跳包請求 
  95. SimpleDateFormat dateformate = new SimpleDateFormat
    ("yyyyMMddHHmmss"); 
  96. String msgDateTime = dateformate.format(new Date()); 
  97. int msgLength =38;//消息頭長度 
  98. String commandstr = "00" +msgLength + mstType + msgStreamNo; 
  99. log.info("心跳檢測包 -> IVR "+commandstr); 
  100. int reconnCounter = 1; 
  101. while(true) { 
  102. String responseMsg =null; 
  103. try { 
  104. responseMsg = readReqMsg(commandstr); 
  105. } catch (IOException e) { 
  106. log.error("IO流異常",e); 
  107. reconnCounter ++; 
  108. if(responseMsg!=null) { 
  109. log.info("心跳響應(yīng)包 <- IVR "+responseMsg); 
  110. reconnCounter = 1; 
  111. break; 
  112. } else { 
  113. reconnCounter ++; 
  114. if(reconnCounter >3) {//重連次數(shù)已達三次,判定網(wǎng)絡(luò)連接中斷,
    重新建立連接。連接未被建立時不釋放鎖 
  115. reConnectToCTCC(); break; 
  116. },1000 * 60*1,1000*60*2); 
  117. /** 
  118. * 重連與目標(biāo)IP建立重連 
  119. */ 
  120. private void reConnectToCTCC() { 
  121. new Thread(new Runnable(){ 
  122. public void run(){ 
  123. log.info("重新建立與"+host+":"+port+"的連接"); 
  124. //清理工作,中斷計時器,中斷接收線程,恢復(fù)初始變量 
  125. heartTimer.cancel(); 
  126. isLaunchHeartcheck=false; 
  127. isNetworkConnect = false; 
  128. receiveThread.interrupt(); 
  129. try { 
  130. socket.close(); 
  131. } catch (IOException e1) {log.error("重連時,關(guān)閉socket連
    接發(fā)生IO流異常",e1);} 
  132. //---------------- 
  133. synchronized(this){ 
  134. for(; ;){ 
  135. try { 
  136. Thread.currentThread(); 
  137. Thread.sleep(1000 * 1); 
  138. init(host,port); 
  139. this.notifyAll(); 
  140. break ; 
  141. } catch (IOException e) { 
  142. log.error("重新建立連接未成功",e); 
  143. } catch (InterruptedException e){ 
  144. log.error("重連線程中斷",e); 
  145. }).start(); 
  146. /** 
  147. * 發(fā)送命令并接受響應(yīng) 
  148. * @param requestMsg 
  149. * @return 
  150. * @throws SocketTimeoutException 
  151. * @throws IOException 
  152. */ 
  153. public String readReqMsg(String requestMsg) throws IOException { 
  154. if(requestMsg ==null) { 
  155. return null; 
  156. if(!isNetworkConnect) { 
  157. synchronized(this){ 
  158. try { 
  159. this.wait(1000*5); //等待5秒,如果網(wǎng)絡(luò)還沒有恢復(fù),拋出IO流異常 
  160. if(!isNetworkConnect) { 
  161. throw new IOException("網(wǎng)絡(luò)連接中斷!"); 
  162. } catch (InterruptedException e) { 
  163. log.error("發(fā)送線程中斷",e); 
  164. String msgNo = requestMsg.substring(8, 8 + 24);//讀取流水號 
  165. outStream = socket.getOutputStream(); 
  166. outStream.write(requestMsg.getBytes()); 
  167. outStream.flush(); 
  168. Condition msglock = lock.newCondition(); //消息鎖 
  169. //注冊等待接收消息 
  170. recMsgMap.put(msgNo, msglock); 
  171. try { 
  172. lock.lock(); 
  173. msglock.await(timeout,TimeUnit.MILLISECONDS); 
  174. } catch (InterruptedException e) { 
  175. log.error("發(fā)送線程中斷",e); 
  176. } finally { 
  177. lock.unlock(); 
  178. Object respMsg = recMsgMap.remove(msgNo); //響應(yīng)信息 
  179. if(respMsg!=null &&(respMsg != msglock)) { 
  180. //已經(jīng)接收到消息,注銷等待,成功返回消息 
  181. return (String) respMsg; 
  182. } else { 
  183. log.error(msgNo+" 超時,未收到響應(yīng)消息"); 
  184. throw new SocketTimeoutException(msgNo+" 超時,未收到響應(yīng)消息"); 
  185. public void finalize() { 
  186. if (socket != null) { 
  187. try { 
  188. socket.close(); 
  189. } catch (IOException e) { 
  190. e.printStackTrace(); 
  191. //消息接收線程 
  192. private class ReceiveWorker implements Runnable { 
  193. String intStr= null; 
  194. public void run() { 
  195. while(!Thread.interrupted()){ 
  196. try { 
  197. byte[] headBytes = new byte[4]; 
  198. if(inStream.read(headBytes)==-1){ 
  199. log.warn("讀到流未尾,對方已關(guān)閉流!"); 
  200. reConnectToCTCC();//讀到流未尾,對方已關(guān)閉流 
  201. return; 
  202. byte[] tmp =new byte[4]; 
  203. tmp = headBytes; 
  204. String tempStr = new String(tmp).trim(); 
  205. if(tempStr==null || tempStr.equals("")) { 
  206. log.error("received message is null"); 
  207. continue; 
  208. intStr = new String(tmp); 
  209. int totalLength =Integer.parseInt(intStr); 
  210. //---------------- 
  211. byte[] msgBytes = new byte[totalLength-4]; 
  212. inStream.read(msgBytes); 
  213. String resultMsg = new String(headBytes)+ new 
    String(msgBytes); 
  214. //抽出消息ID 
  215. String msgNo = resultMsg.substring(8, 8 + 24); 
  216. Condition msglock =(Condition) recMsgMap.get(msgNo); 
  217. if(msglock ==null) { 
  218. log.warn(msgNo+"序號可能已被注銷!響應(yīng)消息丟棄"); 
  219. recMsgMap.remove(msgNo); 
  220. continue; 
  221. recMsgMap.put(msgNo, resultMsg); 
  222. try{ 
  223. lock.lock(); 
  224. msglock.signalAll(); 
  225. }finally { 
  226. lock.unlock(); 
  227. }catch(SocketException e){ 
  228. log.error("服務(wù)端關(guān)閉socket",e); 
  229. reConnectToCTCC(); 
  230. } catch(IOException e) { 
  231. log.error("接收線程讀取響應(yīng)數(shù)據(jù)時發(fā)生IO流異常",e); 
  232. } catch(NumberFormatException e){ 
  233. log.error("收到?jīng)]良心包,String轉(zhuǎn)int異常,異常字符:"+intStr); 
  234. }

以上就是對Java Socket通信技術(shù)中收發(fā)線程互斥的詳細解決方法。希望大家有所領(lǐng)悟。


分享標(biāo)題:JavaSocket通信技術(shù)收發(fā)線程互斥的解決方法
本文URL:http://www.5511xx.com/article/djggphd.html