Вопрос Вылетает SIGSEGV прямо в момент разыменования указателя.

Начинающий
Статус
Оффлайн
Регистрация
24 Сен 2024
Сообщения
34
Реакции[?]
7
Поинты[?]
7K
Опять наебнулся в многопоточке, памагити! Пишу модуль для обработки трейдинговых потоков на C++, и меня чёт наёбывает shared_ptr в критической секции. Суть такая: есть очередь trade-ивентов в кастомном локфри кольцевом буфере (под линукс с использованием futex). В каждом ивенте есть ссылка на базовый объект с ордером (shared_ptr<BaseOrder>), который шарится между несколькими консьюмерами.

Проблема в том, что иногда (где-то раз в 100к событий) в дебаге вылетает SIGSEGV прямо в момент разыменования этого указателя, хотя я точно знаю что объект должен жить - у него есть минимум два держателя ссылки. Уже все мозги сломал - смотрел в валгринде, вроде утечек нет, атомики везде правильно расставлены, weak_ptr не использую, но где-то явно происходит преждевременное уничтожение. Может кто сталкивался? Заебался уже с этими рейсами бороться. Код приложу. Может кто в сторону копнуть подскажет?

P.S.: Нет, мьютексы использовать нельзя - там SLA по латенси 100 микросекунд.

C++:
#include <atomic>
#include <memory>
#include <thread>
#include <vector>
#include <linux/futex.h>
#include <sys/syscall.h>

struct BaseOrder {
    std::atomic<int> ref_count{0};
    std::atomic<bool> is_active{true};
    double price;
    int64_t volume;
   
    virtual ~BaseOrder() = default;
};

struct TradeEvent {
    std::shared_ptr<BaseOrder> order;
    int64_t timestamp;
    double executed_price;
};

template<typename T, size_t Size>
class LockFreeRingBuffer {
private:
    alignas(64) std::atomic<size_t> head_{0};
    alignas(64) std::atomic<size_t> tail_{0};
    T buffer_[Size];
   
    static inline void futex_wait(std::atomic<size_t>& futex, size_t expected) {
        syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, expected, nullptr, nullptr, 0);
    }
   
    static inline void futex_wake(std::atomic<size_t>& futex) {
        syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
    }

public:
    bool try_push(const T& item) {
        size_t head = head_.load(std::memory_order_relaxed);
        size_t next_head = (head + 1) % Size;
       
        if (next_head == tail_.load(std::memory_order_acquire)) {
            return false;
        }
       
        buffer_[head] = item;
        head_.store(next_head, std::memory_order_release);
        futex_wake(head_);
        return true;
    }
   
    bool try_pop(T& item) {
        size_t tail = tail_.load(std::memory_order_relaxed);
       
        if (tail == head_.load(std::memory_order_acquire)) {
            futex_wait(head_, tail);
            return false;
        }
       
        item = std::move(buffer_[tail]);
        tail_.store((tail + 1) % Size, std::memory_order_release);
        return true;
    }
};

class EventProcessor {
private:
    LockFreeRingBuffer<TradeEvent, 1024> event_queue_;
    std::vector<std::thread> workers_;
    std::atomic<bool> running_{true};
   
    void process_events() {
        while (running_) {
            TradeEvent event;
            if (event_queue_.try_pop(event)) {
                // Вот тут иногда происходит SIGSEGV
                if (event.order && event.order->is_active.load(std::memory_order_acquire)) {
                    process_single_event(event);
                }
            }
        }
    }
   
    void process_single_event(const TradeEvent& event) {
        event.order->ref_count.fetch_add(1, std::memory_order_relaxed);
        std::this_thread::sleep_for(std::chrono::microseconds(50));
        event.order->ref_count.fetch_sub(1, std::memory_order_release);
    }

public:
    EventProcessor(size_t num_threads = 4) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers_.emplace_back([this] { process_events(); });
        }
    }
   
    void push_event(const TradeEvent& event) {
        while (!event_queue_.try_push(event)) {
            std::this_thread::yield();
        }
    }
   
    ~EventProcessor() {
        running_ = false;
        for (auto& worker : workers_) {
            worker.join();
        }
    }
};
Вот это примерно тот пиздец, с которым я мучаюсь. Главная проблема где-то в районе process_events(), когда несколько тредов пытаются обработать один и тот же ордер. Теоретически shared_ptr должен защищать от уничтожения объекта, пока есть хотя бы одна ссылка, но что-то явно идёт не так. И да, я знаю что ref_count в BaseOrder немного костыльный, но он нужен для трекинга активных обработок, а не для управления памятью. Может кто видит, где я облажался с memory ordering или ещё где-то течёт?
 
Сверху Снизу