Source code for avl_qemu._x86

# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Agent Configuration

import os
import struct
import subprocess
import threading

import avl
import cocotb
import cocotb.utils
from cocotb.triggers import NextTimeStep, NullTrigger, Timer


class X86Agent(avl.Agent):
    """
    Agent configuration for x86 architecture.

    The agent launches a ``qemu-system-x86_64`` process and services the
    memory requests QEMU sends over a pair of named pipes (FIFOs). Requests
    are dispatched to the read / write callbacks registered with
    :meth:`add_range`.

    All class attributes below are defaults; the pipe paths, the QEMU binary
    and the kernel can be overridden per instance through ``avl.Factory``
    variables (see :meth:`__init__`).
    """

    # QEMU Binary
    BIN = os.path.join(os.environ.get("QEMU_HOME", os.path.expanduser("~")), "bin/qemu-system-x86_64")

    # Incoming request parameters
    REQUEST_PIPE = "./AVL_QEMU_IN"
    # Little-endian packet, unpacked as: addr (Q), size (I), type (I), data (Q), region_id (I)
    REQUEST_FMT = '<QIIQI'
    REQUEST_SIZE = struct.calcsize(REQUEST_FMT)
    REQUEST_TYPE = {0: 'Fetch', 1: 'Load', 2: 'Store'}

    # Outgoing response parameters
    RESPONSE_PIPE = "./AVL_QEMU_OUT"
    # Little-endian packet, packed as: a constant 1 (I) followed by the read data (Q)
    RESPONSE_FMT = '<IQ'

    # Ranges File - defines memory regions
    RANGES_FILE = "./avl_map.txt"

    # Kernel - c test to be executed on QEMU side
    KERNEL = "./c/kernel.elf"

    # Polling period for checking QEMU process status in nanoseconds
    POLL_PERIOD_NS = 10

    def __init__(self, name: str, parent: avl.Component) -> None:
        """
        Initialize the x86 agent: create the communication pipes, build the
        QEMU command line and start the request handler.

        The following ``avl.Factory`` variables (prefixed with the full name of
        this agent) override the class defaults: ``request_pipe``,
        ``response_pipe``, ``qemu_bin`` and ``qemu_kernel_bin``.

        :param name: The name of the agent.
        :type name: str
        :param parent: Parent component for hierarchical structuring.
        :type parent: avl.Component
        :return: None
        """
        super().__init__(name, parent)

        full_name = self.get_full_name()

        # Create pipes for communication
        self.pipes = {
            "in": {
                "path": avl.Factory.get_variable(f"{full_name}.request_pipe", self.REQUEST_PIPE),
                "fd": None,
            },
            "out": {
                "path": avl.Factory.get_variable(f"{full_name}.response_pipe", self.RESPONSE_PIPE),
                "fd": None,
            },
        }
        self._open_pipe_()

        # Handle to the QEMU process (created in run_phase)
        self.qemu_proc = None

        # Supported ranges: tuples of (name, start, size, read_cb, write_cb)
        self.ranges = []

        # Create QEMU instance for x86 architecture
        qemu_bin = avl.Factory.get_variable(f"{full_name}.qemu_bin", self.BIN)

        # The kernel binary to be loaded by QEMU. This can be overridden by the
        # user to specify a different test program.
        qemu_kernel = avl.Factory.get_variable(f"{full_name}.qemu_kernel_bin", self.KERNEL)

        self.qemu_cmd = [
            qemu_bin,
            "-kernel", qemu_kernel,
            "-m", "128M",
            "-display", "none",
            "-serial", "mon:stdio",
            # Point the hook device at the same file run_phase() writes.
            "-device", f"avl-hook,file={self.RANGES_FILE}",
            "-device", "isa-debug-exit,iobase=0x501,iosize=0x04",
        ]

        # Monitor / handle memory accesses from QEMU in a concurrent cocotb task
        cocotb.start_soon(self._handle_pipe_())

    def add_range(self, name: str, start: int, size: int, read_cb: callable, write_cb: callable) -> None:
        """
        Add a memory range to the agent's configuration.

        Ranges are stored in call order; the ``region_id`` of an incoming
        request is used as an index into that list.

        :param name: The name of the memory range.
        :type name: str
        :param start: The starting address of the memory range.
        :type start: int
        :param size: The size of the region
        :type size: int
        :param read_cb: awaitable callback ``read_cb(addr, size)`` for read operations on this range.
        :type read_cb: callable
        :param write_cb: awaitable callback ``write_cb(addr, size, data)`` for write operations on this range.
        :type write_cb: callable
        :return: None
        """
        self.ranges.append((name, start, size, read_cb, write_cb))

    def _open_pipe_(self) -> None:
        """
        Open the communication pipes for the agent.

        This method can be overridden to implement specific communication
        protocols or channels. By default any stale file at each pipe path is
        removed, a new FIFO is created there and it is opened for non-blocking
        read/write operations.

        :return: None
        """
        for entry in self.pipes.values():
            path = entry["path"]
            if os.path.exists(path):
                os.remove(path)
            os.mkfifo(path)
            entry["fd"] = os.open(path, os.O_RDWR | os.O_NONBLOCK)

    def _close_pipe_(self) -> None:
        """
        Close the communication pipes for the agent.

        This method can be overridden to implement specific cleanup procedures
        for communication channels. By default, it closes any open file
        descriptors for the pipes and removes the pipe files from the
        filesystem. Calling it more than once is harmless.

        :return: None
        """
        for entry in self.pipes.values():
            fd = entry["fd"]
            if fd is not None:
                # Clear the slot first so a failing close can never lead to a
                # second close of the same descriptor.
                entry["fd"] = None
                # The descriptor comes from os.open(), i.e. it is a plain int
                # and has to be released with os.close().
                os.close(fd)

            if os.path.exists(entry["path"]):
                os.remove(entry["path"])

    async def _handle_pipe_(self) -> None:
        """
        Handle incoming requests from QEMU through the communication pipe.

        This method continuously reads from the input pipe, processes requests
        based on their type (fetch, load, store), and sends responses back
        through the output pipe. Stores are forwarded to the write callback of
        the addressed range and produce no response; every other request type
        is forwarded to the read callback and answered with the returned data.

        By default no simulation time is consumed when there are no requests or
        when handling exceptions. The loop ends once the QEMU process has
        exited and the input pipe is drained, or once the pipes were closed.

        :return: None
        """
        while True:
            in_fd = self.pipes["in"]["fd"]
            if in_fd is None:
                # Pipes were closed by _close_pipe_(); nothing left to serve.
                break

            try:
                data = os.read(in_fd, self.REQUEST_SIZE)
                if not data or len(data) < self.REQUEST_SIZE:
                    await NextTimeStep()
                    continue

                # Unpack the packet
                addr, size, type_, data_, region_id = struct.unpack(self.REQUEST_FMT, data)

                if type_ == 2:  # STORE
                    await self.ranges[region_id][4](addr, size, data_)
                else:  # LOAD (fetches are served by the read callback as well)
                    rdata = await self.ranges[region_id][3](addr, size)

                    # Pack and write back to the pipe
                    response = struct.pack(self.RESPONSE_FMT, 1, rdata)
                    os.write(self.pipes["out"]["fd"], response)

            except BlockingIOError:
                # Nothing to read right now; stop once QEMU is gone.
                if self.qemu_proc and self.qemu_proc.poll() is not None:
                    break
                await NullTrigger()
                continue
            except OSError:
                await NullTrigger()
                continue

    def _qemu_stdout_reader_(self, proc: subprocess.Popen) -> None:
        """
        Read the stdout of the QEMU process and log it.

        Runs in a helper thread (see :meth:`run_phase`) and returns when the
        stream reaches end-of-file.

        :param proc: The subprocess.Popen object representing the QEMU process.
        :type proc: subprocess.Popen
        :return: None
        """
        try:
            for line in proc.stdout:
                self.info(f"[QEMU] {line.rstrip()}")
        finally:
            proc.stdout.close()

    async def run_phase(self) -> None:
        """
        Run the main phase of the agent.

        Writes the memory map file, creates the QEMU instance and waits for it
        to exit while requests are handled through the pipes. The map file
        holds ``<request pipe>,<response pipe>`` on its first line followed by
        one ``0x<start>,0x<size>`` line per registered range.

        :return: None
        """
        self.raise_objection()

        # Create file with memory ranges for plugin
        assert len(self.ranges) > 0, "At least one memory range must be added to the agent configuration using add_range() before running the agent."
        # NOTE(review): this condition admits at most 15 ranges while the
        # message states a maximum of 16, and REQUEST_FMT carries region_id as
        # a 4-byte field - confirm the real limit against the QEMU plugin.
        assert len(self.ranges) < 16, "A maximum of 16 memory ranges can be added to the agent configuration due to region_id being a single byte in the communication protocol."

        if os.path.exists(self.RANGES_FILE):
            os.remove(self.RANGES_FILE)

        # Publish the paths of the pipes that were actually created, so that
        # Factory overrides of the pipe paths also reach the QEMU side.
        request_path = self.pipes["in"]["path"]
        response_path = self.pipes["out"]["path"]

        with open(self.RANGES_FILE, "w") as f:
            f.write(f"{request_path},{response_path}\n")
            for _name, start, size, _read_cb, _write_cb in self.ranges:
                f.write(f"0x{start:x},0x{size:x}\n")

        # Start the qemu process and read its output
        self.qemu_proc = subprocess.Popen(
            self.qemu_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=1,
            universal_newlines=True
        )
        threading.Thread(target=self._qemu_stdout_reader_, args=(self.qemu_proc,), daemon=True).start()

        # Wait for QEMU to exit
        while self.qemu_proc.poll() is None:
            await Timer(self.POLL_PERIOD_NS, unit="ns")

        # Check exit code. 33 is treated as success; presumably the kernel
        # writes 0x10 to the isa-debug-exit port, which QEMU turns into
        # (0x10 << 1) | 1 - TODO confirm against the kernel sources.
        if self.qemu_proc.returncode != 33:
            self._close_pipe_()  # Ensure pipes are closed if QEMU exits with error
            self.fatal(f"QEMU exited with code {self.qemu_proc.returncode}")

        self.drop_objection()

# Names exported by ``from <module> import *``: only the agent class is public.
__all__ = ["X86Agent"]