333 lines
11 KiB
Python
Raw Normal View History

2017-04-03 12:52:21 -04:00
# Authors: Veeresh Taranalli <veeresht@gmail.com>
# License: BSD 3-Clause
""" Turbo Codes """
from numpy import array, append, zeros, exp, pi, log, empty
from commpy.channelcoding import Trellis, conv_encode
from commpy.utilities import dec2bitarray, bitarray2dec
#from commpy.channelcoding.map_c import backward_recursion, forward_recursion_decoding
def turbo_encode(msg_bits, trellis1, trellis2, interleaver):
""" Turbo Encoder.
Encode Bits using a parallel concatenated rate-1/3
turbo code consisting of two rate-1/2 systematic
convolutional component codes.
Parameters
----------
msg_bits : 1D ndarray containing {0, 1}
Stream of bits to be turbo encoded.
trellis1 : Trellis object
Trellis representation of the
first code in the parallel concatenation.
trellis2 : Trellis object
Trellis representation of the
second code in the parallel concatenation.
interleaver : Interleaver object
Interleaver used in the turbo code.
Returns
-------
[sys_stream, non_sys_stream1, non_sys_stream2] : list of 1D ndarrays
Encoded bit streams corresponding
to the systematic output
and the two non-systematic
outputs from the two component codes.
"""
stream = conv_encode(msg_bits, trellis1, 'rsc')
sys_stream = stream[::2]
non_sys_stream_1 = stream[1::2]
interlv_msg_bits = interleaver.interlv(sys_stream)
puncture_matrix = array([[0, 1]])
non_sys_stream_2 = conv_encode(interlv_msg_bits, trellis2, 'rsc', puncture_matrix)
sys_stream = sys_stream[0:-trellis1.total_memory]
non_sys_stream_1 = non_sys_stream_1[0:-trellis1.total_memory]
non_sys_stream_2 = non_sys_stream_2[0:-trellis2.total_memory]
return [sys_stream, non_sys_stream_1, non_sys_stream_2]
def _compute_branch_prob(code_bit_0, code_bit_1, rx_symbol_0, rx_symbol_1,
noise_variance):
#cdef np.float64_t code_symbol_0, code_symbol_1, branch_prob, x, y
code_symbol_0 = 2*code_bit_0 - 1
code_symbol_1 = 2*code_bit_1 - 1
x = rx_symbol_0 - code_symbol_0
y = rx_symbol_1 - code_symbol_1
# Normalized branch transition probability
branch_prob = exp(-(x*x + y*y)/(2*noise_variance))
return branch_prob
def _backward_recursion(trellis, msg_length, noise_variance,
sys_symbols, non_sys_symbols, branch_probs,
priors, b_state_metrics):
n = trellis.n
number_states = trellis.number_states
number_inputs = trellis.number_inputs
codeword_array = empty(n, 'int')
next_state_table = trellis.next_state_table
output_table = trellis.output_table
# Backward recursion
for reverse_time_index in reversed(xrange(1, msg_length+1)):
for current_state in xrange(number_states):
for current_input in xrange(number_inputs):
next_state = next_state_table[current_state, current_input]
code_symbol = output_table[current_state, current_input]
codeword_array = dec2bitarray(code_symbol, n)
parity_bit = codeword_array[1]
msg_bit = codeword_array[0]
rx_symbol_0 = sys_symbols[reverse_time_index-1]
rx_symbol_1 = non_sys_symbols[reverse_time_index-1]
branch_prob = _compute_branch_prob(msg_bit, parity_bit,
rx_symbol_0, rx_symbol_1,
noise_variance)
branch_probs[current_input, current_state, reverse_time_index-1] = branch_prob
b_state_metrics[current_state, reverse_time_index-1] += \
(b_state_metrics[next_state, reverse_time_index] * branch_prob *
priors[current_input, reverse_time_index-1])
b_state_metrics[:,reverse_time_index-1] /= \
b_state_metrics[:,reverse_time_index-1].sum()
def _forward_recursion_decoding(trellis, mode, msg_length, noise_variance,
sys_symbols, non_sys_symbols, b_state_metrics,
f_state_metrics, branch_probs, app, L_int,
priors, L_ext, decoded_bits):
n = trellis.n
number_states = trellis.number_states
number_inputs = trellis.number_inputs
codeword_array = empty(n, 'int')
next_state_table = trellis.next_state_table
output_table = trellis.output_table
# Forward Recursion
for time_index in xrange(1, msg_length+1):
app[:] = 0
for current_state in xrange(number_states):
for current_input in xrange(number_inputs):
next_state = next_state_table[current_state, current_input]
branch_prob = branch_probs[current_input, current_state, time_index-1]
# Compute the forward state metrics
f_state_metrics[next_state, 1] += (f_state_metrics[current_state, 0] *
branch_prob *
priors[current_input, time_index-1])
# Compute APP
app[current_input] += (f_state_metrics[current_state, 0] *
branch_prob *
b_state_metrics[next_state, time_index])
lappr = L_int[time_index-1] + log(app[1]/app[0])
L_ext[time_index-1] = lappr
if mode == 'decode':
if lappr > 0:
decoded_bits[time_index-1] = 1
else:
decoded_bits[time_index-1] = 0
# Normalization of the forward state metrics
f_state_metrics[:,1] = f_state_metrics[:,1]/f_state_metrics[:,1].sum()
f_state_metrics[:,0] = f_state_metrics[:,1]
f_state_metrics[:,1] = 0.0
def map_decode(sys_symbols, non_sys_symbols, trellis, noise_variance, L_int, mode='decode'):
""" Maximum a-posteriori probability (MAP) decoder.
Decodes a stream of convolutionally encoded
(rate 1/2) bits using the MAP algorithm.
Parameters
----------
sys_symbols : 1D ndarray
Received symbols corresponding to
the systematic (first output) bits in
the codeword.
non_sys_symbols : 1D ndarray
Received symbols corresponding to the non-systematic
(second output) bits in the codeword.
trellis : Trellis object
Trellis representation of the convolutional code.
noise_variance : float
Variance (power) of the AWGN channel.
L_int : 1D ndarray
Array representing the initial intrinsic
information for all received
symbols.
Typically all zeros,
corresponding to equal prior
probabilities of bits 0 and 1.
mode : str{'decode', 'compute'}, optional
The mode in which the MAP decoder is used.
'decode' mode returns the decoded bits
along with the extrinsic information.
'compute' mode returns only the
extrinsic information.
Returns
-------
[L_ext, decoded_bits] : list of two 1D ndarrays
The first element of the list is the extrinsic information.
The second element of the list is the decoded bits.
"""
k = trellis.k
n = trellis.n
rate = float(k)/n
number_states = trellis.number_states
number_inputs = trellis.number_inputs
msg_length = len(sys_symbols)
# Initialize forward state metrics (alpha)
f_state_metrics = zeros([number_states, 2])
f_state_metrics[0][0] = 1
#print f_state_metrics
# Initialize backward state metrics (beta)
b_state_metrics = zeros([number_states, msg_length+1])
b_state_metrics[:,msg_length] = 1
# Initialize branch transition probabilities (gamma)
branch_probs = zeros([number_inputs, number_states, msg_length+1])
app = zeros(number_inputs)
lappr = 0
decoded_bits = zeros(msg_length, 'int')
L_ext = zeros(msg_length)
priors = empty([2, msg_length])
priors[0,:] = 1/(1 + exp(L_int))
priors[1,:] = 1 - priors[0,:]
# Backward recursion
_backward_recursion(trellis, msg_length, noise_variance, sys_symbols,
non_sys_symbols, branch_probs, priors, b_state_metrics)
# Forward recursion
_forward_recursion_decoding(trellis, mode, msg_length, noise_variance, sys_symbols,
non_sys_symbols, b_state_metrics, f_state_metrics,
branch_probs, app, L_int, priors, L_ext, decoded_bits)
return [L_ext, decoded_bits]
def turbo_decode(sys_symbols, non_sys_symbols_1, non_sys_symbols_2, trellis,
noise_variance, number_iterations, interleaver, L_int = None):
""" Turbo Decoder.
Decodes a stream of convolutionally encoded
(rate 1/3) bits using the BCJR algorithm.
Parameters
----------
sys_symbols : 1D ndarray
Received symbols corresponding to
the systematic (first output) bits in the codeword.
non_sys_symbols_1 : 1D ndarray
Received symbols corresponding to
the first parity bits in the codeword.
non_sys_symbols_2 : 1D ndarray
Received symbols corresponding to the
second parity bits in the codeword.
trellis : Trellis object
Trellis representation of the convolutional codes
used in the Turbo code.
noise_variance : float
Variance (power) of the AWGN channel.
number_iterations : int
Number of the iterations of the
BCJR algorithm used in turbo decoding.
interleaver : Interleaver object.
Interleaver used in the turbo code.
L_int : 1D ndarray
Array representing the initial intrinsic
information for all received
symbols.
Typically all zeros,
corresponding to equal prior
probabilities of bits 0 and 1.
Returns
-------
decoded_bits : 1D ndarray of ints containing {0, 1}
Decoded bit stream.
"""
if L_int is None:
L_int = zeros(len(sys_symbols))
L_int_1 = L_int
# Interleave systematic symbols for input to second decoder
sys_symbols_i = interleaver.interlv(sys_symbols)
for iteration_count in xrange(number_iterations):
# MAP Decoder - 1
[L_ext_1, decoded_bits] = map_decode(sys_symbols, non_sys_symbols_1,
trellis, noise_variance, L_int_1, 'compute')
L_ext_1 = L_ext_1 - L_int_1
L_int_2 = interleaver.interlv(L_ext_1)
if iteration_count == number_iterations - 1:
mode = 'decode'
else:
mode = 'compute'
# MAP Decoder - 2
[L_2, decoded_bits] = map_decode(sys_symbols_i, non_sys_symbols_2,
trellis, noise_variance, L_int_2, mode)
L_ext_2 = L_2 - L_int_2
L_int_1 = interleaver.deinterlv(L_ext_2)
decoded_bits = interleaver.deinterlv(decoded_bits)
return decoded_bits