gfcp_fec.go 7.57 KB
Newer Older
1
// Copyright © 2021 Jeffrey H. Johnson <trnsz@pobox.com>.
2
3
// Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
// Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
Jeffrey H. Johnson's avatar
Jeffrey H. Johnson committed
4
// Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
5
6
7
8
9
10
//
// All rights reserved.
//
// All use of this code is governed by the MIT license.
// The complete license is available in the LICENSE file.

11
package gfcp
12
13
14
15
16
17
18
19
20
21
22

import (
	"encoding/binary"
	"sync/atomic"

	"github.com/klauspost/reedsolomon"
)

const (
	fecHeaderSize      = 6
	fecHeaderSizePlus2 = fecHeaderSize + 2
23
24
25
26
	// KTypeData ...
	KTypeData = 0xf1
	// KTypeParity ...
	KTypeParity = 0xf2
27
28
)

29
// FecPacket ...
30
31
32
33
34
35
36
37
38
type FecPacket []byte

func (
	bts FecPacket,
) seqid() uint32 {
	return binary.LittleEndian.Uint32(
		bts,
	)
}
39

40
41
42
43
44
45
46
func (
	bts FecPacket,
) flag() uint16 {
	return binary.LittleEndian.Uint16(
		bts[4:],
	)
}
47

48
49
50
func (
	bts FecPacket,
) data() []byte {
51
52
53
	return bts[6:]
}

54
// FecDecoder ...
55
56
57
58
59
60
61
62
63
64
65
66
type FecDecoder struct {
	rxlimit      int
	dataShards   int
	parityShards int
	shardSize    int
	rx           []FecPacket
	DecodeCache  [][]byte
	flagCache    []bool
	zeros        []byte
	codec        reedsolomon.Encoder
}

Jeffrey H. Johnson's avatar
Jeffrey H. Johnson committed
67
68
// NewFECDecoder ...
func NewFECDecoder(
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
	rxlimit,
	dataShards,
	parityShards int,
) *FecDecoder {
	if dataShards <= 0 || parityShards <= 0 {
		return nil
	}
	if rxlimit < dataShards+parityShards {
		return nil
	}

	dec := new(
		FecDecoder,
	)
	dec.rxlimit = rxlimit
	dec.dataShards = dataShards
	dec.parityShards = parityShards
	dec.shardSize = dataShards + parityShards
	codec, err := reedsolomon.New(
		dataShards,
		parityShards,
	)
	if err != nil {
		return nil
	}
	dec.codec = codec
	dec.DecodeCache = make(
		[][]byte,
		dec.shardSize,
	)
	dec.flagCache = make(
		[]bool,
		dec.shardSize,
	)
	dec.zeros = make(
		[]byte,
105
		GFcpMtuLimit,
106
107
108
109
	)
	return dec
}

110
// Decode ...
111
112
113
114
115
116
117
118
119
120
func (
	dec *FecDecoder,
) Decode(
	in FecPacket,
) (
	recovered [][]byte,
) {
	n := len(
		dec.rx,
	) - 1
121
122
	insertIdx := 0
	for i := n; i >= 0; i-- {
123
		if in.seqid() == dec.rx[i].seqid() {
124
			return nil
125
126
127
128
		} else if _itimediff(
			in.seqid(),
			dec.rx[i].seqid(),
		) > 0 {
129
130
131
132
133
134
			insertIdx = i + 1
			break
		}
	}

	// make a copy
135
	pkt := FecPacket(KxmitBuf.Get().([]byte)[:len(in)])
136
137
138
139
	copy(
		pkt,
		in,
	)
140
141

	if insertIdx == n+1 {
142
143
144
145
		dec.rx = append(
			dec.rx,
			pkt,
		)
146
	} else {
147
148
149
150
151
152
153
154
		dec.rx = append(
			dec.rx,
			FecPacket{},
		)
		copy(
			dec.rx[insertIdx+1:],
			dec.rx[insertIdx:],
		)
155
156
157
158
159
160
161
162
163
164
165
		dec.rx[insertIdx] = pkt
	}

	shardBegin := pkt.seqid() - pkt.seqid()%uint32(dec.shardSize)
	shardEnd := shardBegin + uint32(dec.shardSize) - 1

	searchBegin := insertIdx - int(pkt.seqid()%uint32(dec.shardSize))
	if searchBegin < 0 {
		searchBegin = 0
	}
	searchEnd := searchBegin + dec.shardSize - 1
166
167
168
169
170
171
	if searchEnd >= len(
		dec.rx,
	) {
		searchEnd = len(
			dec.rx,
		) - 1
172
173
174
175
176
177
178
179
180
181
182
183
184
185
	}

	if searchEnd-searchBegin+1 >= dec.dataShards {
		var numshard, numDataShard, first, maxlen int

		shards := dec.DecodeCache
		shardsflag := dec.flagCache
		for k := range dec.DecodeCache {
			shards[k] = nil
			shardsflag[k] = false
		}

		for i := searchBegin; i <= searchEnd; i++ {
			seqid := dec.rx[i].seqid()
186
187
188
189
			if _itimediff(
				seqid,
				shardEnd,
			) > 0 {
190
				break
191
192
193
194
			} else if _itimediff(
				seqid,
				shardBegin,
			) >= 0 {
195
196
197
198
199
200
				shards[seqid%uint32(
					dec.shardSize,
				)] = dec.rx[i].data()
				shardsflag[seqid%uint32(
					dec.shardSize,
				)] = true
201
202
203
204
205
206
207
				numshard++
				if dec.rx[i].flag() == KTypeData {
					numDataShard++
				}
				if numshard == 1 {
					first = i
				}
208
209
210
211
212
213
				if len(
					dec.rx[i].data(),
				) > maxlen {
					maxlen = len(
						dec.rx[i].data(),
					)
214
215
216
217
218
				}
			}
		}

		if numDataShard == dec.dataShards {
219
220
221
222
223
			dec.rx = dec.freeRange(
				first,
				numshard,
				dec.rx,
			)
224
225
226
		} else if numshard >= dec.dataShards {
			for k := range shards {
				if shards[k] != nil {
227
228
229
					dlen := len(
						shards[k],
					)
230
231
232
233
234
235
					shards[k] = shards[k][:maxlen]
					copy(shards[k][dlen:], dec.zeros)
				} else {
					shards[k] = KxmitBuf.Get().([]byte)[:0]
				}
			}
236
237
238
			if err := dec.codec.ReconstructData(
				shards,
			); err == nil {
239
240
				for k := range shards[:dec.dataShards] {
					if !shardsflag[k] {
241
242
243
244
						recovered = append(
							recovered,
							shards[k],
						)
245
246
247
					}
				}
			}
248
249
250
251
252
			dec.rx = dec.freeRange(
				first,
				numshard,
				dec.rx,
			)
253
254
255
256
		}
	}

	if len(dec.rx) > dec.rxlimit {
257
		if dec.rx[0].flag() == KTypeData {
258
			atomic.AddUint64(
259
				&DefaultSnsi.GFcpFECRuntShards,
260
261
				1,
			)
262
		}
263
264
265
266
267
		dec.rx = dec.freeRange(
			0,
			1,
			dec.rx,
		)
268
269
270
271
	}
	return
}

272
273
274
275
276
277
278
func (
	dec *FecDecoder,
) freeRange(
	first,
	n int,
	q []FecPacket,
) []FecPacket {
279
	for i := first; i < first+n; i++ {
280
		// TODO(jhj): Switch to pointer to avoid allocation.
281
		KxmitBuf.Put(
282
283
284
			[]byte(
				q[i],
			),
285
		)
286
287
	}

288
	if first == 0 && n < (cap(q)/2) {
289
290
		return q[n:]
	}
291
292
293
294
	copy(
		q[first:],
		q[first+n:],
	)
295
296
297
	return q[:len(
		q,
	)-n]
298
299
300
}

type (
301
	// FecEncoder ...
302
	FecEncoder struct {
303
304
305
306
307
308
309
310
311
312
313
314
315
		dataShards    int
		parityShards  int
		shardSize     int
		paws          uint32 // Protect Against Wrapped Sequence numbers
		next          uint32 // next seqid
		shardCount    int    // count the number of datashards collected
		maxSize       int    // track maximum data length in datashard
		headerOffset  int    // FEC header offset
		payloadOffset int    // FEC payload offset
		shardCache    [][]byte
		EncodeCache   [][]byte
		zeros         []byte
		codec         reedsolomon.Encoder
316
317
318
	}
)

Jeffrey H. Johnson's avatar
Jeffrey H. Johnson committed
319
320
// NewFECEncoder ...
func NewFECEncoder(
321
322
323
324
	dataShards,
	parityShards,
	offset int,
) *FecEncoder {
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
	if dataShards <= 0 || parityShards <= 0 {
		return nil
	}
	enc := new(
		FecEncoder,
	)
	enc.dataShards = dataShards
	enc.parityShards = parityShards
	enc.shardSize = dataShards + parityShards
	enc.paws = (0xFFFFFFFF/uint32(enc.shardSize) - 1) * uint32(enc.shardSize)
	enc.headerOffset = offset
	enc.payloadOffset = enc.headerOffset + fecHeaderSize
	codec, err := reedsolomon.New(
		dataShards,
		parityShards,
	)
	if err != nil {
		return nil
	}
	enc.codec = codec
	enc.EncodeCache = make(
		[][]byte,
		enc.shardSize,
	)
	enc.shardCache = make(
		[][]byte,
		enc.shardSize,
	)
	for k := range enc.shardCache {
		enc.shardCache[k] = make(
			[]byte,
356
			GFcpMtuLimit,
357
358
359
360
		)
	}
	enc.zeros = make(
		[]byte,
361
		GFcpMtuLimit,
362
363
364
365
	)
	return enc
}

366
// Encode ...
367
368
369
370
371
372
373
374
375
376
func (
	enc *FecEncoder,
) Encode(
	b []byte,
) (
	ps [][]byte,
) {
	enc.markData(
		b[enc.headerOffset:],
	)
377
378
	binary.LittleEndian.PutUint16(
		b[enc.payloadOffset:],
379
380
381
382
		uint16(
			len(
				b[enc.payloadOffset:],
			),
383
384
		),
	)
385
386
387
388
	sz := len(
		b,
	)
	enc.shardCache[enc.shardCount] = enc.shardCache[enc.shardCount][:sz]
389
390
391
392
	copy(
		enc.shardCache[enc.shardCount][enc.payloadOffset:],
		b[enc.payloadOffset:],
	)
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
	enc.shardCount++
	if sz > enc.maxSize {
		enc.maxSize = sz
	}
	if enc.shardCount == enc.dataShards {
		for i := 0; i < enc.dataShards; i++ {
			shard := enc.shardCache[i]
			slen := len(
				shard,
			)
			copy(
				shard[slen:enc.maxSize],
				enc.zeros,
			)
		}
		cache := enc.EncodeCache
		for k := range cache {
			cache[k] = enc.shardCache[k][enc.payloadOffset:enc.maxSize]
		}
		if err := enc.codec.Encode(
			cache,
		); err == nil {
			ps = enc.shardCache[enc.dataShards:]
			for k := range ps {
				enc.markParity(
					ps[k][enc.headerOffset:],
				)
				ps[k] = ps[k][:enc.maxSize]
			}
		}
		enc.shardCount = 0
		enc.maxSize = 0
	}
	return
}

429
430
431
432
433
434
435
436
437
438
439
440
441
func (
	enc *FecEncoder,
) markData(
	data []byte,
) {
	binary.LittleEndian.PutUint32(
		data,
		enc.next,
	)
	binary.LittleEndian.PutUint16(
		data[4:],
		KTypeData,
	)
442
443
444
	enc.next++
}

445
446
447
448
449
450
451
452
453
454
455
456
457
func (
	enc *FecEncoder,
) markParity(
	data []byte,
) {
	binary.LittleEndian.PutUint32(
		data,
		enc.next,
	)
	binary.LittleEndian.PutUint16(
		data[4:],
		KTypeParity,
	)
458
459
	enc.next = (enc.next + 1) % enc.paws
}