SeqAn3 3.4.0-rc.1
The Modern C++ library for sequence analysis.
Loading...
Searching...
No Matches
serialised_resource_pool.hpp
Go to the documentation of this file.
1// -----------------------------------------------------------------------------------------------------
2// Copyright (c) 2006-2023, Knut Reinert & Freie Universität Berlin
3// Copyright (c) 2016-2023, Knut Reinert & MPI für molekulare Genetik
4// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6// -----------------------------------------------------------------------------------------------------
7
14#pragma once
15
16#include <mutex>
17
19
20namespace seqan3::contrib
21{
22
23// ============================================================================
24// Classes
25// ============================================================================
26
27template <typename TValue>
28struct ResourcePool
29{
30 typedef ConcurrentQueue<TValue *, Suspendable<>> TStack;
31 typedef typename TStack::TSize TSize;
32
33 TStack recycled;
34
35 ResourcePool(TSize maxSize)
36 {
37 setWriterCount(recycled, 1);
38 for (; maxSize != 0; --maxSize)
39 appendValue(recycled, (TValue *)NULL);
40 }
41
42 ~ResourcePool()
43 {
44 unlockWriting(recycled);
45 TValue * ptr = NULL;
46 while (popBack(ptr, recycled))
47 {
48 delete ptr;
49 }
50 }
51};
52
53// ----------------------------------------------------------------------------
54// Struct SerializerItem
55// ----------------------------------------------------------------------------
56
// Node of the Serializer's singly linked list of pending values.
//
// `val` must remain the first member: the Serializer hands out `&item->val` and later
// converts that pointer back to the enclosing SerializerItem.
template <typename TValue>
struct SerializerItem
{
    // The payload handed out to the user.
    TValue val;
    // Successor in the list; null for the last node. Initialised so that a freshly
    // allocated node never carries an indeterminate pointer.
    SerializerItem * next{nullptr};
    // Set once the user has released `val`. Initialised so that a freshly allocated
    // node never carries an indeterminate flag.
    bool ready{false};
};
64
65// ----------------------------------------------------------------------------
66// Class Serializer
67// ----------------------------------------------------------------------------
68
69template <typename TValue, typename TWorker>
70class Serializer
71{
72public:
73 typedef SerializerItem<TValue> TItem;
74 typedef TItem * TItemPtr;
75 typedef ResourcePool<TItem> TPool;
76 typedef size_t TSize;
77
78 std::mutex cs;
79 TWorker worker;
80 TItemPtr first;
81 TItemPtr last;
82 TPool pool;
83 bool stop;
84
85 Serializer() : first(NULL), last(NULL), stop(false)
86 {}
87
88 template <typename TArg>
89 explicit Serializer(TArg & arg, TSize maxItems = 1024) :
90 worker(arg),
91 first(NULL),
92 last(NULL),
93 pool(maxItems),
94 stop(false)
95 {}
96
97 ~Serializer()
98 {
99 while (first != NULL)
100 {
101 TItemPtr item = first;
102 first = first->next;
103 delete item;
104 }
105 }
106
107 operator bool()
108 {
109 return !stop;
110 }
111};
112
113// ============================================================================
114// Functions
115// ============================================================================
116
117// ----------------------------------------------------------------------------
118// Function aquireValue()
119// ----------------------------------------------------------------------------
120
121template <typename TValue>
122inline TValue * aquireValue(ResourcePool<TValue> & me)
123{
124 TValue * ptr = NULL;
125 if (!popBack(ptr, me.recycled))
126 return NULL;
127
128 if (ptr == NULL)
129 ptr = new TValue;
130
131 return ptr;
132}
133
134// ----------------------------------------------------------------------------
135// Function releaseValue()
136// ----------------------------------------------------------------------------
137
138template <typename TValue>
139inline void releaseValue(ResourcePool<TValue> & me, TValue * ptr)
140{
141 appendValue(me.recycled, ptr);
142}
143
144// ----------------------------------------------------------------------------
145// Function clear()
146// ----------------------------------------------------------------------------
147
148template <typename TValue, typename TWorker>
149inline void clear(Serializer<TValue, TWorker> & me)
150{
151 me.stop = false;
152 while (me.first != NULL)
153 {
154 TValue * item = me.first;
155 me.first = me.first->next;
156 releaseValue(me.recycled, item);
157 }
158 me.last = NULL;
159}
160
161// ----------------------------------------------------------------------------
162// Function aquireValue()
163// ----------------------------------------------------------------------------
164
// This function is not thread-safe: it would not make
// much sense to order a stream by the random
// order of execution behind a mutex.
168template <typename TValue, typename TWorker>
169inline TValue * aquireValue(Serializer<TValue, TWorker> & me)
170{
171 typedef SerializerItem<TValue> TItem;
172
173 TItem * item = aquireValue(me.pool);
174 item->next = NULL;
175 item->ready = false;
176
177 // add item to the end of our linked list
178 {
180 if (me.first == NULL)
181 me.first = item;
182 else
183 me.last->next = item;
184 me.last = item;
185 }
186 return &item->val;
187}
188
189// ----------------------------------------------------------------------------
190// Function releaseValue()
191// ----------------------------------------------------------------------------
192
193template <typename TValue, typename TWorker>
194inline bool releaseValue(Serializer<TValue, TWorker> & me, TValue * ptr)
195{
196 typedef SerializerItem<TValue> TItem;
197
198 TItem * item = reinterpret_cast<TItem *>(ptr);
199 assert(!item->ready);
200
201 // changing me.first or the ready flag must be done synchronized (me.mutex)
202 // the thread who changed me.first->ready to be true has to write it.
203
204 // change our ready flag and test if me.first->ready became true
205 {
207 item->ready = true;
208 if (item != me.first)
209 return true;
210 }
211
212 // ok, if we are here it seems that we are responsible for writing the buffer
213
214 assert(me.first != NULL);
215
216 bool success;
217 do
218 {
219 // process item
220 success = me.worker(item->val);
221
222 // remove item from linked list
223 {
225 me.first = item->next;
226
227 // recycle released items
228 releaseValue(me.pool, item);
229
230 // can we leave?
231 item = me.first;
232 if (item == NULL || !item->ready)
233 return success;
234 }
235
236 // we continue to process the next buffer
237 }
238 while (success);
239
240 return false;
241}
242
243} // namespace seqan3::contrib
T lock(T... args)
T next(T... args)
Provides seqan suspendable queue.