process_sync.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. # -*- coding:utf-8 -*-
  2. import sys,os,time
  3. reload(sys)
  4. sys.setdefaultencoding("utf-8")
  5. from multiprocessing import Process, Queue, Pipe
  6. from multiprocessing.connection import Listener,Client
  7. import json
  8. class ProcessSync():
  9. HEAD = "[ProcessCMD]"
  10. def __init__(self):
  11. pass
  12. def notifyMessage(self, msg):
  13. sendMsg = ProcessSync.HEAD + msg
  14. print sendMsg
  15. class QueueBinder():
  16. def __init__(self, sender, receiver):
  17. self.sender = sender
  18. self.receiver = receiver
  19. def setQueue(self, sendQue, recvQue):
  20. self.sender = sendQue
  21. self.receiver = recvQue
  22. """
  23. 参数:action:字符串
  24. data:字典,存放基本数据类型
  25. """
  26. def sendData(self, action, data):
  27. packet = [action,data]
  28. self.sender.put(packet)
  29. """
  30. 返回值:数组[action, data]
  31. data:字典,存放基本数据类型
  32. """
  33. def accept(self):
  34. return self.receiver.get()
  35. queBinder = None
  36. def getQueueBinder(sendQue, recvQue):
  37. global queBinder
  38. if (queBinder == None):
  39. queBinder = QueueBinder(sendQue, recvQue)
  40. else:
  41. queBinder.setQueue(sendQue,recvQue)
  42. return queBinder
  43. def testP1():
  44. binder = HttpConsoleBinder()
  45. binder.initConsoleListener()
  46. print "Wait http"
  47. action,data = binder.acceptHttp()
  48. print "testP1,action:",action,";data:",data
  49. print "Send to http"
  50. binder.sendToHttp("test", {"keyP1": "valueP1"})
  51. def testP2():
  52. binder = HttpConsoleBinder()
  53. binder.initHttpListener()
  54. print "Send to console"
  55. binder.sendToConsole("test", {"keyP2":"valueP2"})
  56. print "Wait console"
  57. action,data = binder.acceptConsole()
  58. print "testP2,action:",action,";data:",data
  59. class HttpConsoleBinder():
  60. HttpServer_Port = 6000
  61. Console_Port = HttpServer_Port + 1
  62. def __init__(self):
  63. self.httpAddress = ("localhost", HttpConsoleBinder.HttpServer_Port)
  64. self.consoleAddress = ("localhost", HttpConsoleBinder.Console_Port)
  65. def initHttpListener(self):
  66. self.httpListener = Listener(self.httpAddress, authkey="password")
  67. def initConsoleListener(self):
  68. self.consoleListener = Listener(self.consoleAddress, authkey="password")
  69. def sendToHttp(self,action, data):
  70. self.httpClient = Client(self.httpAddress, authkey="password")
  71. data["action"] = action
  72. jsonStr = json.dumps(data)
  73. try:
  74. self.httpClient.send(jsonStr)
  75. except Exception,e:
  76. print e
  77. finally:
  78. self.httpClient.close()
  79. def sendToConsole(self, action, data):
  80. self.consoleClient = Client(self.consoleAddress, authkey="password")
  81. data["action"] = action
  82. jsonStr = json.dumps(data)
  83. try:
  84. self.consoleClient.send(jsonStr)
  85. except Exception,e:
  86. print e
  87. finally:
  88. self.consoleClient.close()
  89. def acceptHttp(self):
  90. conn = self.consoleListener.accept()
  91. data = conn.recv()
  92. dataDict = json.loads(data)
  93. action = dataDict["action"]
  94. conn.close()
  95. return action,dataDict
  96. def acceptConsole(self):
  97. conn = self.httpListener.accept()
  98. data = conn.recv()
  99. dataDict = json.loads(data)
  100. action = dataDict["action"]
  101. conn.close()
  102. return action,dataDict
  103. if __name__ == "__main__":
  104. p1 = Process(target=testP1, args=())
  105. p1.start()
  106. time.sleep(3)
  107. p2 = Process(target=testP2, args=())
  108. p2.start()