-
Автор темы
- #1
Опять наебнулся в многопоточке, памагити! Пишу модуль для обработки трейдинговых потоков на C++, и меня чёт наёбывает shared_ptr в критической секции. Суть такая: есть очередь trade-ивентов в кастомном локфри кольцевом буфере (под линукс с использованием futex). В каждом ивенте есть ссылка на базовый объект с ордером (shared_ptr<BaseOrder>), который шарится между несколькими консьюмерами.
Проблема в том, что иногда (где-то раз в 100к событий) в дебаге вылетает SIGSEGV прямо в момент разыменования этого указателя, хотя я точно знаю что объект должен жить - у него есть минимум два держателя ссылки. Уже все мозги сломал - смотрел в валгринде, вроде утечек нет, атомики везде правильно расставлены, weak_ptr не использую, но где-то явно происходит преждевременное уничтожение. Может кто сталкивался? Заебался уже с этими рейсами бороться. Код приложу. Может кто в сторону копнуть подскажет?
P.S.: Нет, мьютексы использовать нельзя - там SLA по латенси 100 микросекунд.
Вот это примерно тот пиздец, с которым я мучаюсь. Главная проблема где-то в районе process_events(), когда несколько тредов пытаются обработать один и тот же ордер. Теоретически shared_ptr должен защищать от уничтожения объекта, пока есть хотя бы одна ссылка, но что-то явно идёт не так. И да, я знаю что ref_count в BaseOrder немного костыльный, но он нужен для трекинга активных обработок, а не для управления памятью. Может кто видит, где я облажался с memory ordering или ещё где-то течёт?
Проблема в том, что иногда (где-то раз в 100к событий) в дебаге вылетает SIGSEGV прямо в момент разыменования этого указателя, хотя я точно знаю что объект должен жить - у него есть минимум два держателя ссылки. Уже все мозги сломал - смотрел в валгринде, вроде утечек нет, атомики везде правильно расставлены, weak_ptr не использую, но где-то явно происходит преждевременное уничтожение. Может кто сталкивался? Заебался уже с этими рейсами бороться. Код приложу. Может кто в сторону копнуть подскажет?
P.S.: Нет, мьютексы использовать нельзя - там SLA по латенси 100 микросекунд.
C++:
#include <atomic>
#include <memory>
#include <thread>
#include <vector>
#include <linux/futex.h>
#include <sys/syscall.h>
struct BaseOrder {
std::atomic<int> ref_count{0};
std::atomic<bool> is_active{true};
double price;
int64_t volume;
virtual ~BaseOrder() = default;
};
struct TradeEvent {
std::shared_ptr<BaseOrder> order;
int64_t timestamp;
double executed_price;
};
template<typename T, size_t Size>
class LockFreeRingBuffer {
private:
alignas(64) std::atomic<size_t> head_{0};
alignas(64) std::atomic<size_t> tail_{0};
T buffer_[Size];
static inline void futex_wait(std::atomic<size_t>& futex, size_t expected) {
syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, expected, nullptr, nullptr, 0);
}
static inline void futex_wake(std::atomic<size_t>& futex) {
syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
}
public:
bool try_push(const T& item) {
size_t head = head_.load(std::memory_order_relaxed);
size_t next_head = (head + 1) % Size;
if (next_head == tail_.load(std::memory_order_acquire)) {
return false;
}
buffer_[head] = item;
head_.store(next_head, std::memory_order_release);
futex_wake(head_);
return true;
}
bool try_pop(T& item) {
size_t tail = tail_.load(std::memory_order_relaxed);
if (tail == head_.load(std::memory_order_acquire)) {
futex_wait(head_, tail);
return false;
}
item = std::move(buffer_[tail]);
tail_.store((tail + 1) % Size, std::memory_order_release);
return true;
}
};
class EventProcessor {
private:
LockFreeRingBuffer<TradeEvent, 1024> event_queue_;
std::vector<std::thread> workers_;
std::atomic<bool> running_{true};
void process_events() {
while (running_) {
TradeEvent event;
if (event_queue_.try_pop(event)) {
// Вот тут иногда происходит SIGSEGV
if (event.order && event.order->is_active.load(std::memory_order_acquire)) {
process_single_event(event);
}
}
}
}
void process_single_event(const TradeEvent& event) {
event.order->ref_count.fetch_add(1, std::memory_order_relaxed);
std::this_thread::sleep_for(std::chrono::microseconds(50));
event.order->ref_count.fetch_sub(1, std::memory_order_release);
}
public:
EventProcessor(size_t num_threads = 4) {
for (size_t i = 0; i < num_threads; ++i) {
workers_.emplace_back([this] { process_events(); });
}
}
void push_event(const TradeEvent& event) {
while (!event_queue_.try_push(event)) {
std::this_thread::yield();
}
}
~EventProcessor() {
running_ = false;
for (auto& worker : workers_) {
worker.join();
}
}
};