blob: d6e1aeb527266af97f0338806a673894ec2f3c64 [file] [log] [blame]
"""Implements (a subset of) Sun XDR -- eXternal Data Representation.

See: RFC 1014

"""
6
7import struct
8from io import BytesIO
9from functools import wraps
10
11__all__ = ["Error", "Packer", "Unpacker", "ConversionError"]
12
13# exceptions
class Error(Exception):
    """Base exception raised by this module.

    Typical use:

        except xdrlib.Error as var:
            # var is the Error instance describing the failure

    Public instance variables:
        msg -- the message supplied when the exception was created

    """

    def __init__(self, msg):
        self.msg = msg

    def __str__(self):
        # The textual form is just the message itself.
        return str(self.msg)

    def __repr__(self):
        # Mirror __str__: expose the message, not the class name.
        return repr(self.msg)
30
31
class ConversionError(Error):
    """Error raised when a value cannot be converted to or from XDR form."""
34
def raise_conversion_error(function):
    """Decorate a one-argument method so struct.error becomes ConversionError.

    The returned wrapper takes (self, value), forwards both to *function*
    and returns its result.  A struct.error raised by *function* is replaced
    by a ConversionError carrying the first argument of the original
    exception; the original exception is not chained.
    """

    def result(self, value):
        try:
            outcome = function(self, value)
        except struct.error as exc:
            # "from None" hides the struct.error from the traceback.
            raise ConversionError(exc.args[0]) from None
        else:
            return outcome

    # Copy the name, docstring, etc. of the wrapped method onto the wrapper.
    return wraps(function)(result)
45
46
class Packer:
    """Accumulate XDR-encoded values in an in-memory byte buffer."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Throw away everything packed so far and start an empty buffer."""
        self.__buf = BytesIO()

    def get_buffer(self):
        """Return the bytes packed so far."""
        return self.__buf.getvalue()

    # backwards compatibility
    get_buf = get_buffer

    @raise_conversion_error
    def pack_uint(self, x):
        """Append x as a big-endian unsigned 32-bit integer."""
        encoded = struct.pack('>L', x)
        self.__buf.write(encoded)

    @raise_conversion_error
    def pack_int(self, x):
        """Append x as a big-endian signed 32-bit integer."""
        encoded = struct.pack('>l', x)
        self.__buf.write(encoded)

    pack_enum = pack_int

    def pack_bool(self, x):
        """Append a 32-bit word holding 1 if x is true, otherwise 0."""
        word = b'\0\0\0\1' if x else b'\0\0\0\0'
        self.__buf.write(word)

    def pack_uhyper(self, x):
        """Append x as two unsigned 32-bit words, high word first.

        Each half is masked to 32 bits, so only the low 64 bits of x are
        written.  A TypeError or struct.error raised while splitting or
        packing is re-raised as ConversionError.
        """
        try:
            self.pack_uint(x >> 32 & 0xffffffff)
            self.pack_uint(x & 0xffffffff)
        except (TypeError, struct.error) as exc:
            raise ConversionError(exc.args[0]) from None

    pack_hyper = pack_uhyper

    @raise_conversion_error
    def pack_float(self, x):
        """Append x as a big-endian 4-byte float."""
        encoded = struct.pack('>f', x)
        self.__buf.write(encoded)

    @raise_conversion_error
    def pack_double(self, x):
        """Append x as a big-endian 8-byte float."""
        encoded = struct.pack('>d', x)
        self.__buf.write(encoded)

    def pack_fstring(self, n, s):
        """Append the first n bytes of s, NUL-padded to a multiple of four.

        Raises ValueError when n is negative.
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        payload = s[:n]
        # Round the declared size up to the next 4-byte boundary.
        padded_size = (n + 3) // 4 * 4
        padding = (padded_size - len(payload)) * b'\0'
        self.__buf.write(payload + padding)

    pack_fopaque = pack_fstring

    def pack_string(self, s):
        """Append len(s) as an unsigned int, then s as a padded fstring."""
        length = len(s)
        self.pack_uint(length)
        self.pack_fstring(length, s)

    pack_opaque = pack_string
    pack_bytes = pack_string

    def pack_list(self, list, pack_item):
        """Pack every item preceded by a 1 marker, then a closing 0 marker.

        pack_item is called with each item and is expected to pack it.
        """
        for element in list:
            self.pack_uint(1)
            pack_item(element)
        # A zero word terminates the sequence.
        self.pack_uint(0)

    def pack_farray(self, n, list, pack_item):
        """Pack the items of a fixed-size array without a length prefix.

        Raises ValueError when len(list) differs from n.
        """
        if len(list) != n:
            raise ValueError('wrong array size')
        for element in list:
            pack_item(element)

    def pack_array(self, list, pack_item):
        """Pack len(list) as an unsigned int, then each item in turn."""
        count = len(list)
        self.pack_uint(count)
        self.pack_farray(count, list, pack_item)
129
130
131
class Unpacker:
    """Unpacks various data representations from the given buffer.

    The instance keeps a cursor into the buffer.  Every unpack_* method
    decodes one value starting at the cursor and, on success, moves the
    cursor past the bytes it consumed.  When the buffer holds too few bytes
    the method raises EOFError and leaves the cursor where it was.
    """

    def __init__(self, data):
        self.reset(data)

    def reset(self, data):
        """Replace the buffer with data and rewind the cursor to 0."""
        self.__buf = data
        self.__pos = 0

    def get_position(self):
        """Return the current cursor offset into the buffer."""
        return self.__pos

    def set_position(self, position):
        """Move the cursor to position (no validation is performed)."""
        self.__pos = position

    def get_buffer(self):
        """Return the buffer being unpacked."""
        return self.__buf

    def done(self):
        """Raise Error if the cursor has not reached the end of the buffer."""
        if self.__pos < len(self.__buf):
            raise Error('unextracted data remains')

    def __unpack_fixed(self, fmt, size):
        """Decode one struct value of size bytes located at the cursor."""
        start = self.__pos
        end = start + size
        data = self.__buf[start:end]
        if len(data) < size:
            # Fail before touching the cursor so that get_position() and
            # done() still describe the unread tail of the buffer.
            raise EOFError
        self.__pos = end
        return struct.unpack(fmt, data)[0]

    def unpack_uint(self):
        """Return the next big-endian unsigned 32-bit integer."""
        return self.__unpack_fixed('>L', 4)

    def unpack_int(self):
        """Return the next big-endian signed 32-bit integer."""
        return self.__unpack_fixed('>l', 4)

    unpack_enum = unpack_int

    def unpack_bool(self):
        """Return the next 32-bit integer converted to bool."""
        return bool(self.unpack_int())

    def unpack_uhyper(self):
        """Return the next unsigned 64-bit integer (two words, high first)."""
        hi = self.unpack_uint()
        lo = self.unpack_uint()
        return hi << 32 | lo

    def unpack_hyper(self):
        """Return the next signed 64-bit integer."""
        x = self.unpack_uhyper()
        # Reinterpret values with the top bit set as negative numbers.
        if x >= 0x8000000000000000:
            x = x - 0x10000000000000000
        return x

    def unpack_float(self):
        """Return the next big-endian 4-byte float."""
        return self.__unpack_fixed('>f', 4)

    def unpack_double(self):
        """Return the next big-endian 8-byte float."""
        return self.__unpack_fixed('>d', 8)

    def unpack_fstring(self, n):
        """Return the next n bytes and skip the padding that follows them.

        The cursor advances by n rounded up to a multiple of four.  Raises
        ValueError when n is negative and EOFError when the padded field
        does not fit in the remaining buffer.
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        i = self.__pos
        j = i + (n+3)//4*4
        if j > len(self.__buf):
            raise EOFError
        self.__pos = j
        return self.__buf[i:i+n]

    unpack_fopaque = unpack_fstring

    def unpack_string(self):
        """Read an unsigned int length, then return that many bytes."""
        n = self.unpack_uint()
        return self.unpack_fstring(n)

    unpack_opaque = unpack_string
    unpack_bytes = unpack_string

    def unpack_list(self, unpack_item):
        """Return the items of a marker-delimited list.

        Each item is preceded by an unsigned int 1; an unsigned int 0 ends
        the list.  unpack_item is called with no arguments to decode each
        item.  Raises ConversionError when a marker is neither 0 nor 1.
        """
        items = []
        while True:
            x = self.unpack_uint()
            if x == 0:
                break
            if x != 1:
                raise ConversionError('0 or 1 expected, got %r' % (x,))
            items.append(unpack_item())
        return items

    def unpack_farray(self, n, unpack_item):
        """Return a list of n items, each produced by calling unpack_item()."""
        return [unpack_item() for _ in range(n)]

    def unpack_array(self, unpack_item):
        """Read an unsigned int count, then return that many items."""
        n = self.unpack_uint()
        return self.unpack_farray(n, unpack_item)