-
Notifications
You must be signed in to change notification settings - Fork 77
Expand file tree
/
Copy pathsession.py
More file actions
129 lines (105 loc) · 3.94 KB
/
session.py
File metadata and controls
129 lines (105 loc) · 3.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from msgpackrpc import Loop
from msgpackrpc import message
from msgpackrpc.future import Future
from msgpackrpc.transport import tcp
from msgpackrpc.compat import iteritems
from msgpackrpc.error import TimeoutError
class Session(object):
"""\
Session processes send/recv request of the message, by using underlying
transport layer.
self._request_table(request table) stores the relationship between messageid and
corresponding future. When the new requets are sent, the Session generates
new message id and new future. Then the Session registers them to request table.
When it receives the message, the Session lookups the request table and set the
result to the corresponding future.
"""
def __init__(self, address, timeout, loop=None, builder=tcp, reconnect_limit=5, pack_encoding='utf-8', unpack_encoding=None):
"""\
:param address: address of the server.
:param loop: context object.
:param builder: builder for creating transport layer
"""
self._loop = loop or Loop()
self._address = address
self._timeout = timeout
self._transport = builder.ClientTransport(self, self._address, reconnect_limit, encodings=(pack_encoding, unpack_encoding))
self._generator = _NoSyncIDGenerator()
self._request_table = {}
@property
def address(self):
return self._address
def call(self, method, *args):
return self.send_request(method, args).get()
def call_async(self, method, *args):
return self.send_request(method, args)
def send_request(self, method, args):
# need lock?
msgid = next(self._generator)
future = Future(self._loop, self._timeout)
self._request_table[msgid] = future
self._transport.send_message([message.REQUEST, msgid, method, args])
return future
def notify(self, method, *args):
def callback():
self._loop.stop()
self._transport.send_message([message.NOTIFY, method, args], callback=callback)
self._loop.start()
def close(self):
if self._transport:
self._transport.close()
self._transport = None
self._request_table = {}
def on_connect_failed(self, reason):
"""
The callback called when the connection failed.
Called by the transport layer.
"""
# set error for all requests
for msgid, future in iteritems(self._request_table):
future.set_error(reason)
self._request_table = {}
self.close()
self._loop.stop()
def on_response(self, msgid, error, result):
"""\
The callback called when the message arrives.
Called by the transport layer.
"""
if not msgid in self._request_table:
# TODO: Check timed-out msgid?
#raise RPCError("Unknown msgid: id = {0}".format(msgid))
return
future = self._request_table.pop(msgid)
if error is not None:
future.set_error(error)
else:
future.set_result(result)
self._loop.stop()
def on_timeout(self, msgid):
future = self._request_table.pop(msgid)
future.set_error("Request timed out")
def step_timeout(self):
timeouts = []
for msgid, future in iteritems(self._request_table):
if future.step_timeout():
timeouts.append(msgid)
if len(timeouts) == 0:
return
self._loop.stop()
for timeout in timeouts:
future = self._request_table.pop(timeout)
future.set_error(TimeoutError("Request timed out"))
self._loop.start()
def _NoSyncIDGenerator():
"""
Message ID Generator.
NOTE: Don't use in multithread. If you want use this
in multithreaded application, use lock.
"""
counter = 0
while True:
yield counter
counter += 1
if counter > (1 << 30):
counter = 0