GNU Radio Manual and C++ API Reference  3.10.9.1
The Free & Open Software Radio Ecosystem
buffer.h
Go to the documentation of this file.
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2004,2009-2011,2013 Free Software Foundation, Inc.
4  *
5  * This file is part of GNU Radio
6  *
7  * SPDX-License-Identifier: GPL-3.0-or-later
8  *
9  */
10 
11 #ifndef INCLUDED_GR_RUNTIME_BUFFER_H
12 #define INCLUDED_GR_RUNTIME_BUFFER_H
13 
14 #include <gnuradio/api.h>
15 #include <gnuradio/custom_lock.h>
16 #include <gnuradio/logger.h>
17 #include <gnuradio/runtime_types.h>
18 #include <gnuradio/tags.h>
19 #include <gnuradio/thread/thread.h>
20 #include <gnuradio/transfer_type.h>
21 
22 #include <functional>
23 #include <iostream>
24 #include <map>
25 #include <memory>
26 
27 
28 namespace gr {
29 
30 class vmcircbuf;
31 class buffer_reader;
32 class buffer_reader_sm;
33 
35 
36 typedef std::function<void*(void*, const void*, std::size_t)> mem_func_t;
37 
38 /*!
39  * \brief Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
40  *
41  * The total size of the buffer will be rounded up to a system
42  * dependent boundary. This is typically the system page size, but
43  * under MS windows is 64KB.
44  *
45  * \param nitems is the minimum number of items the buffer will hold.
46  * \param sizeof_item is the size of an item in bytes.
47  * \param downstream_lcm_nitems is the least common multiple of the items to
48  * read by downstream block(s)
49  * \param downstream_max_out_mult is the maximum output multiple of all
50  * downstream blocks
51  * \param link is the block that writes to this buffer.
52  * \param buf_owner is the block that owns the buffer which may or may not
53  * be the same as the block that writes to this buffer
54  */
55 GR_RUNTIME_API buffer_sptr make_buffer(int nitems,
56  size_t sizeof_item,
57  uint64_t downstream_lcm_nitems,
58  uint32_t downstream_max_out_mult,
59  block_sptr link = block_sptr(),
60  block_sptr buf_owner = block_sptr());
61 
62 /*!
63  * \brief Single writer, multiple reader fifo.
64  * \ingroup internal
65  */
67 {
68 public:
71 
72  ~buffer() override;
73 
74  /*!
75  * \brief return the buffer's mapping type
76  */
77  buffer_mapping_type get_mapping_type() { return d_buf_map_type; }
78 
79  /*!
80  * \brief return number of items worth of space available for writing
81  */
82  virtual int space_available() = 0;
83 
84  /*!
85  * \brief return size of this buffer in items
86  */
87  unsigned int bufsize() const { return d_bufsize; }
88 
89  /*!
90  * \brief return the base address of the buffer
91  */
92  const char* base() const { return static_cast<const char*>(d_base); }
93 
94  /*!
95  * \brief return pointer to write buffer.
96  *
97  * The return value points at space that can hold at least
98  * space_available() items.
99  */
100  virtual void* write_pointer();
101 
102  /*!
103  * \brief return pointer to read buffer.
104  *
105  * The return value points to at least items_available() items.
106  */
107  virtual const void* _read_pointer(unsigned int read_index);
108 
109  /*!
110  * \brief tell buffer that we wrote \p nitems into it
111  */
112  void update_write_pointer(int nitems);
113 
114  void set_done(bool done);
115  bool done() const { return d_done; }
116 
117  /*!
118  * \brief Return the block that writes to this buffer.
119  */
120  block_sptr link() { return block_sptr(d_link); }
121 
122  size_t nreaders() const { return d_readers.size(); }
123  buffer_reader* reader(size_t index) { return d_readers[index]; }
124 
125  gr::thread::mutex* mutex() { return &d_mutex; }
126 
127  uint64_t nitems_written() { return d_abs_write_offset; }
128 
130  {
131  d_write_index = 0;
132  d_abs_write_offset = 0;
133  }
134 
135  size_t get_sizeof_item() { return d_sizeof_item; }
136 
137  uint64_t get_downstream_lcm_nitems() { return d_downstream_lcm_nitems; }
138 
139  uint32_t get_max_reader_output_multiple() { return d_max_reader_output_multiple; }
140 
141  virtual void update_reader_block_history(unsigned history, [[maybe_unused]] int delay)
142  {
143  d_max_reader_history = std::max(d_max_reader_history, history);
144  d_has_history = (d_max_reader_history > 1);
145  }
146 
147  /*!
148  * \brief Adds a new tag to the buffer.
149  *
150  * \param tag the new tag
151  */
152  void add_item_tag(const tag_t& tag);
153 
154  /*!
155  * \brief Removes an existing tag from the buffer.
156  *
157  * If no such tag is found, does nothing.
158  * Note: Doesn't actually physically delete the tag, but
159  * marks it as deleted. For the user, this has the same effect:
160  * Any subsequent calls to get_tags_in_range() will not return
161  * the tag.
162  *
163  * \param tag the tag that needs to be removed
164  * \param id the unique ID of the block calling this function
165  */
166  void remove_item_tag(const tag_t& tag, long id);
167 
168  /*!
169  * \brief Removes all tags before \p max_time from buffer
170  *
171  * \param max_time the time (item number) to trim up until.
172  */
173  void prune_tags(uint64_t max_time);
174 
175  std::multimap<uint64_t, tag_t>::iterator get_tags_begin()
176  {
177  return d_item_tags.begin();
178  }
179  std::multimap<uint64_t, tag_t>::iterator get_tags_end() { return d_item_tags.end(); }
180  std::multimap<uint64_t, tag_t>::iterator get_tags_lower_bound(uint64_t x)
181  {
182  return d_item_tags.lower_bound(x);
183  }
184  std::multimap<uint64_t, tag_t>::iterator get_tags_upper_bound(uint64_t x)
185  {
186  return d_item_tags.upper_bound(x);
187  }
188 
189  /*!
190  * \brief Function to be executed after this object's owner completes the
191  * call to general_work()
192  */
193  virtual void post_work(int nitems) = 0;
194 
195  /*!
196  * \brief Returns true when the current thread is ready to call the callback,
197  * false otherwise. Note if input_blocked_callback is overridden then this
198  * function should also be overridden.
199  */
200  virtual bool input_blkd_cb_ready([[maybe_unused]] int items_required,
201  [[maybe_unused]] unsigned read_index)
202  {
203  return false;
204  }
205 
206  /*!
207  * \brief Callback function that the scheduler will call when it determines
208  * that the input is blocked. Override this function if needed.
209  */
210  virtual bool input_blocked_callback([[maybe_unused]] int items_required,
211  [[maybe_unused]] int items_avail,
212  [[maybe_unused]] unsigned read_index)
213  {
214  return false;
215  }
216 
217  /*!
218  * \brief Returns true if the current thread is ready to execute
219  * output_blocked_callback(), false otherwise. Note if the default
220  * output_blocked_callback is overridden this function should also be
221  * overridden.
222  */
223  virtual bool output_blkd_cb_ready([[maybe_unused]] int output_multiple)
224  {
225  return false;
226  }
227 
228  /*!
229  * \brief Callback function that the scheduler will call when it determines
230  * that the output is blocked. Override this function if needed.
231  */
232  virtual bool output_blocked_callback([[maybe_unused]] int output_multiple,
233  [[maybe_unused]] bool force = false)
234  {
235  return false;
236  }
237 
238  /*!
239  * \brief Increment the number of active pointers for this buffer.
240  */
241  inline void increment_active()
242  {
243  gr::thread::scoped_lock lock(d_mutex);
244 
245  d_cv.wait(lock, [this]() { return d_callback_flag == false; });
246  ++d_active_pointer_counter;
247  }
248 
249  /*!
250  * \brief Decrement the number of active pointers for this buffer and signal
251  * anyone waiting when the count reaches zero.
252  */
253  inline void decrement_active()
254  {
255  gr::thread::scoped_lock lock(d_mutex);
256 
257  if (--d_active_pointer_counter == 0)
258  d_cv.notify_all();
259  }
260 
261  /*!
262  * \brief "on_lock" function from the custom_lock_if.
263  */
264  void on_lock(gr::thread::scoped_lock& lock) override;
265 
266  /*!
267  * \brief "on_unlock" function from the custom_lock_if.
268  */
269  void on_unlock() override;
270 
271  friend std::ostream& operator<<(std::ostream& os, const buffer& buf);
272 
273  // -------------------------------------------------------------------------
274 
275  /*!
276  * \brief Assign buffer's transfer_type
277  */
278  void set_transfer_type(const transfer_type& type);
279 
280 private:
281  friend class buffer_reader;
282  friend class buffer_reader_sm;
283 
284  friend GR_RUNTIME_API buffer_sptr make_buffer(int nitems,
285  size_t sizeof_item,
286  uint64_t downstream_lcm_nitems,
287  block_sptr link);
288  friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf,
289  int nzero_preload,
290  block_sptr link,
291  int delay);
292 
293 protected:
294  char* d_base; // base address of buffer inside d_vmcircbuf.
295  unsigned int d_bufsize; // in items
297 
298  // Keep track of maximum sample delay of any reader; Only prune tags past this.
300 
301  // Keep track of the maximum sample history requirements of all blocks that
302  // consume from this buffer
304 
305  // Indicates if d_max_reader_history > 1
307 
308  size_t d_sizeof_item; // in bytes
309  std::vector<buffer_reader*> d_readers;
310  std::weak_ptr<block> d_link; // block that writes to this buffer
311 
312  //
313  // The mutex protects d_write_index, d_abs_write_offset, d_done, d_item_tags
314  // and the d_read_index's and d_abs_read_offset's in the buffer readers.
315  // Also d_callback_flag and d_active_pointer_counter.
316  //
318  unsigned int d_write_index; // in items [0,d_bufsize)
319  uint64_t d_abs_write_offset; // num items written since the start
320  bool d_done;
321  std::multimap<uint64_t, tag_t> d_item_tags;
323  //
327 
331 
333 
334  /*!
335  * \brief Increment read or write index for this buffer
336  */
337  virtual unsigned index_add(unsigned a, unsigned b) = 0;
338 
339  /*!
340  * \brief Decrement read or write index for this buffer
341  */
342  virtual unsigned index_sub(unsigned a, unsigned b) = 0;
343 
344  virtual bool allocate_buffer([[maybe_unused]] int nitems) { return false; };
345 
346  /*!
347  * \brief constructor is private. Use gr_make_buffer to create instances.
348  *
349  * Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
350  *
351  * \param buftype is an enum type that describes the buffer TODO: fix me
352  * \param nitems is the minimum number of items the buffer will hold.
353  * \param sizeof_item is the size of an item in bytes.
354  * \param downstream_lcm_nitems is the least common multiple of the items to
355  * read by downstream block(s)
356  * \param downstream_max_out_mult is the maximum output multiple of all
357  * downstream blocks
358  * \param link is the block that writes to this buffer.
359  *
360  * The total size of the buffer will be rounded up to a system
361  * dependent boundary. This is typically the system page size, but
362  * under MS windows is 64KB.
363  */
365  int nitems,
366  size_t sizeof_item,
367  uint64_t downstream_lcm_nitems,
368  uint32_t downstream_max_out_mult,
369  block_sptr link);
370 
371  /*!
372  * \brief disassociate \p reader from this buffer
373  */
374  void drop_reader(buffer_reader* reader);
375 };
376 
377 //! returns # of buffers currently allocated
379 
380 } /* namespace gr */
381 
382 #endif /* INCLUDED_GR_RUNTIME_BUFFER_H */
Definition: buffer_reader_sm.h:22
How we keep track of the readers of a gr::buffer.
Definition: buffer_reader.h:49
Single writer, multiple reader fifo.
Definition: buffer.h:67
uint32_t d_active_pointer_counter
Definition: buffer.h:326
size_t nreaders() const
Definition: buffer.h:122
virtual void update_reader_block_history(unsigned history, [[maybe_unused]] int delay)
Definition: buffer.h:141
unsigned d_max_reader_delay
Definition: buffer.h:299
void remove_item_tag(const tag_t &tag, long id)
Removes an existing tag from the buffer.
gr::thread::mutex d_mutex
Definition: buffer.h:317
std::weak_ptr< block > d_link
Definition: buffer.h:310
const char * base() const
return the base address of the buffer
Definition: buffer.h:92
buffer_reader * reader(size_t index)
Definition: buffer.h:123
uint64_t get_downstream_lcm_nitems()
Definition: buffer.h:137
uint64_t d_write_multiple
Definition: buffer.h:329
void prune_tags(uint64_t max_time)
Removes all tags before max_time from buffer.
bool done() const
Definition: buffer.h:115
void decrement_active()
Decrement the number of active pointers for this buffer and signal anyone waiting when the count reac...
Definition: buffer.h:253
void increment_active()
Increment the number of active pointers for this buffer.
Definition: buffer.h:241
void add_item_tag(const tag_t &tag)
Adds a new tag to the buffer.
virtual void post_work(int nitems)=0
Function to be executed after this object's owner completes the call to general_work()
gr::logger_ptr d_logger
Definition: buffer.h:69
gr::thread::mutex * mutex()
Definition: buffer.h:125
virtual unsigned index_add(unsigned a, unsigned b)=0
Increment read or write index for this buffer.
friend GR_RUNTIME_API buffer_sptr make_buffer(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, block_sptr link)
unsigned int d_bufsize
Definition: buffer.h:295
std::multimap< uint64_t, tag_t >::iterator get_tags_upper_bound(uint64_t x)
Definition: buffer.h:184
virtual bool output_blkd_cb_ready([[maybe_unused]] int output_multiple)
Returns true if the current thread is ready to execute output_blocked_callback(), false otherwise....
Definition: buffer.h:223
virtual bool allocate_buffer([[maybe_unused]] int nitems)
Definition: buffer.h:344
std::multimap< uint64_t, tag_t >::iterator get_tags_lower_bound(uint64_t x)
Definition: buffer.h:180
uint32_t d_max_reader_output_multiple
Definition: buffer.h:330
virtual void * write_pointer()
return pointer to write buffer.
~buffer() override
virtual const void * _read_pointer(unsigned int read_index)
return pointer to read buffer.
void reset_nitem_counter()
Definition: buffer.h:129
void on_unlock() override
"on_unlock" function from the custom_lock_if.
uint64_t d_abs_write_offset
Definition: buffer.h:319
buffer_mapping_type d_buf_map_type
Definition: buffer.h:296
friend std::ostream & operator<<(std::ostream &os, const buffer &buf)
virtual bool output_blocked_callback([[maybe_unused]] int output_multiple, [[maybe_unused]] bool force=false)
Callback function that the scheduler will call when it determines that the output is blocked....
Definition: buffer.h:232
uint32_t get_max_reader_output_multiple()
Definition: buffer.h:139
bool d_has_history
Definition: buffer.h:306
virtual int space_available()=0
return number of items worth of space available for writing
void set_done(bool done)
virtual bool input_blocked_callback([[maybe_unused]] int items_required, [[maybe_unused]] int items_avail, [[maybe_unused]] unsigned read_index)
Callback function that the scheduler will call when it determines that the input is blocked....
Definition: buffer.h:210
void drop_reader(buffer_reader *reader)
disassociate reader from this buffer
block_sptr link()
Return the block that writes to this buffer.
Definition: buffer.h:120
buffer_mapping_type get_mapping_type()
return the buffer's mapping type
Definition: buffer.h:77
void set_transfer_type(const transfer_type &type)
Assign buffer's transfer_type.
size_t get_sizeof_item()
Definition: buffer.h:135
unsigned int bufsize() const
return size of this buffer in items
Definition: buffer.h:87
uint64_t d_downstream_lcm_nitems
Definition: buffer.h:328
std::multimap< uint64_t, tag_t > d_item_tags
Definition: buffer.h:321
std::vector< buffer_reader * > d_readers
Definition: buffer.h:309
friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay)
Create a new gr::buffer_reader and attach it to buffer buf.
virtual bool input_blkd_cb_ready([[maybe_unused]] int items_required, [[maybe_unused]] unsigned read_index)
Returns true when the current thread is ready to call the callback, false otherwise....
Definition: buffer.h:200
void on_lock(gr::thread::scoped_lock &lock) override
"on_lock" function from the custom_lock_if.
unsigned d_max_reader_history
Definition: buffer.h:303
void update_write_pointer(int nitems)
tell buffer that we wrote nitems into it
bool d_done
Definition: buffer.h:320
unsigned int d_write_index
Definition: buffer.h:318
char * d_base
Definition: buffer.h:294
virtual unsigned index_sub(unsigned a, unsigned b)=0
Decrement read or write index for this buffer.
uint64_t nitems_written()
Definition: buffer.h:127
size_t d_sizeof_item
Definition: buffer.h:308
buffer(buffer_mapping_type buftype, int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, uint32_t downstream_max_out_mult, block_sptr link)
constructor is private. Use gr_make_buffer to create instances.
std::multimap< uint64_t, tag_t >::iterator get_tags_end()
Definition: buffer.h:179
transfer_type d_transfer_type
Definition: buffer.h:332
gr::thread::condition_variable d_cv
Definition: buffer.h:324
std::multimap< uint64_t, tag_t >::iterator get_tags_begin()
Definition: buffer.h:175
gr::logger_ptr d_debug_logger
Definition: buffer.h:70
uint64_t d_last_min_items_read
Definition: buffer.h:322
bool d_callback_flag
Definition: buffer.h:325
Definition: custom_lock.h:27
#define GR_RUNTIME_API
Definition: gnuradio-runtime/include/gnuradio/api.h:18
boost::mutex mutex
Definition: thread.h:37
boost::unique_lock< boost::mutex > scoped_lock
Definition: thread.h:38
boost::condition_variable condition_variable
Definition: thread.h:39
GNU Radio logging wrapper.
Definition: basic_block.h:29
std::function< void *(void *, const void *, std::size_t)> mem_func_t
Definition: buffer.h:36
std::shared_ptr< logger > logger_ptr
Definition: logger.h:250
GR_RUNTIME_API buffer_sptr make_buffer(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, uint32_t downstream_max_out_mult, block_sptr link=block_sptr(), block_sptr buf_owner=block_sptr())
Allocate a buffer that holds at least nitems of size sizeof_item.
transfer_type
Definition: transfer_type.h:19
buffer_mapping_type
Definition: buffer.h:34
GR_RUNTIME_API long buffer_ncurrently_allocated()
returns # of buffers currently allocated
Template used to create buffer types. Note that the factory_class parameter must contain a static fun...
Definition: buffer_type.h:94
Definition: tags.h:19