# Copyright 2025 Apheleia
#
# Description:
# Apheleia Verification Library Agent Configuration
import os
import struct
import subprocess
import threading
import avl
import cocotb
import cocotb.utils
from cocotb.triggers import NextTimeStep, NullTrigger, Timer
[docs]
class X86Agent(avl.Agent):
"""
Agent configuration for x86 architecture.
"""
# QEMU Binary
BIN = os.path.join(os.environ.get("QEMU_HOME", os.path.expanduser("~")), "bin/qemu-system-x86_64")
# Incoming request paramters
REQUEST_PIPE = "./AVL_QEMU_IN"
REQUEST_FMT = '<QIIQI'
REQUEST_SIZE = struct.calcsize(REQUEST_FMT)
REQUEST_TYPE = {0: 'Fetch', 1: 'Load', 2: 'Store'}
# Outgoing response parameters
RESPONSE_PIPE = "./AVL_QEMU_OUT"
RESPONSE_FMT = '<IQ'
# Ranges File - defines memory regions
RANGES_FILE = "./avl_map.txt"
# Kernel - c test to be executed on QEMU side
KERNEL = "./c/kernel.elf"
# Polling period for checking QEMU process status in nanoseconds
POLL_PERIOD_NS = 10
[docs]
def __init__(self, name: str, parent: avl.Component) -> None:
"""
Initialize the x86 agent with specific capabilities and supported architectures.
:param name: The name of the agent.
:type name: str
:param parent_agent: Optional parent agent for hierarchical structuring.
:type parent_agent: avl.Component
:return: None
"""
super().__init__(name, parent)
# Create pipe for communication
self.pipes = {"in":
{"path": avl.Factory.get_variable(f"{self.get_full_name()}.request_pipe", self.REQUEST_PIPE), "fd": None},
"out":
{"path": avl.Factory.get_variable(f"{self.get_full_name()}.response_pipe", self.RESPONSE_PIPE), "fd": None}
}
self._open_pipe_()
# Handle to the QEMU process
self.qemu_proc = None
# Supported ranges
self.ranges = []
# Create QEMU instance for x86 architecture
qemu_bin = avl.Factory.get_variable(f"{self.get_full_name()}.qemu_bin", self.BIN)
# The kernel binary to be loaded by QEMU. This can be overridden by the user to specify a different test program.
qemu_kernel = avl.Factory.get_variable(f"{self.get_full_name()}.qemu_kernel_bin", self.KERNEL)
self.qemu_cmd = [
qemu_bin,
"-kernel", qemu_kernel,
"-m", "128M",
"-display", "none",
"-serial", "mon:stdio",
"-device", "avl-hook,file=avl_map.txt",
"-device", "isa-debug-exit,iobase=0x501,iosize=0x04",
]
# Monitor / Handle memory access from qemu in a separate thread
cocotb.start_soon(self._handle_pipe_())
[docs]
def add_range(self, name: str, start: int, size: int, read_cb: callable, write_cb : callable) -> None:
"""
Add a memory range to the agent's configuration.
:param name: The name of the memory range.
:type name: str
:param start: The starting address of the memory range.
:type start: int
:param size: The size of the region
:type size: int
:param read_cb: callback function for read operations on this range.
:type read_cb: callable
:param write_cb: callback function for write operations on this range.
:type write_cb: callable
:return: None
"""
self.ranges.append((name, start, size, read_cb, write_cb))
def _open_pipe_(self) -> None:
"""
Open a communication pipe for the agent.
This method can be overridden to implement specific communication protocols or channels.
By default new pipes are created at the specified paths and opened for non-blocking read/write operations.
:return: None
"""
for p,v in self.pipes.items():
if os.path.exists(v["path"]):
os.remove(v["path"])
os.mkfifo(v["path"])
self.pipes[p]["fd"] = os.open(v["path"], os.O_RDWR | os.O_NONBLOCK)
def _close_pipe_(self) -> None:
"""
Close the communication pipe for the agent.
This method can be overridden to implement specific cleanup procedures for communication channels.
By default, it closes any open file descriptors for the pipes and removes the pipe files from the filesystem.
:return: None
"""
for v in self.pipes.values():
if v["fd"] is not None:
v["fd"].close()
v["fd"] = None
if os.path.exists(v["path"]):
os.remove(v["path"])
async def _handle_pipe_(self) -> None:
"""
Handle incoming requests from QEMU through the communication pipe.
This method continuously reads from the input pipe, processes requests based on their type (fetch, load, store), and sends responses back through the output pipe.
By default no time is consumed when there are no requests or when handling exceptions, allowing for efficient communication without unnecessary delays.
"""
while True:
try:
data = os.read(self.pipes["in"]["fd"], self.REQUEST_SIZE)
if not data or len(data) < self.REQUEST_SIZE:
await NextTimeStep()
continue
# Unpack the packet
addr, size, type_, data_, region_id = struct.unpack(self.REQUEST_FMT, data)
if type_ == 2: # STORE
await self.ranges[region_id][4](addr, size, data_)
else: # LOAD
rdata = await self.ranges[region_id][3](addr, size)
# Pack and write back to the pipe
response = struct.pack(self.RESPONSE_FMT, 1, rdata)
os.write(self.pipes["out"]["fd"], response)
except BlockingIOError:
if self.qemu_proc and self.qemu_proc.poll() is not None:
break
await NullTrigger()
continue
except OSError:
await NullTrigger()
continue
def _qemu_stdout_reader_(self, proc : subprocess.Popen) -> None:
"""
Read the stdout of the QEMU process and log it.
:param proc: The subprocess.Popen object representing the QEMU process.
:type proc: subprocess.Popen
:return: None
"""
try:
for line in proc.stdout:
self.info(f"[QEMU] {line.rstrip()}")
finally:
proc.stdout.close()
[docs]
async def run_phase(self) -> None:
"""
Run the main phase of the agent.
Create the qemu instance and handle communication through the pipe.
"""
self.raise_objection()
# Create file with memory ranges for plugin
assert len(self.ranges) > 0, "At least one memory range must be added to the agent configuration using add_range() before running the agent."
assert len(self.ranges) < 16, "A maximum of 16memory ranges can be added to the agent configuration due to region_id being a single byte in the communication protocol."
if os.path.exists(self.RANGES_FILE):
os.remove(self.RANGES_FILE)
with open(self.RANGES_FILE, "w") as f:
f.write(f"{self.REQUEST_PIPE},{self.RESPONSE_PIPE}\n")
for _name, start, size, _read_cb, _write_cb in self.ranges:
f.write(f"0x{start:x},0x{size:x}\n")
# Start the qemu process and read its output
self.qemu_proc = subprocess.Popen(
self.qemu_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True
)
threading.Thread(target=self._qemu_stdout_reader_, args=(self.qemu_proc,), daemon=True).start()
# Wait for QEMU to exit
while self.qemu_proc.poll() is None:
await Timer(self.POLL_PERIOD_NS, unit="ns")
# Check exit code
if self.qemu_proc.returncode != 33:
self._close_pipe_() # Ensure pipes are closed if QEMU exits with error
self.fatal(f"QEMU exited with code {self.qemu_proc.returncode}")
self.drop_objection()
__all__ = ["X86Agent"]