Fast DDS  Version 3.6.1.0
Fast DDS
WriterHistory.hpp
1 // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
19 #ifndef FASTDDS_RTPS_HISTORY__WRITERHISTORY_HPP
20 #define FASTDDS_RTPS_HISTORY__WRITERHISTORY_HPP
21 
22 #include <cstdint>
23 #include <memory>
24 
25 #include <fastdds/fastdds_dll.hpp>
26 #include <fastdds/dds/log/Log.hpp>
27 #include <fastdds/rtps/common/CacheChange.hpp>
28 #include <fastdds/rtps/common/ChangeKind_t.hpp>
29 #include <fastdds/rtps/common/InstanceHandle.hpp>
30 #include <fastdds/rtps/history/History.hpp>
31 #include <fastdds/rtps/history/IChangePool.hpp>
32 #include <fastdds/rtps/history/IPayloadPool.hpp>
33 
34 namespace eprosima {
35 namespace fastdds {
36 namespace rtps {
37 
38 class BaseWriter;
39 class HistoryAttributes;
40 class WriteParams;
41 
47 {
48  friend class BaseWriter;
49  friend class PersistentWriter;
50  friend class IPersistenceService;
51 
53  WriterHistory&&) = delete;
54  WriterHistory& operator =(
55  WriterHistory&&) = delete;
56 
57 public:
58 
64  FASTDDS_EXPORTED_API WriterHistory(
65  const HistoryAttributes& att);
66 
73  FASTDDS_EXPORTED_API WriterHistory(
74  const HistoryAttributes& att,
75  const std::shared_ptr<IPayloadPool>& payload_pool);
76 
84  FASTDDS_EXPORTED_API WriterHistory(
85  const HistoryAttributes& att,
86  const std::shared_ptr<IPayloadPool>& payload_pool,
87  const std::shared_ptr<IChangePool>& change_pool);
88 
89  FASTDDS_EXPORTED_API virtual ~WriterHistory() override;
90 
96  FASTDDS_EXPORTED_API const std::shared_ptr<IPayloadPool>& get_payload_pool() const;
97 
103  FASTDDS_EXPORTED_API const std::shared_ptr<IChangePool>& get_change_pool() const;
104 
115  FASTDDS_EXPORTED_API CacheChange_t* create_change(
116  ChangeKind_t change_kind,
118 
130  FASTDDS_EXPORTED_API CacheChange_t* create_change(
131  uint32_t payload_size,
132  ChangeKind_t change_kind,
134 
140  FASTDDS_EXPORTED_API bool add_change(
141  CacheChange_t* a_change);
142 
149  FASTDDS_EXPORTED_API bool add_change(
150  CacheChange_t* a_change,
151  WriteParams& wparams);
152 
160  FASTDDS_EXPORTED_API iterator remove_change_nts(
161  const_iterator removal,
162  bool release = true) override;
163 
172  FASTDDS_EXPORTED_API iterator remove_change_nts(
173  const_iterator removal,
174  const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time,
175  bool release = true) override;
176 
183  FASTDDS_EXPORTED_API bool matches_change(
184  const CacheChange_t* inner,
185  CacheChange_t* outer) override;
186 
189 
190  FASTDDS_EXPORTED_API virtual bool remove_change_g(
191  CacheChange_t* a_change);
192 
193  FASTDDS_EXPORTED_API virtual bool remove_change_g(
194  CacheChange_t* a_change,
195  const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
196 
197  FASTDDS_EXPORTED_API bool remove_change(
198  const SequenceNumber_t& sequence_number);
199 
200  FASTDDS_EXPORTED_API CacheChange_t* remove_change_and_reuse(
201  const SequenceNumber_t& sequence_number);
202 
207  FASTDDS_EXPORTED_API bool remove_min_change();
208 
214  FASTDDS_EXPORTED_API bool remove_min_change(
215  const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
216 
217  FASTDDS_EXPORTED_API SequenceNumber_t next_sequence_number() const
218  {
219  return m_lastCacheChangeSeqNum + 1;
220  }
221 
236  FASTDDS_EXPORTED_API bool release_change(
237  CacheChange_t* ch);
238 
239 protected:
240 
241  FASTDDS_EXPORTED_API void do_release_cache(
242  CacheChange_t* ch) override;
243 
258  CacheChange_t* a_change,
259  WriteParams& wparams,
260  std::chrono::time_point<std::chrono::steady_clock> max_blocking_time
261  = std::chrono::steady_clock::now() + std::chrono::hours(24));
262 
278  template<typename PreCommitHook>
280  CacheChange_t* a_change,
281  WriteParams& wparams,
282  PreCommitHook pre_commit,
283  std::chrono::time_point<std::chrono::steady_clock> max_blocking_time)
284  {
285  if (mp_writer == nullptr || mp_mutex == nullptr)
286  {
287  EPROSIMA_LOG_ERROR(RTPS_WRITER_HISTORY,
288  "You need to create a Writer with this History before adding any changes");
289  return false;
290  }
291 
292  std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
293  if (!prepare_and_add_change(a_change, wparams))
294  {
295  return false;
296  }
297 
298  pre_commit(*a_change);
299  notify_writer(a_change, max_blocking_time);
300 
301  return true;
302  }
303 
307  BaseWriter* mp_writer = nullptr;
308 
309  uint32_t high_mark_for_frag_ = 0;
310 
311 private:
312 
324  bool prepare_and_add_change(
325  CacheChange_t* a_change,
326  WriteParams& wparams);
327 
336  void notify_writer(
337  CacheChange_t* a_change,
338  const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
339 
340  void set_fragments(
341  CacheChange_t* change);
342 
344  std::shared_ptr<IChangePool> change_pool_;
346  std::shared_ptr<IPayloadPool> payload_pool_;
347 };
348 
349 } // namespace rtps
350 } // namespace fastdds
351 } // namespace eprosima
352 
353 #endif // FASTDDS_RTPS_HISTORY__WRITERHISTORY_HPP
Class HistoryAttributes, to specify the attributes of a WriterHistory or a ReaderHistory.
Definition: HistoryAttributes.hpp:39
Class History, container of the different CacheChanges and the methods to access them.
Definition: History.hpp:45
std::vector< CacheChange_t * >::iterator iterator
Definition: History.hpp:58
FASTDDS_EXPORTED_API bool remove_change(CacheChange_t *ch)
Remove a specific change from the history.
std::vector< CacheChange_t * >::const_iterator const_iterator
Definition: History.hpp:60
RecursiveTimedMutex * mp_mutex
Mutex for the History.
Definition: History.hpp:269
This class contains additional information of a CacheChange.
Definition: WriteParams.hpp:37
Class WriterHistory, container of the different CacheChanges of a writer.
Definition: WriterHistory.hpp:47
friend class BaseWriter
Definition: WriterHistory.hpp:48
friend class PersistentWriter
Definition: WriterHistory.hpp:49
uint32_t high_mark_for_frag_
Definition: WriterHistory.hpp:309
virtual FASTDDS_EXPORTED_API bool remove_change_g(CacheChange_t *a_change, const std::chrono::time_point< std::chrono::steady_clock > &max_blocking_time)
FASTDDS_EXPORTED_API iterator remove_change_nts(const_iterator removal, const std::chrono::time_point< std::chrono::steady_clock > &max_blocking_time, bool release=true) override
Remove a specific change from the history.
bool add_change_with_commit_hook(CacheChange_t *a_change, WriteParams &wparams, PreCommitHook pre_commit, std::chrono::time_point< std::chrono::steady_clock > max_blocking_time)
Introduce a change into the history, and let the associated writer send it.
Definition: WriterHistory.hpp:279
bool add_change_(CacheChange_t *a_change, WriteParams &wparams, std::chrono::time_point< std::chrono::steady_clock > max_blocking_time=std::chrono::steady_clock::now()+std::chrono::hours(24))
Introduce a change into the history, and let the associated writer send it.
FASTDDS_EXPORTED_API CacheChange_t * create_change(ChangeKind_t change_kind, InstanceHandle_t handle=c_InstanceHandle_Unknown)
Create a new CacheChange_t object.
BaseWriter * mp_writer
Pointer to the associated writer.
Definition: WriterHistory.hpp:307
FASTDDS_EXPORTED_API bool add_change(CacheChange_t *a_change, WriteParams &wparams)
Add a CacheChange_t to the WriterHistory.
FASTDDS_EXPORTED_API const std::shared_ptr< IPayloadPool > & get_payload_pool() const
Get the payload pool used by this history.
virtual FASTDDS_EXPORTED_API bool remove_change_g(CacheChange_t *a_change)
FASTDDS_EXPORTED_API bool remove_min_change(const std::chrono::time_point< std::chrono::steady_clock > &max_blocking_time)
Remove the CacheChange_t with the minimum sequenceNumber.
FASTDDS_EXPORTED_API bool add_change(CacheChange_t *a_change)
Add a CacheChange_t to the WriterHistory.
SequenceNumber_t m_lastCacheChangeSeqNum
Last CacheChange Sequence Number added to the History.
Definition: WriterHistory.hpp:305
FASTDDS_EXPORTED_API bool remove_change(const SequenceNumber_t &sequence_number)
FASTDDS_EXPORTED_API iterator remove_change_nts(const_iterator removal, bool release=true) override
Remove a specific change from the history.
FASTDDS_EXPORTED_API CacheChange_t * create_change(uint32_t payload_size, ChangeKind_t change_kind, InstanceHandle_t handle=c_InstanceHandle_Unknown)
Create a new CacheChange_t object with a specific payload size.
FASTDDS_EXPORTED_API const std::shared_ptr< IChangePool > & get_change_pool() const
Get the change pool used by this history.
FASTDDS_EXPORTED_API bool matches_change(const CacheChange_t *inner, CacheChange_t *outer) override
Criteria to search a specific CacheChange_t on history.
FASTDDS_EXPORTED_API void do_release_cache(CacheChange_t *ch) override
virtual FASTDDS_EXPORTED_API ~WriterHistory() override
FASTDDS_EXPORTED_API WriterHistory(const HistoryAttributes &att)
Construct a WriterHistory.
FASTDDS_EXPORTED_API bool release_change(CacheChange_t *ch)
Release a change when it is not being used anymore.
FASTDDS_EXPORTED_API WriterHistory(const HistoryAttributes &att, const std::shared_ptr< IPayloadPool > &payload_pool, const std::shared_ptr< IChangePool > &change_pool)
Construct a WriterHistory with custom payload and change pools.
FASTDDS_EXPORTED_API bool remove_min_change()
Remove the CacheChange_t with the minimum sequenceNumber.
FASTDDS_EXPORTED_API WriterHistory(const HistoryAttributes &att, const std::shared_ptr< IPayloadPool > &payload_pool)
Construct a WriterHistory with a custom payload pool.
FASTDDS_EXPORTED_API SequenceNumber_t next_sequence_number() const
Definition: WriterHistory.hpp:217
FASTDDS_EXPORTED_API CacheChange_t * remove_change_and_reuse(const SequenceNumber_t &sequence_number)
friend class IPersistenceService
Definition: WriterHistory.hpp:50
ChangeKind_t
Enumerates the different types of CacheChange_t.
Definition: ChangeKind_t.hpp:38
const InstanceHandle_t c_InstanceHandle_Unknown
Definition: InstanceHandle.hpp:211
Structure CacheChange_t, contains information on a specific CacheChange.
Definition: CacheChange.hpp:81
Struct InstanceHandle_t, used to contain the key for WITH_KEY topics.
Definition: InstanceHandle.hpp:154
Structure SequenceNumber_t, different for each change in the same writer.
Definition: SequenceNumber.hpp:38