In this post, I will demonstrate how to use ARM’s LDREX/STREX instructions to
write re-entrant thread-safe code without relying on mutexes or disabling
interrupts. Along the way, I will write a simple block memory allocator with a
run-time adjustable block size and unit-test it under QEMU using ARM
semihosting. I will also briefly touch on GCC inline assembly.
Source code is available as ldrex-malloc.tar.gz
… but this last comforting thought was denied to me in 1957 with the introduction of the real-time interrupt. When Loopstra and Scholten suggested this feature for the X1, our next machine, I got visions of my program causing irreproducible errors and I panicked.
– E. W. Dijkstra “What led to ‘Notes on Structured Programming’”
Simple things become hard when other threads or higher-priority IRQs can interrupt our code at any instruction. The classic example is an increment:
int i;
i = i + 1;
This code is not thread-safe even though int is a naturally atomic type:
it fits into a register and can be loaded from or stored to memory in a single
instruction. You will never observe a half-written int, unlike e.g. types
like int64_t, which need two instructions to load on a 32-bit processor.
The problem with this code becomes obvious when you look at the assembly it compiles to:
ldr r0, [i]
add r0, #1
str r0, [i]
As you can see, this operation is not atomic: it can be interrupted in the
middle. That is a problem, because add and str will then act on outdated
data and overwrite the result of another operation:
; i = 0
ldr r0, [i] ; r0 = 0
; A higher priority interrupt happens here
ldr r1, [i] ; r1 = 0
add r1, #1 ; r1 = 1
str r1, [i] ; i = 1
; Continuing after the interrupt
; i = 1
add r0, #1 ; r0 = 1
str r0, [i] ; i = 1 !
The simplest way to make this operation atomic is by masking interrupts to create a critical section:
mrs r2, primask ; save old mask
mov r3, #1
msr primask, r3 ; mask interrupts
; Critical section
ldr r0, [i]
add r0, #1
str r0, [i]
msr primask, r2 ; unmask interrupts
Note how the old priority mask is saved so that we do not re-enable interrupts early if called from another critical section.
There are two problems with this approach.
First, it increases latency for higher priority interrupts, which might be fine if done rarely, but will break your real time guarantees if used without care.
The second problem is that it only works for single-core systems – other cores may access the same memory concurrently even when interrupts are masked locally.
To overcome those limitations, ARMv7 introduced the Exclusive Monitor and a
pair of generic synchronization primitives – LDREX/STREX [1]. They allow us
to make even complex operations on memory effectively atomic. This mechanism
also supports multi-CPU systems with a Global Exclusive Monitor.
The Exclusive Monitor can be in one of two states: Open Access or Exclusive
Access.
LDREX behaves like a normal LDR, except it transitions the Monitor to
Exclusive Access after the load.
STREX is akin to STR but conditional: if the Monitor was set to Exclusive
Access by LDREX, it performs the store, transitions the Monitor back to Open
Access, and returns a status value of 0; otherwise, it does nothing and
returns 1.
We can now rewrite our increment as:
repeat:
ldrex r0, [i]
add r0, #1
strex r4, r0, [i]
cmp r4, #0
bne repeat
Note how we must repeat the load and the computation if the store fails. This
ensures that we never store a result computed from a stale value. In other
words, STREX here verifies that the value we want to store was computed from
the value currently at that memory address.
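The same retry pattern exists in portable C11 as atomic_compare_exchange_weak, which compilers lower to an LDREX/STREX pair on ARMv7; "weak" means the exchange may fail spuriously, just like STREX. A host-side sketch of the increment above (atomic_increment is a name I made up):

```c
#include <stdatomic.h>

/* Increment *v with a retry loop that mirrors the ldrex/strex
   sequence: the store only goes through if *v still holds the
   value we loaded. */
void atomic_increment(atomic_uint * v)
{
    unsigned int old = atomic_load(v); /* plays the role of ldrex */

    /* compare_exchange_weak plays the role of strex: on failure it
       refreshes `old` with the current value, and we recompute */
    while (! atomic_compare_exchange_weak(v, &old, old + 1))
        ;
}
```

On failure, atomic_compare_exchange_weak writes the current value of *v back into old, so each retry recomputes from fresh data, exactly like re-running the LDREX.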
And we are not limited to a single instruction in-between:
mov r5, #10
repeat:
ldrex r0, [i]
tst r0, #1
bne odd
add r0, #1
odd:
mul r0, r5, r0
sub r0, #1
strex r4, r0, [i]
cmp r4, #0
bne repeat
However, it is best to keep the computation as short as possible to lower the
chances of it being interrupted. You should also avoid any memory stores
in-between LDREX/STREX as that may cause a livelock on some implementations -
it is IMPLEMENTATION DEFINED whether a simple store will reset the Monitor or
not.
Perhaps the most important restriction of these instructions is that you cannot
have more than one outstanding exclusive access for each execution thread. Each
STREX must be preceded by an LDREX from the same address. For example, the
following code is malformed, and a lock must be used instead:
repeat:
ldrex r0, [a]
ldrex r6, [b]
add r0, #1
add r6, r0
strex r4, r0, [a] ; invalid: address not equal to a preceding `ldrex`
cmp r4, #0
bne repeat
strex r4, r6, [b] ; invalid: not preceded by `ldrex`
cmp r4, #0
bne repeat
Now let’s see what happens if our new increment is interrupted:
; i = 0
repeat:
ldrex r0, [i] ; r0 = 0
; A higher priority interrupt happens here
repeat2:
ldrex r7, [i] ; r7 = 0
add r7, #1 ; r7 = 1
strex r4, r7, [i] ; success, i = 1
cmp r4, #0
bne repeat2 ; not taken
; Continuing after the interrupt
; i = 1
add r0, #1 ; r0 = 1
strex r4, r0, [i] ; fail: the Monitor was reset in the interrupt
cmp r4, #0
bne repeat ; taken
; On the second iteration
; i = 1
repeat:
ldrex r0, [i] ; r0 = 1
add r0, #1 ; r0 = 2
strex r4, r0, [i] ; success, i = 2
cmp r4, #0
bne repeat ; not taken
; i = 2
You can see how the first thread had to repeat the increment with the up-to-date value, which resolved the race condition.
The Monitor might be reset unconditionally on interrupts, so STREX might fail
even if no other thread used the primitives.
Now, let’s apply those synchronization primitives in a real program by creating
a lock-free [2] memory allocator which can allocate and free memory chunks of
fixed size from a static pool in O(1) time. For that I will place a control
structure with an array of free blocks at the start of the memory pool. As an
added bonus, I will make the block size adjustable at run time during
initialization of the allocator.
__attribute__((packed)) struct bmalloc_ctx {
uint32_t free_len;
uint32_t free_mlen;
uint32_t blen;
uint32_t store_mlen;
void * free[];
/* unsigned char store[] here */
};
Note the syntax void * free[]: this is a flexible array member. It does
not take any additional space and lets us say that the structure is
immediately followed by an array of run-time size. It is basically a shortcut
for writing ctx->free instead of ((char *) ctx + sizeof(*ctx)). The
__attribute__((packed)) prevents the compiler from inserting any unexpected
alignment padding because we will ensure alignment manually.
#define BMALLOC_ALIGNMENT __alignof__(void *)
#define BMALLOC_ALIGN_MASK (BMALLOC_ALIGNMENT - 1)
struct bmalloc_ctx * bmalloc_init(
unsigned char * store, uint32_t store_mlen, uint32_t blen)
{
unsigned int i;
unsigned char * free_space;
struct bmalloc_ctx * c;
/* space lost due to alignment */
store_mlen -= ((intptr_t) store) & BMALLOC_ALIGN_MASK;
store += -((intptr_t) store) & BMALLOC_ALIGN_MASK; /* fix alignment */
c = (struct bmalloc_ctx *) store;
c->store_mlen = store_mlen;
blen += -blen & BMALLOC_ALIGN_MASK; /* make it a multiple of alignment */
c->blen = blen;
/* ... continued below ... */
}
__alignof__ is a GCC keyword that computes the alignment of a type in bytes.
BMALLOC_ALIGNMENT will be 4 on 32-bit cores and 8 on 64-bit ones. We then
proceed to align the store pointer to the maximum natural alignment for the
target core. (intptr_t) store lets us portably treat the pointer value as
an integer; we then negate the address to compute the distance to the next
alignment boundary and mask the result to remove the extra bits:
BMALLOC_ALIGN_MASK = 0b000000000000011 = 0x3
store = 0b010000000000010 = 0x2002
-store = 0b101111111111110
-store & 3 = 0b000000000000010 = 2
store + 2 = 0b010000000000100 = 0x2004
This is necessary because LDREX/STREX, like some other instructions, do not
allow unaligned access. We do that both for our control structure and to ensure
that allocated blocks can be used in arbitrary contexts – hence the need to
align blen.
We then compute the maximum number of blocks:
c->free_mlen = (store_mlen - sizeof(*c)) / (blen + sizeof(c->free[0]));
store_mlen - sizeof(*c) accounts for the overhead of the fixed part of the
control struct, while sizeof(c->free[0]) is added because each block needs a
pointer slot in c->free. Integer division always rounds down, discarding any
leftover space that is too small for a block.
After that we know the real size of our control structure and where it ends:
free_space = store + sizeof(*c) + c->free_mlen * sizeof(c->free[0]);
Note how this pointer is always aligned to BMALLOC_ALIGNMENT because store
is aligned, sizeof(*c) is 16 – a multiple of the alignment on both 32-bit and
64-bit targets – and sizeof(c->free[0]) is the size of a pointer.
All that’s left is to fill the array:
for (i = 0; i < c->free_mlen; i++) {
c->free[i] = free_space + blen * i;
}
c->free_len = c->free_mlen;
return c;
Now onto allocation, which is quite simple after all those preparations:
uint32_t atomic_fetch_sub_unless(uint32_t * v, int b, int unless);
uint32_t atomic_fetch_add_unless(uint32_t * v, int b, int unless);
void * bmalloc(struct bmalloc_ctx * c)
{
uint32_t free_len = atomic_fetch_sub_unless(&c->free_len, 1, 0);
if (free_len > 0) {
return c->free[free_len - 1];
} else {
return NULL;
}
}
void bfree(struct bmalloc_ctx * c, void * ptr)
{
uint32_t free_len = atomic_fetch_add_unless(&c->free_len, 1, c->free_mlen);
BMALLOC_ASSERT(free_len < c->free_mlen); /* likely due to a double free */
c->free[free_len] = ptr;
}
Note how I keep using the free_len value fetched during the atomic operation.
That keeps the entire function atomic and prevents time-of-check / time-of-use
races: the value returned to us cannot be changed by other threads, which will
in turn get a different value because the atomic operation already decremented it.
atomic_fetch_sub_unless is a primitive that atomically loads a value from
memory, subtracts b if the value is not equal to unless, and then returns
the original value from memory. This way the returned value is always the one
that was used in computation. atomic_fetch_add_unless is the same but it does
addition instead of subtraction.
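This contract is easy to express in portable C11, which is handy for host-side testing (this sketch is mine, with a made-up name to avoid clashing with the real prototype; it is not the implementation from the tarball):

```c
#include <stdatomic.h>
#include <stdint.h>

/* Atomically: old = *v; if (old != unless) *v = old - b; return old. */
uint32_t c11_fetch_sub_unless(_Atomic uint32_t * v, uint32_t b,
                              uint32_t unless)
{
    uint32_t old = atomic_load(v);

    do {
        if (old == unless)
            break; /* nothing to do; report the value we saw */
        /* the weak CAS may fail spuriously, like STREX; on failure
           it refreshes `old` and we try again */
    } while (! atomic_compare_exchange_weak(v, &old, old - b));

    return old;
}
```

The add variant follows by replacing old - b with old + b.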
@ uint32_t atomic_fetch_sub_unless(uint32_t * v, int b, int unless);
.syntax unified
.thumb
.global atomic_fetch_sub_unless
.align 1
.thumb_func
atomic_fetch_sub_unless:
repeat:
ldrex r3, [r0] @ old = *v
cmp r3, r2
beq exit @ if (old == unless) exit
subs r3, r3, r1 @ new (r3) = old - b
strex r12, r3, [r0] @ *v = new, r12 = ok
cmp r12, #0
bne repeat @ if (! ok) repeat; ldrex reloads old
adds r3, r3, r1 @ reconstruct old = new + b
exit:
movs r0, r3 @ return old
bx lr
The directives on top are here to make the GNU Assembler produce a symbol with
the low bit set in the address, so that it can be called from Thumb state. Note
that new is computed in place of old and the original value is reconstructed
after a successful store; this keeps b (r1) and unless (r2) intact in case the
STREX fails and the loop has to repeat. Also note that this function does not
require a stack frame because it only uses the caller-saved registers r0-r3
and r12, which the compiler expects to change across a function call [4].
We can also rewrite this using GCC Inline assembly to allow the compiler to e.g. inline the function [5]:
uint32_t atomic_fetch_add_unless(uint32_t * v, int b, int unless)
{
int old;
int new;
int ok;
__asm__ volatile ("@ atomic_fetch_add_unless\n\t"
"1:\n\t"
"ldrex %[old], [%[v]]\n\t"
"cmp %[old], %[u]\n\t"
"beq 2f\n\t"
"add %[new], %[old], %[b]\n\t"
"strex %[ok], %[new], [%[v]]\n\t"
"cmp %[ok], #0\n\t"
"bne 1b\n"
"2:"
: [v] "+r" (v), [old] "=&r" (old), [new] "=&r" (new), [ok] "=&r" (ok)
: [b] "r" (b), [u] "r" (unless)
: "cc", "memory"
);
return old;
}
This function uses Extended Asm, which comes in a form
__asm__ [volatile] (
"assembly"
: OutputOperands
[: InputOperands]
[: Clobbers]
)
OutputOperands is a list of C variables modified or read by this statement,
InputOperands is for the variables that are only read, and Clobbers is a
list of registers changed beyond the input or output operands.
The operands are specified as
[alias] "<constraints>" (C expression)
We can refer to the operands from the assembly by their index, as %0, or by the
optional symbolic name: %[alias]. Constraints for output operands must be
prefixed by = if the assembly only overwrites the value, or + if it both reads
and writes it; an additional & (early clobber) marks an output that is written
before all inputs have been consumed, so the compiler must not place it in a
register shared with an input. There are many possible constraint symbols, but
the most common is r, which places the value of the C expression in a register
allocated by the compiler. The cc in Clobbers tells the compiler that we modify
the condition flags, and a memory clobber should be listed whenever the
assembly reads or writes memory that is not among the operands. Refer to the
docs for more. Also note the explicit \n\t: they are necessary because, after
expanding the operands, GCC passes the assembly string to the assembler as is.
Local testing with QEMU is an easy way to check that your logic is sound and your assembly is correct without building complex hardware harnesses. It is, however, very limited in terms of timing accuracy, peripherals, and CPU quirks. QEMU is not a cycle-accurate simulator and is not meant for catching subtle concurrency bugs; for that you will have to get real hardware.
Let’s start our testing with some helper functions:
void assert_handler(const char * file, int line, const char * stmt);
#ifndef DEBUG_ASSERT
#define DEBUG_ASSERT(stmt) do { \
if (! __builtin_expect(!! (stmt), 1)) { \
assert_handler(__FILE__, __LINE__, #stmt); \
while (1) \
; \
} \
} while (0)
#endif
The __builtin_expect here is to aid compiler optimization – we expect the
boolean value of stmt to be true most of the time. The !! (stmt) is a
simple trick to convert any numeric value to a boolean 0/1.
Next, let’s define functions that will allow us to send information outside the simulation with the help of ARM Semihosting [6]:
#define SEMIHOSTING_EXIT_CODE_INTERNAL_ERROR 0x20024 /* exit code 1 */
#define SEMIHOSTING_EXIT_CODE_APPLICATION_EXIT 0x20026 /* exit code 0 */
void semihosting_write0(const char * str)
{
__asm__ volatile (
"mov r0, #0x04\n\t"
"mov r1, %0\n\t"
"bkpt 0xAB"
: /* no outputs */
: "r" (str) /* input operands */
: "r0", "r1", "memory" /* Clobbers; the host reads the string from memory */
);
}
__attribute__((noreturn)) void semihosting_exit(int code)
{
__asm__ volatile (
"mov r0, #0x18\n\t"
"mov r1, %0\n\t"
"bkpt 0xAB"
:
: "r" (code)
: "r0", "r1"
);
__builtin_unreachable();
}
The bkpt 0xAB instruction is the way to trigger a Semihosting call on
Cortex-M. Other cores will need a different instruction. The semihosting host
then catches this exception and retrieves the opcode and parameters from the
registers, just like a system call. semihosting_write0 will write a
null-terminated string to stdout, and semihosting_exit will quit the QEMU
process with an exit code of 0 or 1.
We can then nicely report the reason for test failure as:
/* left as an exercise to the reader */
const char * int_to_str_(int i, char * buf, int buf_mlen);
void assert_handler(const char * file, int line, const char * stmt)
{
char line_str[12];
semihosting_write0("Assertion failed: ");
semihosting_write0(file);
semihosting_write0(":");
semihosting_write0(int_to_str_(line, line_str, sizeof(line_str)));
semihosting_write0(": ");
semihosting_write0(stmt);
semihosting_write0("\n");
semihosting_exit(SEMIHOSTING_EXIT_CODE_INTERNAL_ERROR);
}
The last thing we’ll need is a vector table to make our code runnable:
.syntax unified
.thumb
.section .vector, "a", %progbits @ matched by KEEP(*(.vector)) in the linker script
.global .vector
.vector:
.word _stack @ initial SP value, provided by the linker script
.word isr_reset
.word isr_nmi
.word isr_hard_fault
.word isr_mem_manage
.word isr_bus_fault
.word isr_usage_fault
.skip 16 @ reserved
.word isr_sv_call
.word isr_debug_monitor
.skip 4 @ reserved
.word isr_pend_sv
.word isr_systick
.skip 344
… and a basic linker script:
MEMORY {
flash (rx) : ORIGIN = 0x08000000, LENGTH = 512K
sram (w!x) : ORIGIN = 0x20000000, LENGTH = 128K
}
SECTIONS {
.text : {
KEEP(*(.vector))
*(.text)
} > flash
.data : {
*(.data*)
} > sram AT > flash
}
ENTRY(isr_reset);
EXTERN(.vector);
PROVIDE(_stack = ORIGIN(sram) + LENGTH(sram));
Now we can write our tests as:
static unsigned char store[10000];
void test_one_block(void)
{
unsigned int store_mlen = BMALLOC_STORE_MLEN_FOR_BLOCKS(1, 1);
struct bmalloc_ctx * b;
unsigned char * ptr1;
unsigned char * ptr2;
b = bmalloc_init(store, store_mlen, 1);
ptr1 = bmalloc(b);
DEBUG_ASSERT(ptr1 != NULL);
DEBUG_ASSERT(ptr1 >= store && ptr1 < store + store_mlen);
bfree(b, ptr1);
ptr1 = bmalloc(b);
ptr2 = bmalloc(b);
DEBUG_ASSERT(ptr1 != NULL && ptr2 == NULL);
bfree(b, ptr1);
}
void isr_reset(void)
{
semihosting_write0("Test started\n");
test_one_block();
semihosting_write0("Test passed\n");
semihosting_exit(SEMIHOSTING_EXIT_CODE_APPLICATION_EXIT);
}
void isr_hard_fault(void)
{
semihosting_write0("HardFault\n");
semihosting_exit(SEMIHOSTING_EXIT_CODE_INTERNAL_ERROR);
}
Now running it is as simple as:
# qemu-system-arm -M netduinoplus2 -s -nographic -semihosting \
-kernel test_bmalloc.elf
Test started
Test passed
-M netduinoplus2 selects a machine with an STM32F405RGT6 featuring an ARM
Cortex-M4. -s starts a gdb server at localhost:1234. -semihosting, as the
name implies, enables support for the ARM Semihosting calls. You may also
pass the -S flag to halt the simulation after start and connect with a
debugger.
Now, let’s add a double free to our code and see how it looks:
# qemu-system-arm -M netduinoplus2 -s -nographic -semihosting \
-kernel test_bmalloc.elf
Test started
Assertion failed: bmalloc.c:81: free_len < c->free_mlen
# echo $?
1
You can also use the same semihosting approach on a real controller, but that’s a whole other story.
And that is all I have for you today. For further reading, please check out an awesome blog by Dmitry Vyukov about lock-free algorithms and beyond [2]. If you have any practical guides on lock-free implementations on ARM specifically, please do share.