comparison rhodecode/lib/subprocessio.py @ 2382:034e4fe1ebb2 beta

changed dulwich git interface to gitweb + subprocessio
author Marcin Kuzminski <marcin@python-works.com>
date Mon, 04 Jun 2012 02:56:09 +0200
parents
children a8635cdab3c0
comparison
equal deleted inserted replaced
2381:e487d2a6aa38 2382:034e4fe1ebb2
1 '''
2 Module provides a class allowing to wrap communication over subprocess.Popen
3 input, output, error streams into a meaningful, non-blocking, concurrent
4 stream processor exposing the output data as an iterator fitting to be a
5 return value passed by a WSGI application to a WSGI server per PEP 3333.
6
7 Copyright (c) 2011 Daniel Dotsenko <dotsa@hotmail.com>
8
9 This file is part of git_http_backend.py Project.
10
11 git_http_backend.py Project is free software: you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public License as
13 published by the Free Software Foundation, either version 2.1 of the License,
14 or (at your option) any later version.
15
16 git_http_backend.py Project is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU Lesser General Public License for more details.
20
21 You should have received a copy of the GNU Lesser General Public License
22 along with git_http_backend.py Project.
23 If not, see <http://www.gnu.org/licenses/>.
24 '''
25 import os
26 import subprocess
27 import threading
28 from collections import deque
29
30
class StreamFeeder(threading.Thread):
    """
    Normal writing into a pipe-like is blocking once the buffer is filled.
    This thread allows feeding data from a file-like (or string-like) source
    into a pipe without blocking the main thread.
    We close the write end of the pipe once the end of the source stream is
    reached.
    """
    def __init__(self, source):
        """
        @param source a string-like (str/bytes/bytearray), a readable
            file-like, or an integer file descriptor to read from.
        @raise TypeError when source is none of the accepted kinds.
        """
        super(StreamFeeder, self).__init__()
        self.daemon = True
        filelike = False
        self.bytes = b''
        if type(source) in (type(''), bytes, bytearray):  # string-like
            self.bytes = bytes(source)
        else:  # can be either a file pointer or a file-like
            # `long` exists only on Python 2; fall back to plain int elsewhere
            try:
                int_types = (int, long)
            except NameError:
                int_types = (int,)
            if type(source) in int_types:  # file pointer it is
                # converting file descriptor (int) stdin into file-like
                try:
                    source = os.fdopen(source, 'rb', 16384)
                except (OSError, IOError):
                    # leave `source` as-is; the file-like probe below decides
                    pass
            # let's see if source is file-like by now
            filelike = hasattr(source, 'read')
        if not filelike and not self.bytes:
            raise TypeError("StreamFeeder's source object must be a readable "
                            "file-like, a file descriptor, or a string-like.")
        self.source = source
        self.readiface, self.writeiface = os.pipe()

    def run(self):
        t = self.writeiface
        try:
            if self.bytes:
                os.write(t, self.bytes)
            else:
                s = self.source
                b = s.read(4096)
                while b:
                    os.write(t, b)
                    b = s.read(4096)
        finally:
            # always close the write end so the reader sees EOF, even if the
            # source raised mid-read; otherwise the reader blocks forever.
            os.close(t)

    @property
    def output(self):
        """The read end (an OS file descriptor) of the internal pipe."""
        return self.readiface
77
78
class InputStreamChunker(threading.Thread):
    """
    Reads `chunk_size`-byte chunks from a blocking `source` file-like and
    appends them to `target` (a deque shared with a consumer), pausing when
    the consumer's buffer is full and resuming when `keep_reading` is set.
    """
    def __init__(self, source, target, buffer_size, chunk_size):
        """
        @param source readable file-like to drain
        @param target deque the read chunks are appended to
        @param buffer_size max total bytes to keep buffered before pausing
        @param chunk_size max size of a single read
        """
        super(InputStreamChunker, self).__init__()

        self.daemon = True  # die die die.

        self.source = source
        self.target = target
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
        self.chunk_size = chunk_size

        # set whenever a chunk lands in `target` (or on EOF); consumers wait
        # on this to learn there is something to pop.
        self.data_added = threading.Event()
        self.data_added.clear()

        # cleared by this thread when the buffer is full; set by the consumer
        # after popping to let reading resume.
        self.keep_reading = threading.Event()
        self.keep_reading.set()

        # raised when the source is exhausted (or the reader loop ended).
        self.EOF = threading.Event()
        self.EOF.clear()

        # cleared by stop() to ask the reader loop to exit.
        self.go = threading.Event()
        self.go.set()

    def stop(self):
        self.go.clear()
        self.EOF.set()
        try:
            # this is not proper, but is done to force the reader thread let
            # go of the input because, if successful, .close() will send EOF
            # down the pipe.
            self.source.close()
        except Exception:
            pass

    def run(self):
        s = self.source
        t = self.target
        cs = self.chunk_size
        ccm = self.chunk_count_max
        kr = self.keep_reading
        da = self.data_added
        go = self.go
        try:
            b = s.read(cs)
            while b and go.is_set():
                if len(t) > ccm:
                    kr.clear()
                    kr.wait(2)
                    # # this only works on 2.7.x and up
                    # if not kr.wait(10):
                    #     raise Exception("Timed out while waiting for input to be read.")
                    # instead we'll use this
                    if len(t) > ccm + 3:
                        raise IOError(
                            "Timed out while waiting for input from subprocess.")
                t.append(b)
                da.set()
                b = s.read(cs)
        finally:
            # Always raise the flags, even when the read loop died with an
            # exception: otherwise a consumer blocked waiting on data_added /
            # polling EOF would spin forever for data that can never arrive.
            self.EOF.set()
            da.set()  # for cases when done but there was no input.
138
139
class BufferedGenerator():
    '''
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread)
    from a blocking pipe, and attaches these to an array (deque) of chunks.
    Reading is halted in the thread when max chunks is internally buffered.
    The .next() may operate in blocking or non-blocking fashion by yielding
    '' if no data is ready
    to be sent or by not returning until there is some data to send
    When we get EOF from underlying source pipe we raise the marker to raise
    StopIteration after the last chunk of data is yielded.
    '''

    def __init__(self, source, buffer_size=65536, chunk_size=4096,
                 starting_values=None, bottomless=False):
        '''
        @param source readable file-like drained by the worker thread
        @param buffer_size (Default: 65536) total bytes to buffer before
            pausing the reader thread
        @param chunk_size (Default: 4096) max size of a single read
        @param starting_values (Default: None) optional list of chunks that
            pre-seed the queue
        @param bottomless (Default: False) when True the deque is capped, so
            the oldest chunks are silently dropped instead of pausing the
            reader
        '''
        # `starting_values=[]` as a default would be shared across calls
        # (mutable default argument pitfall); normalize to a fresh list.
        if starting_values is None:
            starting_values = []

        if bottomless:
            maxlen = int(buffer_size / chunk_size)
        else:
            maxlen = None

        self.data = deque(starting_values, maxlen)

        self.worker = InputStreamChunker(source, self.data, buffer_size,
                                         chunk_size)
        if starting_values:
            self.worker.data_added.set()
        self.worker.start()

    ####################
    # Generator's methods
    ####################

    def __iter__(self):
        return self

    def next(self):
        # Block (in 0.2s slices) until either data shows up or the reader
        # thread reports EOF.
        while not len(self.data) and not self.worker.EOF.is_set():
            self.worker.data_added.clear()
            self.worker.data_added.wait(0.2)
        if len(self.data):
            self.worker.keep_reading.set()
            return bytes(self.data.popleft())
        elif self.worker.EOF.is_set():
            raise StopIteration

    # Python 3 iterator protocol alias (harmless on Python 2).
    __next__ = next

    def throw(self, type, value=None, traceback=None):
        if not self.worker.EOF.is_set():
            raise type(value)

    def start(self):
        self.worker.start()

    def stop(self):
        self.worker.stop()

    def close(self):
        try:
            self.worker.stop()
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass

    def __del__(self):
        self.close()

    ####################
    # Threaded reader's infrastructure.
    ####################
    @property
    def input(self):
        # NOTE(review): InputStreamChunker defines no attribute 'w', so
        # accessing this property raises AttributeError. Looks like legacy /
        # dead code -- confirm the intended attribute before relying on it.
        return self.worker.w

    @property
    def data_added_event(self):
        return self.worker.data_added

    @property
    def data_added(self):
        return self.worker.data_added.is_set()

    @property
    def reading_paused(self):
        return not self.worker.keep_reading.is_set()

    @property
    def done_reading_event(self):
        '''
        Done reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        @return An Event class instance.
        '''
        return self.worker.EOF

    @property
    def done_reading(self):
        '''
        Done reading does not mean that the iterator's buffer is empty.
        Iterator might have done reading from underlying source, but the read
        chunks might still be available for serving through .next() method.

        @return A Bool value.
        '''
        return self.worker.EOF.is_set()

    @property
    def length(self):
        '''
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through a bottomless pit of content and
        can only know the length of what it already saw.

        If __len__() on WSGI server per PEP 3333 returns a value,
        the response's length will be set to that. In order not to
        confuse WSGI PEP3333 servers, we will not implement __len__
        at all.
        '''
        return len(self.data)

    def prepend(self, x):
        self.data.appendleft(x)

    def append(self, x):
        self.data.append(x)

    def extend(self, o):
        self.data.extend(o)

    def __getitem__(self, i):
        return self.data[i]
277
278
class SubprocessIOChunker():
    '''
    Processor class wrapping handling of subprocess IO.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not
      unicode) chunks. The object behaves as an iterable. You can "for chunk
      in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
      does not block the parent calling code. On the flip side, reading from
      slow-yielding subprocess may block the iteration until data shows up.
      This does not block the parallel inpipe reading occurring in a parallel
      thread.)

    The purpose of the object is to allow us to wrap subprocess interactions
    into an iterable that can be passed to a WSGI server as the application's
    return value. Because of stream-processing-ability, WSGI does not have to
    read ALL of the subprocess's output and buffer it, before handing it to
    WSGI server for HTTP response. Instead, the class initializer reads just
    a bit of the stream to figure out if an error occurred or is likely to
    occur and if not, just hands the further iteration over subprocess output
    to the server for completion of HTTP response.

    The real or perceived subprocess error is trapped and raised as one of
    the EnvironmentError family of exceptions.

    Example usage:
    #   try:
    #       answer = SubprocessIOChunker(
    #           cmd,
    #           input,
    #           buffer_size = 65536,
    #           chunk_size = 4096
    #       )
    #   except (EnvironmentError) as e:
    #       print str(e)
    #       raise e
    #
    #   return answer
    '''
    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None):
        '''
        Initializes SubprocessIOChunker

        @param cmd A subprocess.Popen style "cmd". Can be string or array of strings
        @param inputstream (Default: None) A file-like, string, or file pointer.
        @param buffer_size (Default: 65536) A size of total buffer per stream in bytes.
        @param chunk_size (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        @param starting_values (Default: None) An array of strings to put in front of output queue.
        @raise EnvironmentError when the subprocess exits with an error, or
            writes to stderr before producing any output.
        '''
        # `starting_values=[]` as a default would be shared across calls
        # (mutable default argument pitfall); normalize to a fresh list.
        if starting_values is None:
            starting_values = []

        if inputstream:
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output

        # NOTE(review): shell=True executes `cmd` through the shell; callers
        # must never pass untrusted strings here.
        _p = subprocess.Popen(cmd,
                              bufsize=-1,
                              shell=True,
                              stdin=inputstream,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE
                              )

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
                                   starting_values)
        # stderr gets a small, bottomless buffer: we only care whether
        # anything at all shows up there.
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused \
                and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just
        # have a full buffer. Either way, if error (returned by ended process,
        # or implied based on presence of stuff in stderr output) we error
        # out. Else, we are happy.
        _returncode = _p.poll()
        if _returncode or (_returncode is None and bg_err.length):
            try:
                _p.terminate()
            except Exception:
                pass
            bg_out.stop()
            bg_err.stop()
            raise EnvironmentError("Subprocess exited due to an error.\n"
                                   + "".join(bg_err))

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        return self

    def next(self):
        # a truthy returncode means the child already exited with an error
        if self.process.poll():
            raise EnvironmentError("Subprocess exited due to an error:\n"
                                   + ''.join(self.error))
        return self.output.next()

    # Python 3 iterator protocol alias (harmless on Python 2).
    __next__ = next

    def throw(self, type, value=None, traceback=None):
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        # best-effort teardown: each step may legitimately fail when the
        # process / streams are already gone (e.g. during interpreter exit).
        try:
            self.process.terminate()
        except Exception:
            pass
        try:
            self.output.close()
        except Exception:
            pass
        try:
            self.error.close()
        except Exception:
            pass

    def __del__(self):
        self.close()