Andrei Liungrin


Lock-free programming with LDREX/STREX on ARMv7: A block memory allocator

In this post, I will demonstrate how to use ARM’s LDREX/STREX instructions to write re-entrant thread-safe code without relying on mutexes or disabling interrupts. Along the way, I will write a simple block memory allocator with a run-time adjustable block size and unit-test it under QEMU using ARM semihosting. I will also briefly touch on GCC inline assembly.

Source code is available as ldrex-malloc.tar.gz

The problem

… but this last comforting thought was denied to me in 1957 with the introduction of the real-time interrupt. When Loopstra and Scholten suggested this feature for the X1, our next machine, I got visions of my program causing irreproducible errors and I panicked.

E. W. Dijkstra “What led to ‘Notes on Structured Programming’”

Simple things become hard when other threads or higher-priority IRQs can interrupt our code at any instruction. The classic example is an increment:

int i;
i = i + 1;

This code is not thread-safe even though int is a naturally atomic type, meaning that it fits into a register and can be loaded from or stored to memory in one instruction. You will never observe a half-updated int, unlike types such as int64_t, which need two instructions to load on a 32-bit processor.

The problem with this code becomes obvious when you look at the assembly it compiles to:

ldr r0, [i]
add r0, #1
str r0, [i]

As you can see, this operation is not atomic, that is, it can be interrupted in the middle. And that is a problem because add and str will act on outdated data, and as a result overwrite the result of another operation:

; i = 0
ldr r0, [i]     ; r0 = 0

; A higher priority interrupt happens here
    ldr r1, [i] ; r1 = 0
    add r1, #1  ; r1 = 1
    str r1, [i] ; i = 1
; Continuing after the interrupt
; i = 1

add r0, #1      ; r0 = 1
str r0, [i]     ; i = 1 !
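The same lost update can be replayed deterministically in plain C. This is only a sketch: the "registers" are local variables and the interrupt is simulated by a function call placed between the load and the store. The names simulated_interrupt and lost_update_demo are made up for this illustration.

```c
/* Deterministic replay of the interleaving above. The interrupt is
 * simulated by running its three steps between the main code's load
 * and store. */
static int i;

static void simulated_interrupt(void)
{
    int r1 = i;     /* ldr r1, [i] */
    r1 += 1;        /* add r1, #1  */
    i = r1;         /* str r1, [i] */
}

/* Returns the final value of i after two increments were attempted. */
int lost_update_demo(void)
{
    int r0;

    i = 0;
    r0 = i;                 /* ldr r0, [i]  -> r0 = 0 */
    simulated_interrupt();  /* i = 1 */
    r0 += 1;                /* add r0, #1   -> r0 = 1 */
    i = r0;                 /* str r0, [i]  -> i = 1, one increment is lost */
    return i;
}
```

Two increments ran, yet lost_update_demo() returns 1.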

The simplest way to make this operation atomic is by masking interrupts to create a critical section:

mrs r2, primask ; save old mask
mov r3, #1
msr primask, r3 ; mask interrupts

; Critical section
ldr r0, [i]
add r0, #1
str r0, [i]

msr primask, r2 ; restore the saved mask

Note how the old priority mask is saved so that we do not re-enable interrupts early if called from another critical section.

There are two problems with this approach.

First, it increases latency for higher-priority interrupts, which might be fine if done rarely, but will break your real-time guarantees if used without care.

The second problem is that it only works for single-core systems – other cores may access the same memory concurrently even when interrupts are masked locally.

Synchronization primitives

To overcome those limitations, ARMv7 provides the Exclusive Monitor and a pair of generic synchronization primitives – LDREX/STREX [1]. They allow us to make even complex operations on memory effectively atomic. This mechanism also supports multi-CPU systems with a Global Exclusive Monitor.

The Exclusive Monitor can be in one of two states: Open Access or Exclusive Access.

LDREX behaves like a normal LDR, except it transitions the Monitor to Exclusive Access after the load.

STREX is akin to STR but conditional: if the Monitor was set to Exclusive Access by LDREX, it will do the store, transition the Monitor back to Open Access and return a status value of 0; otherwise, it does nothing and returns a 1.

We can now rewrite our increment as:

repeat:
ldrex r0, [i]
add r0, #1
strex r4, r0, [i]
cmp r4, #0
bne repeat

Note how we must repeat the load and the computation if the store fails. This ensures that we never store the result of a computation on the old value. In other words, STREX here checks that the value we want to store is computed from the value that is currently at that memory address.
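For comparison, here is a minimal sketch of the same retry loop written with C11 atomics instead of assembly. The function name atomic_increment is an assumption for this example. GCC and Clang typically lower such a loop to LDREX/STREX on ARMv7, but that is a property of the compiler and not something the C standard promises.

```c
#include <stdatomic.h>

/* Atomically increments *v and returns the previous value. */
unsigned int atomic_increment(_Atomic unsigned int *v)
{
    unsigned int old = atomic_load_explicit(v, memory_order_relaxed);

    /* On failure, compare_exchange_weak stores the current value of *v
     * into old, so the next attempt is computed from fresh data. */
    while (!atomic_compare_exchange_weak_explicit(
               v, &old, old + 1u,
               memory_order_relaxed, memory_order_relaxed)) {
        /* retry */
    }
    return old;
}
```

One difference worth keeping in mind: a compare-exchange compares values, while STREX fails whenever the exclusive reservation was lost, regardless of the value in memory.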

And we are not limited to a single instruction in-between:

mov r5, #10
repeat:
ldrex r0, [i]

tst r0, #1      ; Z is set when bit 0 is clear, i.e. r0 is even
beq even
add r0, #1      ; odd: round up to the next even value
even:
mul r0, r5, r0
sub r0, #1

strex r4, r0, [i]
cmp r4, #0
bne repeat
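The general pattern (load, compute an arbitrary new value, store conditionally, retry on failure) can be sketched in C as a helper that takes the computation as a function pointer. The names atomic_update, update_fn and times_ten_minus_one are hypothetical, and the sketch uses C11 atomics as a stand-in for the exclusive instructions.

```c
#include <stdatomic.h>
#include <stdint.h>

/* The computation may run more than once, so it must depend only on
 * its argument and must not have side effects. */
typedef uint32_t (*update_fn)(uint32_t);

/* Atomically replaces *v with f(*v) and returns the previous value. */
uint32_t atomic_update(_Atomic uint32_t *v, update_fn f)
{
    uint32_t old = atomic_load(v);
    uint32_t new_value;

    do {
        new_value = f(old);     /* recomputed after every failed store */
    } while (!atomic_compare_exchange_weak(v, &old, new_value));
    return old;
}

/* Example computation, purely illustrative. */
uint32_t times_ten_minus_one(uint32_t x)
{
    return x * 10u - 1u;
}
```

Calling atomic_update(&v, times_ten_minus_one) with v equal to 3 leaves 29 in v and returns 3.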

However, it is best to keep the computation as short as possible to lower the chances of it being interrupted. You should also avoid any memory stores between LDREX and STREX, as that may cause a livelock on some implementations: it is IMPLEMENTATION DEFINED whether a simple store will reset the Monitor or not.

Perhaps the most important restriction of these instructions is that you cannot have more than one outstanding exclusive access for each execution thread. Each STREX must be preceded by an LDREX from the same address. For example, the following code is malformed, and a lock must be used instead:

repeat:
ldrex r0, [a]
ldrex r6, [b]
add r0, #1
add r6, r0
strex r4, r0, [a] ; invalid: address not equal to a preceding `ldrex`
cmp r4, #0
bne repeat
strex r4, r6, [b] ; invalid: not preceded by `ldrex`
cmp r4, #0
bne repeat

Now let’s see what happens if our new increment is interrupted:

; i = 0
repeat:
ldrex r0, [i]           ; r0 = 0

; A higher priority interrupt happens here
    repeat2:
    ldrex r7, [i]       ; r7 = 0
    add r7, #1          ; r7 = 1
    strex r4, r7, [i]   ; success, i = 1
    cmp r4, #0
    bne repeat2         ; not taken
; Continuing after the interrupt
; i = 1

add r0, #1              ; r0 = 1
strex r4, r0, [i]       ; fail: the Monitor was reset in the interrupt
cmp r4, #0
bne repeat              ; taken

; On the second iteration
; i = 1
repeat:
ldrex r0, [i]           ; r0 = 1
add r0, #1              ; r0 = 2
strex r4, r0, [i]       ; success, i = 2
cmp r4, #0
bne repeat              ; not taken
; i = 2

You can see how the first thread had to repeat the increment with the actual data which resolved the race condition.

The Monitor might be reset unconditionally on interrupts, so STREX might fail even if no other thread used the primitives.
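To make the trace above easier to follow, here is a toy single-core model of the Monitor in C. It is a deliberate simplification and not a description of real hardware: the struct and the model_* functions are invented for this sketch, the model always checks the address, and it only resets the Monitor on STREX, not on exception entry.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy model of the local Exclusive Monitor (hypothetical). */
struct monitor {
    bool exclusive;         /* false = Open Access, true = Exclusive Access */
    const uint32_t *addr;   /* address tagged by the last model_ldrex */
};

uint32_t model_ldrex(struct monitor *m, const uint32_t *p)
{
    m->exclusive = true;
    m->addr = p;
    return *p;
}

/* Returns 0 on success and 1 on failure, like the STREX status value. */
int model_strex(struct monitor *m, uint32_t *p, uint32_t value)
{
    bool ok = m->exclusive && m->addr == p;

    m->exclusive = false;   /* every STREX leaves the Monitor in Open Access */
    if (ok) {
        *p = value;
    }
    return ok ? 0 : 1;
}

/* Increment as in the listing. The optional irq hook simulates an
 * interrupt between the load and the store of the first attempt only.
 * Returns the number of attempts that were needed. */
int model_increment(struct monitor *m, uint32_t *p,
                    void (*irq)(struct monitor *, uint32_t *))
{
    int attempts = 0;
    uint32_t r0;

    do {
        r0 = model_ldrex(m, p);
        if (irq != NULL && attempts == 0) {
            irq(m, p);
        }
        attempts++;
    } while (model_strex(m, p, r0 + 1u) != 0);
    return attempts;
}

/* The simulated interrupt handler performs its own increment. */
void irq_increment(struct monitor *m, uint32_t *p)
{
    (void) model_increment(m, p, NULL);
}
```

Starting from i = 0, model_increment(&m, &i, irq_increment) needs two attempts and leaves i at 2, matching the trace.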

Practice: A block allocator

Now, let’s apply those synchronization primitives in a real program by creating a lock-free [2] memory allocator which can allocate or free memory chunks of fixed size from a static pool in O(1) time. For that I will place a control structure with an array of pointers to free blocks at the start of the memory pool. As an added bonus, I will make the block size adjustable at run time during initialization of the allocator.

__attribute__((packed)) struct bmalloc_ctx {
    uint32_t free_len;
    uint32_t free_mlen;
    uint32_t blen;
    uint32_t store_mlen;
    void * free[];
    /* unsigned char store[] here */
};

Note the syntax void * free[]: this is a flexible array member. It does not take any additional space and allows us to say that a structure is immediately followed by an array of run-time size. It is basically a shortcut to write ctx->free[0] instead of ((char *) ctx + sizeof(*ctx)). The __attribute__((packed)) prevents the compiler from inserting any unexpected alignment or padding because we will ensure alignment manually.
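A small sketch can confirm how a flexible array member affects the layout. The struct demo_ctx below is a hypothetical stand-in with the same fields (without the packed attribute), and both helper functions are made up for this example.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the control structure. */
struct demo_ctx {
    uint32_t free_len;
    uint32_t free_mlen;
    uint32_t blen;
    uint32_t store_mlen;
    void *free[];   /* flexible array member: adds nothing to sizeof */
};

/* Bytes needed for the fixed header plus n pointer slots. */
size_t demo_ctx_size(size_t n)
{
    return sizeof(struct demo_ctx) + n * sizeof(void *);
}

/* Offset of free[0] from the start of the structure. */
size_t demo_free_offset(void)
{
    return offsetof(struct demo_ctx, free);
}
```

With four uint32_t fields the header is 16 bytes, and free[0] starts right after it.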

#define BMALLOC_ALIGNMENT __alignof__(void *)
#define BMALLOC_ALIGN_MASK (BMALLOC_ALIGNMENT - 1)

struct bmalloc_ctx * bmalloc_init(
    unsigned char * store, uint32_t store_mlen, uint32_t blen)
{
    unsigned int i;
    unsigned char * free_space;
    struct bmalloc_ctx * c;

    /* space lost due to alignment: the same padding that is added to store */
    store_mlen -= -((intptr_t) store) & BMALLOC_ALIGN_MASK;
    store += -((intptr_t) store) & BMALLOC_ALIGN_MASK; /* fix alignment */

    c = (struct bmalloc_ctx *) store;
    c->store_mlen = store_mlen;

    blen += -blen & BMALLOC_ALIGN_MASK; /* make it a multiple of alignment */
    c->blen = blen;

    /* ... continued below ... */
}

__alignof__ is a GCC keyword that yields the alignment of a type in bytes, so BMALLOC_ALIGNMENT will be 4 on 32-bit cores and 8 on 64-bit ones. We then align the store pointer to that boundary. The (intptr_t) store cast lets us work with the pointer value as an integer. We then negate the address to compute the distance to the next alignment boundary and mask the result to remove the extra bits:

BMALLOC_ALIGN_MASK = 0b000000000000011 = 0x3
store              = 0b010000000000010 = 0x2002
-store             = 0b101111111111110
-store & 3         = 0b000000000000010 = 2
store + 2          = 0b010000000000100 = 0x2004

This is necessary because LDREX/STREX, as well as other instructions, do not allow unaligned access. We do that both for our control structure and to ensure that allocated blocks can be used in arbitrary contexts – hence the need to align blen.
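The negate-and-mask trick can be isolated into two small helpers. This is a sketch with made-up names (align_padding, align_up); it uses uintptr_t so that negation is well defined, and it assumes the alignment is a power of two.

```c
#include <stdint.h>

/* Bytes that must be added to addr to reach the next multiple of
 * alignment. The alignment must be a power of two. */
uintptr_t align_padding(uintptr_t addr, uintptr_t alignment)
{
    return -addr & (alignment - 1u);
}

/* Rounds addr up to the next multiple of alignment. */
uintptr_t align_up(uintptr_t addr, uintptr_t alignment)
{
    return addr + align_padding(addr, alignment);
}
```

For 0x2002 and an alignment of 4 the padding is 2, as in the table above. For 0x2001 the padding is 3, while 0x2001 & 3 is 1, so the padding is not the same as the low bits of the address.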

We then compute the maximum number of blocks:

c->free_mlen = (store_mlen - sizeof(*c)) / (blen + sizeof(c->free[0]));

store_mlen - sizeof(*c) accounts for the overhead of the fixed part of the control struct, while sizeof(c->free[0]) is added to blen because each block needs a pointer slot in c->free. Integer division rounds down, which discards any leftover space that is too small for a block.

After that we know the real size of our control structure and where it ends:

free_space = store + sizeof(*c) + c->free_mlen * sizeof(c->free[0]);

Note how this pointer is always aligned to BMALLOC_ALIGNMENT because store is aligned, sizeof(*c) is 16, which is a multiple of the 8-byte alignment on 64-bit targets, and sizeof(c->free[0]) is the size of a pointer.
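The layout arithmetic can be checked with a worked example. The helper below is a hypothetical sketch that takes the header and pointer sizes as parameters, so the numbers for a 32-bit target can be reproduced on any host. It assumes store_mlen is at least header_size.

```c
#include <stddef.h>
#include <stdint.h>

struct layout {
    uint32_t free_mlen;     /* number of blocks that fit */
    size_t blocks_offset;   /* offset of the first block from store */
    size_t used;            /* bytes used by header, pointers and blocks */
};

/* Mirrors the two formulas above for an already aligned store and blen. */
struct layout compute_layout(uint32_t store_mlen, uint32_t blen,
                             size_t header_size, size_t ptr_size)
{
    struct layout l;

    l.free_mlen = (uint32_t) ((store_mlen - header_size) / (blen + ptr_size));
    l.blocks_offset = header_size + l.free_mlen * ptr_size;
    l.used = l.blocks_offset + (size_t) l.free_mlen * blen;
    return l;
}
```

For a 10000-byte store, 64-byte blocks, a 16-byte header and 4-byte pointers, this gives (10000 - 16) / (64 + 4) = 146 blocks, the first block at offset 16 + 146 * 4 = 600, and 600 + 146 * 64 = 9944 bytes used in total.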

All that’s left is to fill the array:

for (i = 0; i < c->free_mlen; i++) {
    c->free[i] = free_space + blen * i;
}
c->free_len = c->free_mlen;
return c;

Now onto allocation, which is quite simple after all those preparations:

uint32_t atomic_fetch_sub_unless(uint32_t * v, int b, int unless);
uint32_t atomic_fetch_add_unless(uint32_t * v, int b, int unless);

void * bmalloc(struct bmalloc_ctx * c)
{
    uint32_t free_len = atomic_fetch_sub_unless(&c->free_len, 1, 0);
    if (free_len > 0) {
        return c->free[free_len - 1];
    } else {
        return NULL;
    }
}

void bfree(struct bmalloc_ctx * c, void * ptr)
{
    uint32_t free_len = atomic_fetch_add_unless(&c->free_len, 1, c->free_mlen);
    BMALLOC_ASSERT(free_len < c->free_mlen); /* likely due to a double free */
    c->free[free_len] = ptr;
}

Note how I keep using free_len fetched during the atomic operation. That is to keep the entire function atomic and prevent time-of-check / time-of-use races: this returned value cannot be changed by other threads, which will in turn get a different value because the atomic operation decremented it.

atomic_fetch_sub_unless is a primitive that atomically loads a value from memory, subtracts b if the value is not equal to unless, and then returns the original value from memory. This way the returned value is always the one that was used in computation. atomic_fetch_add_unless is the same but it does addition instead of subtraction.

@ uint32_t atomic_fetch_sub_unless(uint32_t * v, int b, int unless);
.syntax unified
.thumb
.global atomic_fetch_sub_unless
.align 1
.thumb_func
atomic_fetch_sub_unless:
repeat:
    ldrex   r3, [r0]        @ old = *v
    cmp     r3, r2
    beq     exit            @ if (old == unless) exit
    subs    r2, r3, r1      @ new (r2) = old - b
    strex   r1, r2, [r0]    @ *v = new, r1 = ok
    cmp     r1, #0
    bne     repeat          @ if (! ok) repeat
exit:
    movs    r0, r3          @ return old
    bx lr

The directives on top are here to make the GNU Assembler produce a symbol with a low bit set in the address, so that it can be called from Thumb state. Also note that this function does not require a stack frame because it only uses scratch registers r0-r3, which the compiler expects to change after the function call [4].

We can also rewrite this using GCC Inline assembly to allow the compiler to e.g. inline the function [5]:

uint32_t atomic_fetch_add_unless(uint32_t * v, int b, int unless)
{
    uint32_t old;
    uint32_t new;
    uint32_t ok;
    __asm__ volatile ("@ atomic_fetch_add_unless\n\t"
"1:\n\t"
"ldrex %[old], [%[v]]\n\t"
"cmp %[old], %[u]\n\t"
"beq 2f\n\t"
"add %[new], %[old], %[b]\n\t"
"strex %[ok], %[new], [%[v]]\n\t"
"cmp %[ok], #0\n\t"
"bne 1b\n"
"2:"
    : [old] "=&r" (old), [new] "=&r" (new), [ok] "=&r" (ok)
    : [v] "r" (v), [b] "r" (b), [u] "r" (unless)
    : "cc", "memory"
    );
    return old;
}

This function uses Extended Asm, which takes the form

__asm__ [volatile] (
    "assembly"
    : OutputOperands
    [: InputOperands]
    [: Clobbers]
)

OutputOperands is a list of C variables modified or read by this statement, InputOperands is for the variables that are only read, and Clobbers is a list of registers changed beyond the input or output operands.

The operands are specified as

[alias] "<constraints>" (C expression)

We can refer to the operands from the assembly by their index as %0 or by the optional symbolic name: %[alias]. Constraints for output operands must be prefixed by either = or + to specify if they only overwrite the value or if they both read and write to it respectively. An output that is written before all inputs have been consumed should additionally be marked early-clobber with &, so that the compiler does not place it in the same register as an input. There are many possible constraint symbols but the most common is r, which places the value of the C expression in a register allocated by the compiler. The cc in Clobbers tells the compiler that we modify the condition flags. Refer to the docs for more. Also, note the explicit \n\t here, which are necessary because after expanding the operands, GCC will pass the assembly string to the assembler as is.
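For host-side experiments where ARM assembly is not available, the two primitives can be approximated with C11 atomics. This is only a sketch under that assumption: the c11_ prefix is invented here to avoid clashing with the real functions, and the parameters are unsigned for simplicity.

```c
#include <stdatomic.h>
#include <stdint.h>

/* Subtracts b from *v unless *v equals unless. Returns the value that
 * was observed, which is also the value used in the computation. */
uint32_t c11_fetch_sub_unless(_Atomic uint32_t *v, uint32_t b, uint32_t unless)
{
    uint32_t old = atomic_load(v);

    while (old != unless &&
           !atomic_compare_exchange_weak(v, &old, old - b)) {
        /* old was refreshed by the failed exchange: re-check and retry */
    }
    return old;
}

/* Same as above, but adds b instead of subtracting it. */
uint32_t c11_fetch_add_unless(_Atomic uint32_t *v, uint32_t b, uint32_t unless)
{
    uint32_t old = atomic_load(v);

    while (old != unless &&
           !atomic_compare_exchange_weak(v, &old, old + b)) {
        /* retry with the refreshed value */
    }
    return old;
}
```

Starting from 2, three calls to c11_fetch_sub_unless(&v, 1, 0) return 2, 1 and 0, and the value stays at 0 after the last one.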

Testing with QEMU

Local testing with QEMU is an easy way to see if your logic is sound and your assembly is correct without building complex hardware harnesses. It is, however, very limited in terms of timing accuracy, peripherals and CPU quirks. QEMU is not a cycle-perfect simulator and is not meant for catching subtle concurrency bugs. For that you’ll have to get real hardware.

Let’s start our testing with some helper functions:

void assert_handler(const char * file, int line, const char * stmt);
#ifndef DEBUG_ASSERT
#define DEBUG_ASSERT(stmt) do { \
        if (! __builtin_expect(!! (stmt), 1)) { \
            assert_handler(__FILE__, __LINE__, #stmt); \
            while (1) \
                ; \
        } \
    } while (0)
#endif

The __builtin_expect here is to aid compiler optimization – we expect the boolean value of stmt to be true most of the time. The !! (stmt) is a simple trick to convert any numeric value to a boolean 0/1.
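The normalization can be seen in isolation in the sketch below. TO_BOOL, likely_true and pointer_is_set are hypothetical names, and __builtin_expect is a GCC/Clang builtin, so this assumes one of those compilers.

```c
/* Normalizes any scalar to 0 or 1, as !! (stmt) does in DEBUG_ASSERT. */
#define TO_BOOL(x) (!!(x))

/* __builtin_expect returns its first argument. The second argument is
 * only a hint about the value we expect most of the time. */
int likely_true(long value)
{
    return (int) __builtin_expect(TO_BOOL(value), 1);
}

/* The same trick works for pointers, which cannot be compared with the
 * expected value 1 directly. */
int pointer_is_set(const void *p)
{
    return TO_BOOL(p);
}
```

Without the normalization, a true value such as 42 would not match the expected value of 1, and the hint would describe the wrong branch.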

Next, let’s define functions that will allow us to send information outside the simulation with the help of ARM Semihosting [6]:

#define SEMIHOSTING_EXIT_CODE_INTERNAL_ERROR 0x20024    /* exit code 1 */
#define SEMIHOSTING_EXIT_CODE_APPLICATION_EXIT 0x20026  /* exit code 0 */

void semihosting_write0(const char * str)
{
    __asm__ volatile (
"mov r0, #0x04\n\t"
"mov r1, %0\n\t"
"bkpt 0xAB"
    :               /* no outputs */
    : "r" (str)     /* input operands */
    : "r0", "r1", "memory"  /* Clobbers; the host reads the string from memory */
    );
}
__attribute__((noreturn)) void semihosting_exit(int code)
{
    __asm__ volatile (
"mov r0, #0x18\n\t"
"mov r1, %0\n\t"
"bkpt 0xAB"
    :
    : "r" (code)
    : "r0", "r1"
    );
    __builtin_unreachable();
}

The bkpt 0xAB instruction is the way to trigger a Semihosting call on Cortex-M. Other cores will need a different instruction. The semihosting host then catches this exception and retrieves the opcode and parameters from the registers, just like a system call. semihosting_write0 will write a null-terminated string to stdout, and semihosting_exit will quit the QEMU process with an exit code of 0 or 1.

We can then nicely report the reason for test failure as:

/* left as an exercise to the reader */
const char * int_to_str_(int i, char * buf, int buf_mlen);
void assert_handler(const char * file, int line, const char * stmt)
{
    char line_str[12];
    semihosting_write0("Assertion failed: ");
    semihosting_write0(file);
    semihosting_write0(":");
    semihosting_write0(int_to_str_(line, line_str, sizeof(line_str)));
    semihosting_write0(": ");
    semihosting_write0(stmt);
    semihosting_write0("\n");
    semihosting_exit(SEMIHOSTING_EXIT_CODE_INTERNAL_ERROR);
}
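For readers who would rather not do the exercise, here is one possible sketch of int_to_str_. The original only gives the prototype, so the behavior is an assumption: this version writes the digits from the end of the buffer, returns a pointer to the first character inside buf, and returns an empty string if the buffer is too small.

```c
#include <stddef.h>

/* Converts i to decimal text inside buf and returns a pointer to the
 * first character. A buffer of 12 bytes fits any 32-bit int. */
const char * int_to_str_(int i, char * buf, int buf_mlen)
{
    /* Work in unsigned so that the most negative int can be negated. */
    unsigned int u = (i < 0) ? 0u - (unsigned int) i : (unsigned int) i;
    int pos = buf_mlen;

    if (buf == NULL || buf_mlen < 2) {
        return "";
    }
    buf[--pos] = '\0';
    do {
        if (pos == 0) {
            return "";      /* buffer too small */
        }
        buf[--pos] = (char) ('0' + u % 10u);
        u /= 10u;
    } while (u != 0);
    if (i < 0) {
        if (pos == 0) {
            return "";
        }
        buf[--pos] = '-';
    }
    return &buf[pos];
}
```

Filling the buffer from the end avoids a separate pass to reverse the digits.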

The last thing we’ll need is a vector table to make our code runnable:

.syntax unified
.thumb
.section .vector, "a"       @ matched by KEEP(*(.vector)) in the linker script
.global .vector
.vector:
    .word _stack            @ initial SP value, provided by the linker script
    .word isr_reset
    .word isr_nmi
    .word isr_hard_fault
    .word isr_mem_manage
    .word isr_bus_fault
    .word isr_usage_fault
    .skip 16                @ reserved
    .word isr_sv_call
    .word isr_debug_monitor
    .skip 4                 @ reserved
    .word isr_pend_sv
    .word isr_systick
    .skip 344

… and a basic linker script:

MEMORY {
    flash  (rx)   : ORIGIN = 0x08000000, LENGTH = 512K
    sram   (w!x)  : ORIGIN = 0x20000000, LENGTH = 128K
}
SECTIONS {
    .text : {
        KEEP(*(.vector))
        *(.text)
    } > flash
    .data : {
        *(.data*)
    } > sram AT > flash
}
ENTRY(isr_reset);
EXTERN(.vector);
PROVIDE(_stack = ORIGIN(sram) + LENGTH(sram));

Now we can write our tests as:

static unsigned char store[10000];

void test_one_block(void)
{
    unsigned int store_mlen = BMALLOC_STORE_MLEN_FOR_BLOCKS(1, 1);
    struct bmalloc_ctx * b;
    unsigned char * ptr1;
    unsigned char * ptr2;
    b = bmalloc_init(store, store_mlen, 1);

    ptr1 = bmalloc(b);
    DEBUG_ASSERT(ptr1 != NULL);
    DEBUG_ASSERT(ptr1 >= store && ptr1 < store + store_mlen);
    bfree(b, ptr1);

    ptr1 = bmalloc(b);
    ptr2 = bmalloc(b);
    DEBUG_ASSERT(ptr1 != NULL && ptr2 == NULL);
    bfree(b, ptr1);
}

void isr_reset(void)
{
    semihosting_write0("Test started\n");

    test_one_block();

    semihosting_write0("Test passed\n");
    semihosting_exit(SEMIHOSTING_EXIT_CODE_APPLICATION_EXIT);
}

void isr_hard_fault(void)
{
    semihosting_write0("HardFault\n");
    semihosting_exit(SEMIHOSTING_EXIT_CODE_INTERNAL_ERROR);
}

Now running it is as simple as:

# qemu-system-arm -M netduinoplus2 -s -nographic -semihosting \
    -kernel test_bmalloc.elf
Test started
Test passed

-M netduinoplus2 selects a machine with an STM32F405RGT6 featuring ARM Cortex-M4. -s starts a gdb server at localhost:1234. -semihosting, as the name implies, enables support for the ARM Semihosting instructions. You may also pass the -S flag to halt the simulation after start and connect with a debugger.

Now, let’s add a double free to our code and see how it looks:

# qemu-system-arm -M netduinoplus2 -s -nographic -semihosting \
        -kernel test_bmalloc.elf
Test started
Assertion failed: bmalloc.c:81: free_len < c->free_mlen
# echo $?
1

You can also use the same semihosting approach on a real controller, but that’s a whole other story.

And that is all I have for you today. For further reading, please check out an awesome blog by Dmitry Vyukov about lock-free algorithms and beyond [2]. If you have any practical guides on lock-free implementations on ARM specifically, please do share.

Literature