@@ -113,182 +113,3 @@ def queued(self):
113113 """The number of threads blocked waiting on memory."""
114114 with self ._lock :
115115 return len (self ._waiters )
116-
117- '''
118- class BufferPool(object):
119- """
120- A pool of ByteBuffers kept under a given memory limit. This class is fairly
121- specific to the needs of the producer. In particular it has the following
122- properties:
123-
124- * There is a special "poolable size" and buffers of this size are kept in a
125- free list and recycled
126- * It is fair. That is all memory is given to the longest waiting thread
127- until it has sufficient memory. This prevents starvation or deadlock when
128- a thread asks for a large chunk of memory and needs to block until
129- multiple buffers are deallocated.
130- """
131- def __init__(self, memory, poolable_size):
132- """Create a new buffer pool.
133-
134- Arguments:
135- memory (int): maximum memory that this buffer pool can allocate
136- poolable_size (int): memory size per buffer to cache in the free
137- list rather than deallocating
138- """
139- self._poolable_size = poolable_size
140- self._lock = threading.RLock()
141- self._free = collections.deque()
142- self._waiters = collections.deque()
143- self._total_memory = memory
144- self._available_memory = memory
145- #self.metrics = metrics;
146- #self.waitTime = this.metrics.sensor("bufferpool-wait-time");
147- #MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation.");
148- #this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
149-
150- def allocate(self, size, max_time_to_block_ms):
151- """
152- Allocate a buffer of the given size. This method blocks if there is not
153- enough memory and the buffer pool is configured with blocking mode.
154-
155- Arguments:
156- size (int): The buffer size to allocate in bytes
157- max_time_to_block_ms (int): The maximum time in milliseconds to
158- block for buffer memory to be available
159-
160- Returns:
161- buffer
162-
163- Raises:
164- InterruptedException If the thread is interrupted while blocked
165- IllegalArgumentException if size is larger than the total memory
166- controlled by the pool (and hence we would block forever)
167- """
168- assert size <= self._total_memory, (
169- "Attempt to allocate %d bytes, but there is a hard limit of %d on"
170- " memory allocations." % (size, self._total_memory))
171-
172- with self._lock:
173- # check if we have a free buffer of the right size pooled
174- if (size == self._poolable_size and len(self._free) > 0):
175- return self._free.popleft()
176-
177- # now check if the request is immediately satisfiable with the
178- # memory on hand or if we need to block
179- free_list_size = len(self._free) * self._poolable_size
180- if self._available_memory + free_list_size >= size:
181- # we have enough unallocated or pooled memory to immediately
182- # satisfy the request
183- self._free_up(size)
184- self._available_memory -= size
185- raise NotImplementedError()
186- #return ByteBuffer.allocate(size)
187- else:
188- # we are out of memory and will have to block
189- accumulated = 0
190- buf = None
191- more_memory = threading.Condition(self._lock)
192- self._waiters.append(more_memory)
193- # loop over and over until we have a buffer or have reserved
194- # enough memory to allocate one
195- while (accumulated < size):
196- start_wait = time.time()
197- if not more_memory.wait(max_time_to_block_ms / 1000.0):
198- raise Errors.KafkaTimeoutError(
199- "Failed to allocate memory within the configured"
200- " max blocking time")
201- end_wait = time.time()
202- #this.waitTime.record(endWait - startWait, time.milliseconds());
203-
204- # check if we can satisfy this request from the free list,
205- # otherwise allocate memory
206- if (accumulated == 0
207- and size == self._poolable_size
208- and self._free):
209-
210- # just grab a buffer from the free list
211- buf = self._free.popleft()
212- accumulated = size
213- else:
214- # we'll need to allocate memory, but we may only get
215- # part of what we need on this iteration
216- self._free_up(size - accumulated)
217- got = min(size - accumulated, self._available_memory)
218- self._available_memory -= got
219- accumulated += got
220-
221- # remove the condition for this thread to let the next thread
222- # in line start getting memory
223- removed = self._waiters.popleft()
224- assert removed is more_memory, 'Wrong condition'
225-
226- # signal any additional waiters if there is more memory left
227- # over for them
228- if (self._available_memory > 0 or len(self._free) > 0):
229- if len(self._waiters) > 0:
230- self._waiters[0].notify()
231-
232- # unlock and return the buffer
233- if buf is None:
234- raise NotImplementedError()
235- #return ByteBuffer.allocate(size)
236- else:
237- return buf
238-
239- def _free_up(self, size):
240- """
241- Attempt to ensure we have at least the requested number of bytes of
242- memory for allocation by deallocating pooled buffers (if needed)
243- """
244- while self._free and self._available_memory < size:
245- self._available_memory += self._free.pop().capacity
246-
247- def deallocate(self, buffer_, size=None):
248- """
249- Return buffers to the pool. If they are of the poolable size add them
250- to the free list, otherwise just mark the memory as free.
251-
252- Arguments:
253- buffer (io.BytesIO): The buffer to return
254- size (int): The size of the buffer to mark as deallocated, note
255- that this maybe smaller than buffer.capacity since the buffer
256- may re-allocate itself during in-place compression
257- """
258- with self._lock:
259- if size is None:
260- size = buffer_.capacity
261- if (size == self._poolable_size and size == buffer_.capacity):
262- buffer_.seek(0)
263- buffer_.truncate()
264- self._free.append(buffer_)
265- else:
266- self._available_memory += size
267-
268- if self._waiters:
269- more_mem = self._waiters[0]
270- more_mem.notify()
271-
272- def available_memory(self):
273- """The total free memory both unallocated and in the free list."""
274- with self._lock:
275- return self._available_memory + len(self._free) * self._poolable_size
276-
277- def unallocated_memory(self):
278- """Get the unallocated memory (not in the free list or in use)."""
279- with self._lock:
280- return self._available_memory
281-
282- def queued(self):
283- """The number of threads blocked waiting on memory."""
284- with self._lock:
285- return len(self._waiters)
286-
287- def poolable_size(self):
288- """The buffer size that will be retained in the free list after use."""
289- return self._poolable_size
290-
291- def total_memory(self):
292- """The total memory managed by this pool."""
293- return self._total_memory
294- '''
0 commit comments