def attach_to_process(self):
"""Подключение к процессу с полными правами доступа"""
if not self.process_id:
logger.error("Не указан ID процесса")
return False
# Константы для прав доступа
PROCESS_VM_READ = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_OPERATION = 0x0008
# Открываем процесс с необходимыми правами
self.process_handle = ctypes.windll.kernel32.OpenProcess(
PROCESS_VM_READ | PROCESS_QUERY_INFORMATION | PROCESS_VM_OPERATION,
False,
self.process_id
)
if not self.process_handle:
error_code = ctypes.windll.kernel32.GetLastError()
logger.error(f"Ошибка при подключении к процессу: {error_code}")
return False
# Проверка битности процесса
is_wow64 = ctypes.c_bool()
if ctypes.windll.kernel32.IsWow64Process(self.process_handle, ctypes.byref(is_wow64)):
self.is_64bit = not is_wow64.value
else:
# Если функция не сработала, предполагаем 64-бит для современных систем
self.is_64bit = True
logger.info(
f"Подключение к процессу {self.process_id} успешно. Битность: {'64-bit' if self.is_64bit else '32-bit'}")
return True
def pattern_to_bytes(self, pattern: str):
"""
Преобразование строковой сигнатуры в список байтов.
Args:
pattern: строка в формате "AA BB ?? DD"
Returns:
Tuple[List[Optional[int]], int]: список байтов с None на месте wildcard и количество не-wildcard байтов
"""
try:
# Удаляем пробелы и разделяем на байты
pattern_parts = pattern.strip().split()
byte_pattern = []
non_wildcard_count = 0
for part in pattern_parts:
if part.lower() == "??" or part == "**" or part == "??":
byte_pattern.append(None) # Wildcard
else:
byte_value = int(part, 16)
if byte_value < 0 or byte_value > 255:
raise ValueError(f"Значение байта вне диапазона: {part}")
byte_pattern.append(byte_value)
non_wildcard_count += 1
logger.debug(
f"Преобразование паттерна: {pattern} -> {[hex(b) if b is not None else 'None' for b in byte_pattern]}")
logger.debug(f"Количество не-wildcard байтов: {non_wildcard_count}")
return byte_pattern, non_wildcard_count
except ValueError as e:
logger.error(f"Некорректный формат сигнатуры: {str(e)}")
raise
def get_memory_regions(self):
"""
Получение списка доступных для чтения регионов памяти процесса с использованием VirtualQueryEx.
Returns:
List[Tuple[int, int]]: список кортежей (начальный[I]адрес, конечный[/I]адрес)
"""
regions = []
# Определяем правильную структуру MEMORY_BASIC_INFORMATION
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
[I]fields[/I] = [
("BaseAddress", ctypes.c_void_p),
("AllocationBase", ctypes.c_void_p),
("AllocationProtect", ctypes.c_ulong),
("RegionSize", ctypes.c_size_t),
("State", ctypes.c_ulong),
("Protect", ctypes.c_ulong),
("Type", ctypes.c_ulong),
]
# Константы для VirtualQueryEx
MEM_COMMIT = 0x1000
PAGE_READWRITE = 0x04
PAGE_READONLY = 0x02
PAGE_EXECUTE_READ = 0x20
PAGE_EXECUTE_READWRITE = 0x40
PAGE_EXECUTE = 0x10
# Маска для проверки прав на чтение
READ_MASK = PAGE_READWRITE | PAGE_READONLY | PAGE_EXECUTE_READ | PAGE_EXECUTE_READWRITE | PAGE_EXECUTE
current_address = 0
mbi = MEMORY_BASIC_INFORMATION()
region_count = 0
readable_count = 0
while True:
# Запрашиваем информацию о регионе памяти
result = ctypes.windll.kernel32.VirtualQueryEx(
self.process_handle,
ctypes.c_void_p(current_address),
ctypes.byref(mbi),
ctypes.sizeof(mbi)
)
if result == 0:
# Достигнут конец адресного пространства или произошла ошибка
error_code = ctypes.windll.kernel32.GetLastError()
if error_code != 0:
logger.debug(f"VirtualQueryEx завершился с ошибкой: {error_code}")
break
region_count += 1
# Проверяем, что регион выделен и доступен для чтения
if (mbi.State == MEM_COMMIT and (mbi.Protect & READ_MASK)):
readable_count += 1
region_base = mbi.BaseAddress
region_size = mbi.RegionSize
region_end = region_base + region_size
regions.append((region_base, region_end))
# Выводим информацию о регионе
logger.debug(f"Регион [{len(regions) - 1}]: 0x{region_base:X} - 0x{region_end:X} "
f"({region_size} байт), защита: 0x{mbi.Protect:X}")
# Переходим к следующему региону
current_address = mbi.BaseAddress + mbi.RegionSize
# Проверка на переполнение для 32-битных процессов
if not self.is_64bit and current_address > 0xFFFFFFFF:
break
# Проверка на переполнение для 64-битных процессов
if self.is_64bit and current_address > 0x7FFFFFFFFFFFFFFF:
break
logger.info(f"Всего найдено регионов памяти: {region_count}")
logger.info(f"Из них доступно для чтения: {readable_count}")
# Выводим сводную информацию о регионах
for i, (start, end) in enumerate(regions):
logger.info(f"[{i}] Регион: 0x{start:X} - 0x{end:X} ({end - start} байт)")
return regions
def find_pattern(self,
pattern: str,
start_address: int = None,
end_address: int = None,
max_matches: int = 10,
debug_mode: bool = False) -> List[int]:
"""
Поиск сигнатуры в памяти процесса.
Args:
pattern: строка сигнатуры в формате "AA BB ?? DD"
start_address: начальный адрес поиска
end_address: конечный адрес поиска
max_matches: максимальное количество соответствий
debug_mode: включает дополнительный вывод для отладки
Returns:
List[int]: список найденных адресов
"""
if not self.process_handle:
logger.error("Нет подключения к процессу")
return []
try:
# Проверка открытия процесса с нужными правами
test_byte = ctypes.c_byte()
test_read = ctypes.c_size_t(0)
test_result = ctypes.windll.kernel32.ReadProcessMemory(
self.process_handle,
ctypes.c_void_p(0x1000), # Пробуем прочитать из начала памяти
ctypes.byref(test_byte),
ctypes.sizeof(test_byte),
ctypes.byref(test_read)
)
if not test_result:
error_code = ctypes.windll.kernel32.GetLastError()
logger.warning(f"Проверка чтения из процесса не удалась. Ошибка: {error_code} "
f"(возможно, недостаточно прав доступа)")
# Преобразование строковой сигнатуры в байтовый шаблон
byte_pattern, non_wildcard_count = self.pattern_to_bytes(pattern)
pattern_length = len(byte_pattern)
if non_wildcard_count == 0:
logger.error("Сигнатура не содержит определенных байтов")
return []
# Увеличиваем размер блока для предотвращения "разрыва" паттерна между блоками
block_size = 65536 # 64KB вместо 4KB
# Размер перекрытия между блоками равен длине паттерна - 1
overlap = pattern_length - 1
results = []
# Используем VirtualQueryEx для получения карты памяти
if start_address is None or end_address is None:
memory_regions = self.get_memory_regions()
if not memory_regions:
logger.error("Не удалось получить карту памяти процесса")
return []
else:
# Если заданы конкретные адреса, используем их
memory_regions = [(start_address, end_address)]
logger.info(f"Используем заданный диапазон: 0x{start_address:X} - 0x{end_address:X}")
regions_scanned = 0
for region_index, (region_start, region_end) in enumerate(memory_regions):
logger.info(f"Сканирование региона [{region_index}]: 0x{region_start:X} - 0x{region_end:X}")
current_address = region_start
while current_address < region_end and len(results) < max_matches:
# Корректируем размер блока, если мы близки к концу региона
actual_block_size = min(block_size, region_end - current_address)
if actual_block_size <= 0:
break
# Пытаемся прочитать блок памяти
buffer = ctypes.create_string_buffer(actual_block_size)
bytes_read = ctypes.c_size_t(0)
result = ctypes.windll.kernel32.ReadProcessMemory(
self.process_handle,
ctypes.c_void_p(current_address),
buffer,
ctypes.c_size_t(actual_block_size),
ctypes.byref(bytes_read)
)
if not result or bytes_read.value == 0:
error_code = ctypes.windll.kernel32.GetLastError()
logger.debug(f"Не удалось прочитать блок по адресу 0x{current_address:X}, ошибка: {error_code}")
# Переходим к следующему блоку
current_address += actual_block_size
continue
# Проверяем совпадения в прочитанном блоке
data = buffer.raw[:bytes_read.value]
# В режиме отладки выводим первые байты блока
if debug_mode:
if bytes_read.value > 0:
logger.debug(f"Первые 16 байт блока по адресу 0x{current_address:X}: "
f"{' '.join([f'{data[i]:02X}' for i in range(min(16, bytes_read.value))])}")
else:
logger.debug(f"Блок по адресу 0x{current_address:X} пуст")
for i in range(len(data) - pattern_length + 1):
match = True
for j in range(pattern_length):
if byte_pattern[j] is not None and data[i + j] != byte_pattern[j]:
match = False
break
if match:
match_address = current_address + i
results.append(match_address)
# Вывод контекста найденного совпадения
context_start = max(0, i - 8)
context_end = min(len(data), i + pattern_length + 8)
context_bytes = data[context_start:context_end]
hex_context = ' '.join([f'{b:02X}' for b in context_bytes])
logger.info(f"Найдено совпадение по адресу 0x{match_address:X}")
logger.info(f"Контекст: [...{hex_context}...]")
# Визуально выделяем найденный паттерн в контексте
pattern_start_offset = i - context_start
pattern_end_offset = pattern_start_offset + pattern_length
pattern_visual = ' '.join(['^^' if (k >= pattern_start_offset and k < pattern_end_offset)
else ' ' for k in range(len(context_bytes))])
logger.info(f"Позиция: [...{pattern_visual}...]")
if len(results) >= max_matches:
break
# Сдвигаемся на (block_size - overlap) байт вперед
# Это гарантирует, что сигнатура не будет разбита между блоками
current_address += (
actual_block_size - overlap) if actual_block_size > overlap else actual_block_size
regions_scanned += 1
# Периодически сообщаем о прогрессе
if regions_scanned % 10 == 0:
logger.info(f"Просканировано {regions_scanned}/{len(memory_regions)} регионов...")
logger.info(f"Поиск завершен. Найдено совпадений: {len(results)}")
return results
except ValueError as e:
logger.error(f"Ошибка в формате сигнатуры: {str(e)}")
return []
except Exception as e:
logger.error(f"Ошибка при поиске сигнатуры: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return []