Fast DDS  Version 3.6.1.0
Fast DDS
CacheChange.hpp
1 // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
19 #ifndef FASTDDS_RTPS_COMMON__CACHECHANGE_HPP
20 #define FASTDDS_RTPS_COMMON__CACHECHANGE_HPP
21 
22 #include <atomic>
23 #include <cassert>
24 #include <limits>
25 
26 #include <fastdds/rtps/common/ChangeKind_t.hpp>
27 #include <fastdds/rtps/common/FragmentNumber.hpp>
28 #include <fastdds/rtps/common/InstanceHandle.hpp>
29 #include <fastdds/rtps/common/SerializedPayload.hpp>
30 #include <fastdds/rtps/common/Time_t.hpp>
31 #include <fastdds/rtps/common/Types.hpp>
32 #include <fastdds/rtps/common/VendorId_t.hpp>
33 #include <fastdds/rtps/common/WriteParams.hpp>
34 #include <fastdds/rtps/history/IPayloadPool.hpp>
35 
36 namespace eprosima {
37 namespace fastdds {
38 namespace rtps {
39 
40 struct CacheChange_t;
41 
46 {
51  CacheChange_t* volatile previous = nullptr;
54  CacheChange_t* volatile next = nullptr;
56  std::atomic_bool is_linked {false};
59 };
60 
65 {
74 };
75 
80 struct FASTDDS_EXPORTED_API CacheChange_t
81 {
85  GUID_t writerGUID{};
87  InstanceHandle_t instanceHandle{};
89  SequenceNumber_t sequenceNumber{};
91  SerializedPayload_t serializedPayload{};
93  SerializedPayload_t inline_qos{};
95  bool isRead = false;
97  Time_t sourceTimestamp{};
100 
101  union
102  {
105  };
106 
107  WriteParams write_params{};
108  bool is_untyped_ = true;
109 
115  : writer_info()
116  {
117  inline_qos.encapsulation = DEFAULT_ENDIAN == LITTLEEND ? PL_CDR_LE : PL_CDR_BE;
118  }
119 
121  const CacheChange_t&) = delete;
122  const CacheChange_t& operator =(
123  const CacheChange_t&) = delete;
124 
131  uint32_t payload_size,
132  bool is_untyped = false)
133  : serializedPayload(payload_size)
134  , is_untyped_(is_untyped)
135  {
136  }
137 
143  bool copy(
144  const CacheChange_t* ch_ptr)
145  {
146  kind = ch_ptr->kind;
147  writerGUID = ch_ptr->writerGUID;
148  instanceHandle = ch_ptr->instanceHandle;
149  sequenceNumber = ch_ptr->sequenceNumber;
150  sourceTimestamp = ch_ptr->sourceTimestamp;
151  reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
152  write_params = ch_ptr->write_params;
153  isRead = ch_ptr->isRead;
154  vendor_id = ch_ptr->vendor_id;
155  fragment_size_ = ch_ptr->fragment_size_;
156  fragment_count_ = ch_ptr->fragment_count_;
157  first_missing_fragment_ = ch_ptr->first_missing_fragment_;
158 
159  return serializedPayload.copy(&ch_ptr->serializedPayload, !ch_ptr->is_untyped_);
160  }
161 
168  const CacheChange_t* ch_ptr)
169  {
170  kind = ch_ptr->kind;
171  writerGUID = ch_ptr->writerGUID;
172  instanceHandle = ch_ptr->instanceHandle;
173  sequenceNumber = ch_ptr->sequenceNumber;
174  sourceTimestamp = ch_ptr->sourceTimestamp;
175  reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
176  write_params = ch_ptr->write_params;
177  isRead = ch_ptr->isRead;
178  vendor_id = ch_ptr->vendor_id;
179 
180  // Copy certain values from serializedPayload
181  serializedPayload.encapsulation = ch_ptr->serializedPayload.encapsulation;
182  serializedPayload.is_serialized_key = ch_ptr->serializedPayload.is_serialized_key;
183 
184  // Copy fragment size and calculate fragment count
185  setFragmentSize(ch_ptr->fragment_size_, false);
186  }
187 
188  virtual ~CacheChange_t() = default;
189 
194  uint32_t getFragmentCount() const
195  {
196  return fragment_count_;
197  }
198 
203  uint16_t getFragmentSize() const
204  {
205  return fragment_size_;
206  }
207 
213  {
214  return first_missing_fragment_ >= fragment_count_;
215  }
216 
221  {
222  return 0 < first_missing_fragment_;
223  }
224 
230  FragmentNumberSet_t& frag_sns)
231  {
232  // Note: Fragment numbers are 1-based but we keep them 0 based.
233  frag_sns.base(first_missing_fragment_ + 1);
234 
235  // Traverse list of missing fragments, adding them to frag_sns
236  uint32_t current_frag = first_missing_fragment_;
237  while (current_frag < fragment_count_)
238  {
239  frag_sns.add(current_frag + 1);
240  current_frag = get_next_missing_fragment(current_frag);
241  }
242  }
243 
254  uint16_t fragment_size,
255  bool create_fragment_list = false)
256  {
257  fragment_size_ = fragment_size;
258  fragment_count_ = 0;
259  first_missing_fragment_ = 0;
260 
261  if (fragment_size > 0)
262  {
263  // This follows RTPS 8.3.7.3.5
264  fragment_count_ = (serializedPayload.length + fragment_size - 1) / fragment_size;
265 
266  if (create_fragment_list)
267  {
268  // Keep index of next fragment on the payload portion at the beginning of each fragment. Last
269  // fragment will have fragment_count_ as 'next fragment index'
270  size_t offset = 0;
271  for (uint32_t i = 1; i <= fragment_count_; i++, offset += fragment_size_)
272  {
273  set_next_missing_fragment(i - 1, i); // index to next fragment in missing list
274  }
275  }
276  else
277  {
278  // List not created. This means we are going to send this change fragmented, so it is already
279  // assembled, and the missing list is empty (i.e. first missing points to fragment count)
280  first_missing_fragment_ = fragment_count_;
281  }
282  }
283  }
284 
286  const SerializedPayload_t& incoming_data,
287  uint32_t fragment_starting_num,
288  uint32_t fragments_in_submessage)
289  {
290  uint32_t original_offset = (fragment_starting_num - 1) * fragment_size_;
291  uint32_t incoming_length = fragment_size_ * fragments_in_submessage;
292  uint32_t last_fragment_index = fragment_starting_num + fragments_in_submessage - 1;
293 
294  // Validate payload types
295  if (serializedPayload.is_serialized_key != incoming_data.is_serialized_key)
296  {
297  return false;
298  }
299 
300  // Validate fragment indexes
301  if (last_fragment_index > fragment_count_)
302  {
303  return false;
304  }
305 
306  // Update incoming length for last fragment
307  if (last_fragment_index == fragment_count_)
308  {
309  incoming_length = serializedPayload.length - original_offset;
310  }
311 
312  // Validate lengths
313  if (incoming_data.length < incoming_length)
314  {
315  return false;
316  }
317 
318  if (original_offset + incoming_length > serializedPayload.length)
319  {
320  return false;
321  }
322 
323  if (received_fragments(fragment_starting_num - 1, fragments_in_submessage))
324  {
325  memcpy(
326  &serializedPayload.data[original_offset],
327  incoming_data.data, incoming_length);
328  }
329 
330  return is_fully_assembled();
331  }
332 
/**
 * @brief Calculate the minimum required payload size to store a fragmented change.
 *
 * While a change is being reassembled, the index of the next missing fragment is kept
 * in the payload buffer at the (4-byte aligned) beginning of each fragment, so the
 * buffer must have room for that index on the last fragment.
 *
 * @param payload_size            Size of the serialized payload.
 * @param fragment_size           Size of each fragment; 0 means no fragmentation.
 * @param [out] min_required_size Minimum buffer size required. Only written when the
 *                                function returns true.
 *
 * @return false when @c payload_size is too large for the calculation to be done without
 *         overflow, or when a fragmented payload uses a fragment size below 4 bytes.
 *         true otherwise.
 */
static bool calculate_required_fragmented_payload_size(
        uint32_t payload_size,
        uint16_t fragment_size,
        uint32_t& min_required_size)
{
    // Payloads that fit in a single fragment need no extra room
    if ((0 == fragment_size) || (payload_size <= fragment_size))
    {
        min_required_size = payload_size;
        return true;
    }

    // In order to avoid overflow on the calculations, we limit the maximum payload size
    constexpr uint32_t MAX_PAYLOAD_SIZE = std::numeric_limits<uint32_t>::max() - 4u - 3u;
    if (payload_size > MAX_PAYLOAD_SIZE)
    {
        return false;
    }

    // Ensure fragment size is at least 4 bytes to store fragment index
    if (fragment_size < 4u)
    {
        return false;
    }

    // Calculate number of fragments without risk of overflow
    uint32_t fragment_count = payload_size / fragment_size;
    if (0 != (payload_size % fragment_size))
    {
        ++fragment_count;
    }

    // This cannot overflow as the result will always be <= payload_size
    uint32_t last_fragment_offset = (fragment_count - 1) * fragment_size;

    // Since we will write a fragment index at the beginning of each fragment,
    // we need to ensure there is space for it in the last fragment.
    // Note: we already imposed limits to ensure no overflow occurs.
    min_required_size = (last_fragment_offset + 3u) & ~3u; // Align last fragment offset to 4 bytes
    min_required_size += 4u; // Add fragment index size

    // Ensure minimum size is at least payload size
    if (min_required_size < payload_size)
    {
        min_required_size = payload_size;
    }

    return true;
}
388 
389 private:
390 
391  // Fragment size
392  uint16_t fragment_size_ = 0;
393 
394  // Number of fragments
395  uint32_t fragment_count_ = 0;
396 
397  // First fragment in missing list
398  uint32_t first_missing_fragment_ = 0;
399 
400  uint32_t get_next_missing_fragment(
401  uint32_t fragment_index)
402  {
403  uint32_t* ptr = next_fragment_pointer(fragment_index);
404  return *ptr;
405  }
406 
407  void set_next_missing_fragment(
408  uint32_t fragment_index,
409  uint32_t next_fragment_index)
410  {
411  uint32_t* ptr = next_fragment_pointer(fragment_index);
412  *ptr = next_fragment_index;
413  }
414 
415  uint32_t* next_fragment_pointer(
416  uint32_t fragment_index)
417  {
418  size_t offset = fragment_size_;
419  offset *= fragment_index;
420  offset = (offset + 3u) & ~3u;
421  return reinterpret_cast<uint32_t*>(&serializedPayload.data[offset]);
422  }
423 
433  bool received_fragments(
434  uint32_t initial_fragment,
435  uint32_t num_of_fragments)
436  {
437  bool at_least_one_changed = false;
438 
439  if ((fragment_size_ > 0) && (initial_fragment < fragment_count_))
440  {
441  uint32_t last_fragment = initial_fragment + num_of_fragments;
442  if (last_fragment > fragment_count_)
443  {
444  last_fragment = fragment_count_;
445  }
446 
447  if (initial_fragment <= first_missing_fragment_)
448  {
449  // Perform first = *first until first >= last_received
450  while (first_missing_fragment_ < last_fragment)
451  {
452  first_missing_fragment_ = get_next_missing_fragment(first_missing_fragment_);
453  at_least_one_changed = true;
454  }
455  }
456  else
457  {
458  // Find prev in missing list
459  uint32_t current_frag = first_missing_fragment_;
460  while (current_frag < initial_fragment)
461  {
462  uint32_t next_frag = get_next_missing_fragment(current_frag);
463  if (next_frag >= initial_fragment)
464  {
465  // This is the fragment previous to initial_fragment.
466  // Find future value for next by repeating next = *next until next >= last_fragment.
467  uint32_t next_missing_fragment = next_frag;
468  while (next_missing_fragment < last_fragment)
469  {
470  next_missing_fragment = get_next_missing_fragment(next_missing_fragment);
471  at_least_one_changed = true;
472  }
473 
474  // Update next and finish loop
475  if (at_least_one_changed)
476  {
477  set_next_missing_fragment(current_frag, next_missing_fragment);
478  }
479  break;
480  }
481  current_frag = next_frag;
482  }
483  }
484  }
485 
486  return at_least_one_changed;
487  }
488 
489 };
490 
491 } // namespace rtps
492 } // namespace fastdds
493 } // namespace eprosima
494 
495 #endif // FASTDDS_RTPS_COMMON__CACHECHANGE_HPP
Template class to hold a range of items using a custom bitmap.
Definition: fixed_size_bitmap.hpp:76
T base() const noexcept
Get base of the range.
Definition: fixed_size_bitmap.hpp:133
bool add(const T &item) noexcept
Adds an element to the range.
Definition: fixed_size_bitmap.hpp:297
Structure Time_t, used to describe times at RTPS protocol.
Definition: Time_t.hpp:38
This class contains additional information of a CacheChange.
Definition: WriteParams.hpp:37
uint32_t FragmentNumber_t
Definition: FragmentNumber.hpp:34
@ LITTLEEND
Little endianness.
Definition: Types.hpp:44
const VendorId_t c_VendorId_Unknown
Definition: VendorId_t.hpp:34
std::array< uint8_t, 2 > VendorId_t
Structure VendorId_t, specifying the vendor Id of the implementation.
Definition: VendorId_t.hpp:32
constexpr Endianness_t DEFAULT_ENDIAN
Definition: Types.hpp:80
ChangeKind_t
Enumerates the different types of CacheChange_t.
Definition: ChangeKind_t.hpp:38
@ ALIVE
ALIVE.
Definition: ChangeKind_t.hpp:39
Structure CacheChange_t, contains information on a specific CacheChange.
Definition: CacheChange.hpp:81
ChangeKind_t kind
Kind of change, default value ALIVE.
Definition: CacheChange.hpp:83
bool is_untyped_
Definition: CacheChange.hpp:108
bool contains_first_fragment()
Checks if the first fragment is present.
Definition: CacheChange.hpp:220
InstanceHandle_t instanceHandle
Handle of the data associated with this change.
Definition: CacheChange.hpp:87
bool isRead
Indicates if the cache has been read (only used in READERS)
Definition: CacheChange.hpp:95
static bool calculate_required_fragmented_payload_size(uint32_t payload_size, uint16_t fragment_size, uint32_t &min_required_size)
Calculate the minimum required payload size to store a fragmented change.
Definition: CacheChange.hpp:340
fastdds::rtps::VendorId_t vendor_id
Vendor Id of the writer that generated this change.
Definition: CacheChange.hpp:99
Time_t sourceTimestamp
Source TimeStamp.
Definition: CacheChange.hpp:97
CacheChange_t(const CacheChange_t &)=delete
void setFragmentSize(uint16_t fragment_size, bool create_fragment_list=false)
Set fragment size for this change.
Definition: CacheChange.hpp:253
CacheChangeReaderInfo_t reader_info
Definition: CacheChange.hpp:103
void copy_not_memcpy(const CacheChange_t *ch_ptr)
Copy information from a different change into this one.
Definition: CacheChange.hpp:167
uint16_t getFragmentSize() const
Get the size of each fragment this change is split into.
Definition: CacheChange.hpp:203
uint32_t getFragmentCount() const
Get the number of fragments this change is split into.
Definition: CacheChange.hpp:194
bool copy(const CacheChange_t *ch_ptr)
Copy a different change into this one.
Definition: CacheChange.hpp:143
bool add_fragments(const SerializedPayload_t &incoming_data, uint32_t fragment_starting_num, uint32_t fragments_in_submessage)
Definition: CacheChange.hpp:285
CacheChangeWriterInfo_t writer_info
Definition: CacheChange.hpp:104
GUID_t writerGUID
GUID_t of the writer that generated this change.
Definition: CacheChange.hpp:85
CacheChange_t(uint32_t payload_size, bool is_untyped=false)
Constructor with payload size.
Definition: CacheChange.hpp:130
SerializedPayload_t serializedPayload
Serialized Payload associated with the change.
Definition: CacheChange.hpp:91
void get_missing_fragments(FragmentNumberSet_t &frag_sns)
Fills a FragmentNumberSet_t with the list of missing fragments.
Definition: CacheChange.hpp:229
SequenceNumber_t sequenceNumber
SequenceNumber of the change.
Definition: CacheChange.hpp:89
CacheChange_t()
Default constructor.
Definition: CacheChange.hpp:114
WriteParams write_params
Definition: CacheChange.hpp:107
bool is_fully_assembled()
Checks if all fragments have been received.
Definition: CacheChange.hpp:212
Specific information for a reader.
Definition: CacheChange.hpp:65
Time_t receptionTimestamp
Reception TimeStamp (only used in Readers)
Definition: CacheChange.hpp:67
int32_t no_writers_generation_count
No-writers generation of the instance when this entry was added to it.
Definition: CacheChange.hpp:71
uint32_t writer_ownership_strength
Ownership strength of its writer when the sample was received.
Definition: CacheChange.hpp:73
int32_t disposed_generation_count
Disposed generation of the instance when this entry was added to it.
Definition: CacheChange.hpp:69
Specific information for a writer.
Definition: CacheChange.hpp:46
std::atomic_bool is_linked
Used to know if the object is already in a list.
Definition: CacheChange.hpp:56
FragmentNumber_t last_fragment_sent
Last fragment number sent.
Definition: CacheChange.hpp:58
size_t num_sent_submessages
Number of DATA / DATA_FRAG submessages sent to the transport (only used in Writers)
Definition: CacheChange.hpp:48
CacheChange_t *volatile next
Used to link with next node in a list.
Definition: CacheChange.hpp:54
CacheChange_t *volatile previous
Used to link with previous node in a list.
Definition: CacheChange.hpp:51
Structure GUID_t, entity identifier, unique in DDS-RTPS Domain.
Definition: Guid.hpp:40
Struct InstanceHandle_t, used to contain the key for WITH_KEY topics.
Definition: InstanceHandle.hpp:154
Structure SequenceNumber_t, different for each change in the same writer.
Definition: SequenceNumber.hpp:38
Structure SerializedPayload_t.
Definition: SerializedPayload.hpp:59
octet * data
Pointer to the data.
Definition: SerializedPayload.hpp:68
uint16_t encapsulation
Encapsulation of the data as suggested in the RTPS 2.1 specification chapter 10.
Definition: SerializedPayload.hpp:64
bool is_serialized_key
Whether the payload contains a serialized key, or the whole data.
Definition: SerializedPayload.hpp:76
uint32_t length
Actual length of the data.
Definition: SerializedPayload.hpp:66