XRootD
Loading...
Searching...
No Matches
XrdEcRedundancyProvider.cc
Go to the documentation of this file.
1/************************************************************************
2 * KineticIo - a file io interface library to kinetic devices. *
3 * *
4 * This Source Code Form is subject to the terms of the Mozilla *
5 * Public License, v. 2.0. If a copy of the MPL was not *
6 * distributed with this file, You can obtain one at *
7 * https://mozilla.org/MP:/2.0/. *
8 * *
9 * This program is distributed in the hope that it will be useful, *
10 * but is provided AS-IS, WITHOUT ANY WARRANTY; including without *
11 * the implied warranty of MERCHANTABILITY, NON-INFRINGEMENT or *
12 * FITNESS FOR A PARTICULAR PURPOSE. See the Mozilla Public *
13 * License for more details. *
14 ************************************************************************/
15
17
18#include <isa-l.h>
19#include <cstring>
20#include <sstream>
21#include <algorithm>
22
23namespace XrdEc
24{
25
26//--------------------------------------------------------------------------
30//--------------------------------------------------------------------------
31class Convert{
32 public:
33 //--------------------------------------------------------------------------
38 //--------------------------------------------------------------------------
39 template<typename...Args>
40 static std::string toString(Args&&...args){
41 std::stringstream s;
42 argsToStream(s, std::forward<Args>(args)...);
43 return s.str();
44 }
45 private:
46 //--------------------------------------------------------------------------
50 //--------------------------------------------------------------------------
51 template<typename Last>
52 static void argsToStream(std::stringstream& stream, Last&& last) {
53 stream << last;
54 }
55
56 //--------------------------------------------------------------------------
61 //--------------------------------------------------------------------------
62 template<typename First, typename...Rest >
63 static void argsToStream(std::stringstream& stream, First&& first, Rest&&...rest) {
64 stream << first;
65 argsToStream(stream, std::forward<Rest>(rest)...);
66 }
67};
68
69
70
71/* This function is (almost) completely ripped from the erasure_code_test.cc file
72 distributed with the isa-l library. */
74 unsigned char* encode_matrix, // in: encode matrix
75 unsigned char* decode_matrix, // in: buffer, out: generated decode matrix
76 unsigned int* decode_index, // out: order of healthy blocks used for decoding [data#1, data#3, ..., parity#1... ]
77 unsigned char* src_err_list, // in: array of #nerrs size [index error #1, index error #2, ... ]
78 unsigned char* src_in_err, // in: array of #data size > [1,0,0,0,1,0...] -> 0 == no error, 1 == error
79 unsigned int nerrs, // #total errors
80 unsigned int nsrcerrs, // #data errors
81 unsigned int k, // #data
82 unsigned int m // #data+parity
83)
84{
85 unsigned i, j, p;
86 unsigned int r;
87 unsigned char* invert_matrix, * backup, * b, s;
88 int incr = 0;
89
90 size_t mk = (size_t)m * (size_t)k;
91 std::vector<unsigned char> memory(3 * mk);
92 b = &memory[0];
93 backup = &memory[mk];
94 invert_matrix = &memory[2 * mk];
95
96 // Construct matrix b by removing error rows
97 for (i = 0, r = 0; i < k; i++, r++) {
98 while (src_in_err[r]) {
99 r++;
100 }
101 for (j = 0; j < k; j++) {
102 b[k * i + j] = encode_matrix[k * r + j];
103 backup[k * i + j] = encode_matrix[k * r + j];
104 }
105 decode_index[i] = r;
106 }
107 incr = 0;
108 while (gf_invert_matrix(b, invert_matrix, k) < 0) {
109 if (nerrs == (m - k)) {
110 return -1;
111 }
112 incr++;
113 memcpy(b, backup, mk);
114 for (i = nsrcerrs; i < nerrs - nsrcerrs; i++) {
115 if (src_err_list[i] == (decode_index[k - 1] + incr)) {
116 // skip the erased parity line
117 incr++;
118 continue;
119 }
120 }
121 if (decode_index[k - 1] + incr >= m) {
122 return -1;
123 }
124 decode_index[k - 1] += incr;
125 for (j = 0; j < k; j++) {
126 b[k * (k - 1) + j] = encode_matrix[k * decode_index[k - 1] + j];
127 }
128
129 };
130
131 for (i = 0; i < nsrcerrs; i++) {
132 for (j = 0; j < k; j++) {
133 decode_matrix[k * i + j] = invert_matrix[k * src_err_list[i] + j];
134 }
135 }
136 /* src_err_list from encode_matrix * invert of b for parity decoding */
137 for (p = nsrcerrs; p < nerrs; p++) {
138 for (i = 0; i < k; i++) {
139 s = 0;
140 for (j = 0; j < k; j++) {
141 s ^= gf_mul(invert_matrix[j * k + i],
142 encode_matrix[k * src_err_list[p] + j]);
143 }
144
145 decode_matrix[k * p + i] = s;
146 }
147 }
148 return 0;
149}
150
152 objcfg( objcfg ),
153 encode_matrix( objcfg.nbchunks * objcfg.nbdata )
154{
155 // k = data
156 // m = data + parity
157 gf_gen_cauchy1_matrix( encode_matrix.data(), static_cast<int>( objcfg.nbchunks ), static_cast<int>( objcfg.nbdata ) );
158}
159
160
161std::string RedundancyProvider::getErrorPattern( stripes_t &stripes ) const
162{
163 std::string pattern( objcfg.nbchunks, 0 );
164 for( uint8_t i = 0; i < objcfg.nbchunks; ++i )
165 if( !stripes[i].valid ) pattern[i] = '\1';
166
167 return pattern;
168}
169
170
171RedundancyProvider::CodingTable& RedundancyProvider::getCodingTable( const std::string& pattern )
172{
173 std::lock_guard<std::mutex> lock(mutex);
174
175 /* If decode matrix is not already cached we have to construct it. */
176 if( !cache.count(pattern) )
177 {
178 /* Expand pattern */
179 int nerrs = 0, nsrcerrs = 0;
180 unsigned char err_indx_list[objcfg.nbparity];
181 /* Avoid narrowing cast warning, size is always < 256 */
182 uint8_t n = static_cast<uint8_t>(pattern.size() & 0xff);
183 for (uint8_t i = 0; i < n; i++) {
184 if (pattern[i]) {
185 if (nerrs == objcfg.nbparity)
186 throw IOError( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, errno, "More errors than parity blocks" ) );
187 err_indx_list[nerrs++] = i;
188 if (i < objcfg.nbdata) { nsrcerrs++; }
189 }
190 }
191
192 /* Allocate Decode Object. */
193 CodingTable dd;
194 dd.nErrors = nerrs;
195 dd.blockIndices.resize( objcfg.nbdata );
196 dd.table.resize( objcfg.nbdata * objcfg.nbparity * 32);
197
198 /* Compute decode matrix. */
199 std::vector<unsigned char> decode_matrix(objcfg.nbchunks * objcfg.nbdata);
200
201 if (gf_gen_decode_matrix( encode_matrix.data(), decode_matrix.data(), dd.blockIndices.data(),
202 err_indx_list, (unsigned char*) pattern.c_str(), nerrs, nsrcerrs,
203 static_cast<int>( objcfg.nbdata ), static_cast<int>( objcfg.nbchunks ) ) )
204 throw IOError( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, errno, "Failed computing decode matrix" ) );
205
206 /* Compute Tables. */
207 ec_init_tables( static_cast<int>( objcfg.nbdata ), nerrs, decode_matrix.data(), dd.table.data() );
208 cache.insert( std::make_pair(pattern, dd) );
209 }
210 return cache.at(pattern);
211}
212
213void RedundancyProvider::replication( stripes_t &stripes )
214{
215 // get index of a valid block
216 void *healthy = nullptr;
217 for( auto itr = stripes.begin(); itr != stripes.end(); ++itr )
218 {
219 if( itr->valid )
220 healthy = itr->buffer;
221 }
222
223 if( !healthy ) throw IOError( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError ) );
224
225 // now replicate, by now 'buffers' should contain all chunks
226 for( uint8_t i = 0; i < objcfg.nbchunks; ++i )
227 {
228 if( !stripes[i].valid )
229 memcpy( stripes[i].buffer, healthy, objcfg.chunksize );
230 }
231}
232
234{
235 /* throws if stripe is not recoverable */
236 std::string pattern = getErrorPattern( stripes );
237
238 /* nothing to do if there are no parity blocks. */
239 if ( !objcfg.nbparity ) return;
240
241 /* in case of a single data block use replication */
242 if ( objcfg.nbdata == 1 )
243 return replication( stripes );
244
245 /* normal operation: erasure coding */
246 CodingTable& dd = getCodingTable(pattern);
247
248 unsigned char* inbuf[objcfg.nbdata];
249 for( uint8_t i = 0; i < objcfg.nbdata; i++ )
250 inbuf[i] = reinterpret_cast<unsigned char*>( stripes[dd.blockIndices[i]].buffer );
251
252 std::vector<unsigned char> memory( dd.nErrors * objcfg.chunksize );
253
254 unsigned char* outbuf[dd.nErrors];
255 for (int i = 0; i < dd.nErrors; i++)
256 {
257 outbuf[i] = &memory[i * objcfg.chunksize];
258 }
259
260 ec_encode_data(
261 static_cast<int>( objcfg.chunksize ), // Length of each block of data (vector) of source or destination data.
262 static_cast<int>( objcfg.nbdata ), // The number of vector sources in the generator matrix for coding.
263 dd.nErrors, // The number of output vectors to concurrently encode/decode.
264 dd.table.data(), // Pointer to array of input tables
265 inbuf, // Array of pointers to source input buffers
266 outbuf // Array of pointers to coded output buffers
267 );
268
269 int e = 0;
270 for (size_t i = 0; i < objcfg.nbchunks; i++)
271 {
272 if( pattern[i] )
273 {
274 memcpy( stripes[i].buffer, outbuf[e], objcfg.chunksize );
275 e++;
276 }
277 }
278}
279
280
281};
Class for computing parities and recovering data.
static std::string toString(Args &&...args)
RedundancyProvider(const ObjCfg &objcfg)
void compute(stripes_t &stripes)
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
std::vector< stripe_t > stripes_t
All stripes in a block.
static int gf_gen_decode_matrix(unsigned char *encode_matrix, unsigned char *decode_matrix, unsigned int *decode_index, unsigned char *src_err_list, unsigned char *src_in_err, unsigned int nerrs, unsigned int nsrcerrs, unsigned int k, unsigned int m)
const uint8_t nbdata
const uint8_t nbchunks
const uint8_t nbparity