Module loda.runtime.interpreter

Interpret programs.

Expand source code
"""Interpret programs."""

from loda.lang import Operand, Operation, Program
from loda.oeis import ProgramCache
from .operations import exec_arithmetic


class Interpreter:

    def __init__(self,
                 program_cache: ProgramCache = None,
                 max_steps: int = -1,
                 max_memory: int = -1,
                 max_loop_stack_size: int = 100,
                 max_terms_cache_size: int = 10000):
        """
        The interpreter class is used to run LODA programs. Running programs operate on memory dictionaries
        mapping indices to integer values. The most common use case is to evaluate programs to integer sequences.

        Args:
            program_cache: Program cache (optional). When referencing programs using OEIS sequence IDs,
                the program cache must be supplied.
            max_steps: Maximum number of execution steps per program run (-1 for no limit).
            max_memory: Maximum memory usage (-1 for no limit). Bounds both the number of
                used memory cells and the largest cell index that may be written.
            max_loop_stack_size: Maximum stack size used for loops (-1 for no limit).
            max_terms_cache_size: Maximum cache size for sequence terms (-1 for no limit).

        ## Example
        >>> # Evaluate a program to an integer sequence:
        >>> interpreter = Interpreter()
        >>> fibonacci = Program(...)
        >>> interpreter.eval_to_seq(fibonacci, num_terms=10)
        ([0, 1, 1, 2, 3, 5, 8, 13, 21, 34], 305)

        """
        self.__program_cache = program_cache
        self.__max_steps = max_steps
        self.__max_memory = max_memory
        self.__max_loop_stack_size = max_loop_stack_size
        self.__max_terms_cache_size = max_terms_cache_size
        # memoized `seq` results: (program ID, argument) -> (result, steps)
        self.__terms_cache = {}
        # IDs of programs that are currently executing; used to detect recursion
        self.__running_programs = set()

    def clear_cache(self):
        """Clear the program and the terms cache. This is useful to free accumulated memory."""
        if self.__program_cache:
            self.__program_cache.clear()
        self.__terms_cache.clear()

    def eval_to_seq(self, program: Program, num_terms: int, use_steps=False):
        """
        Evaluate a program to an integer sequence.

        Args:
            program: The program to be evaluated.
            num_terms: The number of sequence terms to be computed.
            use_steps: Flag indicating whether to return the number of steps as sequence.
        Return:
            This function returns a pair. The first entry is the computed integer sequence
            as a list of ints. The second entry is the number of used computation steps.
        """
        seq = []
        mem = {}
        total_steps = 0
        for i in range(num_terms):
            # every term starts from a fresh memory holding only the argument in cell 0
            mem.clear()
            mem[0] = i
            steps = self.run(program, mem)
            seq.append(steps if use_steps else mem[0])
            total_steps += steps
        return seq, total_steps

    def run(self, id_or_program, memory: dict):
        """
        Run a program on the given memory.

        Args:
            id_or_program: Either a `Program` instance or an integer program ID that is
                looked up in the program cache.
            memory: Memory dictionary. On success it is overwritten in place with the
                final memory state of the program.
        Return:
            The number of executed steps.
        Raises:
            ValueError: If the argument is neither a program nor an ID, if the ID cannot
                be resolved, if the program with that ID is already running (recursion),
                or if execution fails.
        """
        if isinstance(id_or_program, Program):
            return self.__run(id_or_program, memory)
        elif isinstance(id_or_program, int):
            program_id = id_or_program
            if program_id in self.__running_programs:
                self.__raise("recursion detected")
            if self.__program_cache is None:
                self.__raise("program cache not set")
            program = self.__program_cache.get(program_id)
            if program is None:
                self.__raise("program not found: {}".format(program_id))
            self.__running_programs.add(program_id)
            try:
                return self.__run(program, memory)
            finally:
                # always unregister, so a failed run is not reported as recursion later
                self.__running_programs.remove(program_id)
        else:
            self.__raise(
                "expected ID or program: {}".format(id_or_program))

    def __run(self, p: Program, mem: dict) -> int:
        """
        Execute the operations of `p` on a working copy of `mem`.

        `mem` is only overwritten after the program has finished without error.
        Returns the number of executed steps, including the steps of programs
        called via `seq` operations.
        """

        # remove nop operations
        ops = [op for op in p.operations if op.type != Operation.Type.NOP]

        # check for loops with fragments
        single_cell = Operand(Operand.Type.CONSTANT, 1)
        if any(op.type == Operation.Type.LPB and op.source != single_cell for op in ops):
            self.__raise("fragments not supported")

        # check for empty program
        if not ops:
            return 0

        # define stacks; all three grow and shrink together, one entry per open loop
        loop_stack = []     # program counters of the open `lpb` operations
        counter_stack = []  # loop counter value seen at the previous check
        mem_stack = []      # memory snapshot used to roll back the last iteration

        steps = 0
        num_ops = len(ops)
        mem_tmp = mem.copy()
        mem_seq = {}

        # start program execution
        pc = 0
        while pc < num_ops:
            op = ops[pc]
            pc_next = pc + 1

            if op.type == Operation.Type.LPB:
                if len(loop_stack) >= self.__max_loop_stack_size and self.__max_loop_stack_size >= 0:
                    self.__raise(
                        "maximum stack size exceeded: {}".format(len(loop_stack)))
                loop_stack.append(pc)
                mem_stack.append(mem_tmp.copy())
                counter = self.__get(op.target, mem_tmp, False)
                counter_stack.append(counter)

            elif op.type == Operation.Type.LPE:
                if not loop_stack:
                    # report malformed programs like every other interpreter error
                    # instead of failing with a bare IndexError on the empty stack
                    self.__raise(
                        "loop end without matching loop begin; last operation: {}".format(op))
                lpb = ops[loop_stack[-1]]
                counter = self.__get(lpb.target, mem_tmp, False)
                if 0 <= counter < counter_stack[-1]:
                    pc_next = loop_stack[-1] + 1  # jump back to begin
                    mem_stack[-1] = mem_tmp.copy()
                    counter_stack[-1] = counter
                else:
                    # counter did not decrease (or went negative): leave the loop and
                    # roll memory back to the snapshot taken before this iteration
                    mem_tmp = mem_stack.pop()
                    loop_stack.pop()
                    counter_stack.pop()

            elif op.type == Operation.Type.SEQ:
                argument = self.__get(op.target, mem_tmp)
                seq_id = self.__get(op.source, mem_tmp)
                key = (seq_id, argument)
                if key in self.__terms_cache:
                    seq_result, seq_steps = self.__terms_cache[key]
                else:
                    mem_seq.clear()
                    mem_seq[0] = argument
                    seq_steps = self.run(seq_id, mem_seq)
                    seq_result = mem_seq.get(0, 0)
                    # strict comparison: the cache never holds more than the configured maximum
                    if self.__max_terms_cache_size < 0 or len(self.__terms_cache) < self.__max_terms_cache_size:
                        self.__terms_cache[key] = (seq_result, seq_steps)
                self.__set(op.target, seq_result, mem_tmp, op)
                steps += seq_steps

            else:
                # arithmetic operation
                target = self.__get(op.target, mem_tmp)
                source = self.__get(op.source, mem_tmp)
                self.__set(op.target, exec_arithmetic(
                    op.type, target, source), mem_tmp, op)

            pc = pc_next

            # count execution steps
            steps += 1

            # check resource constraints
            if steps > self.__max_steps and self.__max_steps >= 0:
                self.__raise("exceeded maximum number of steps ({}); last operation: {}".format(
                    self.__max_steps, op))
            if len(mem_tmp) > self.__max_memory and self.__max_memory >= 0:
                self.__raise(
                    "exceeded maximum memory: {}; last operation: {} ".format(len(mem_tmp), op))

        # sanity check: every opened loop must have been closed
        if loop_stack or counter_stack or mem_stack:
            self.__raise("execution error")

        # update main memory and return steps
        mem.clear()
        mem.update(mem_tmp)
        return steps

    def __get(self, op: Operand, mem, get_address=False):
        """
        Read an operand.

        Returns the operand's value, or the memory index it refers to when
        `get_address` is set. Memory cells that were never written read as 0.
        """
        if op.type == Operand.Type.CONSTANT:
            if get_address:
                self.__raise("cannot get address of a constant")
            return op.value
        elif op.type == Operand.Type.DIRECT:
            return op.value if get_address else mem.get(op.value, 0)
        elif op.type == Operand.Type.INDIRECT:
            return mem.get(op.value, 0) if get_address else mem.get(mem.get(op.value, 0), 0)

    def __set(self, op: Operand, v, mem, last):
        """
        Store `v` in the memory cell addressed by operand `op`.

        `last` is the operation being executed and is only used in error messages.
        A value of `None` is reported as an overflow.
        """
        index = op.value
        if op.type == Operand.Type.CONSTANT:
            self.__raise("cannot set value of a constant")
        elif op.type == Operand.Type.INDIRECT:
            index = mem.get(index, 0)
        if index > self.__max_memory and self.__max_memory >= 0:
            self.__raise(
                "maximum memory exceeded: {}; last operation: {}".format(index, last))
        if v is None:
            self.__raise(
                "overflow in {}; last operation: {}".format(op, last))
        mem[index] = v

    def __raise(self, msg: str) -> None:
        """Raise a `ValueError` carrying `msg`; this method never returns normally."""
        raise ValueError(msg)

Classes

class Interpreter (program_cache: ProgramCache = None, max_steps: int = -1, max_memory: int = -1, max_loop_stack_size: int = 100, max_terms_cache_size: int = 10000)

The interpreter class is used to run LODA programs. Running programs operate on memory dictionaries mapping indices to integer values. The most common use case is to evaluate programs to integer sequences.

Args

program_cache
Program cache (optional). When referencing programs using OEIS sequence IDs, the program cache must be supplied.
max_steps
Maximum number of execution steps (-1 for no limit).
max_memory
Maximum memory usage (-1 for no limit).
max_loop_stack_size
Maximum stack size used for loops (-1 for no limit).
max_terms_cache_size
Maximum cache size for sequence terms (-1 for no limit).

Example

>>> # Evaluate a program to an integer sequence:
>>> fibonacci = Program(...)
>>> interpreter.eval_to_seq(fibonacci, num_terms=10)
([0, 1, 1, 2, 3, 5, 8, 13, 21, 34], 305)
Expand source code
class Interpreter:

    def __init__(self,
                 program_cache: ProgramCache = None,
                 max_steps: int = -1,
                 max_memory: int = -1,
                 max_loop_stack_size: int = 100,
                 max_terms_cache_size: int = 10000):
        """
        The interpreter class is used to run LODA programs. Running programs operate on memory dictionaries
        mapping indices to integer values. The most common use case is to evaluate programs to integer sequences.

        Args:
            program_cache: Program cache (optional). When referencing programs using OEIS sequence IDs,
                the program cache must be supplied.
            max_steps: Maximum number of execution steps per program run (-1 for no limit).
            max_memory: Maximum memory usage (-1 for no limit). Bounds both the number of
                used memory cells and the largest cell index that may be written.
            max_loop_stack_size: Maximum stack size used for loops (-1 for no limit).
            max_terms_cache_size: Maximum cache size for sequence terms (-1 for no limit).

        ## Example
        >>> # Evaluate a program to an integer sequence:
        >>> interpreter = Interpreter()
        >>> fibonacci = Program(...)
        >>> interpreter.eval_to_seq(fibonacci, num_terms=10)
        ([0, 1, 1, 2, 3, 5, 8, 13, 21, 34], 305)

        """
        self.__program_cache = program_cache
        self.__max_steps = max_steps
        self.__max_memory = max_memory
        self.__max_loop_stack_size = max_loop_stack_size
        self.__max_terms_cache_size = max_terms_cache_size
        # memoized `seq` results: (program ID, argument) -> (result, steps)
        self.__terms_cache = {}
        # IDs of programs that are currently executing; used to detect recursion
        self.__running_programs = set()

    def clear_cache(self):
        """Clear the program and the terms cache. This is useful to free accumulated memory."""
        if self.__program_cache:
            self.__program_cache.clear()
        self.__terms_cache.clear()

    def eval_to_seq(self, program: Program, num_terms: int, use_steps=False):
        """
        Evaluate a program to an integer sequence.

        Args:
            program: The program to be evaluated.
            num_terms: The number of sequence terms to be computed.
            use_steps: Flag indicating whether to return the number of steps as sequence.
        Return:
            This function returns a pair. The first entry is the computed integer sequence
            as a list of ints. The second entry is the number of used computation steps.
        """
        seq = []
        mem = {}
        total_steps = 0
        for i in range(num_terms):
            # every term starts from a fresh memory holding only the argument in cell 0
            mem.clear()
            mem[0] = i
            steps = self.run(program, mem)
            seq.append(steps if use_steps else mem[0])
            total_steps += steps
        return seq, total_steps

    def run(self, id_or_program, memory: dict):
        """
        Run a program on the given memory.

        Args:
            id_or_program: Either a `Program` instance or an integer program ID that is
                looked up in the program cache.
            memory: Memory dictionary. On success it is overwritten in place with the
                final memory state of the program.
        Return:
            The number of executed steps.
        Raises:
            ValueError: If the argument is neither a program nor an ID, if the ID cannot
                be resolved, if the program with that ID is already running (recursion),
                or if execution fails.
        """
        if isinstance(id_or_program, Program):
            return self.__run(id_or_program, memory)
        elif isinstance(id_or_program, int):
            program_id = id_or_program
            if program_id in self.__running_programs:
                self.__raise("recursion detected")
            if self.__program_cache is None:
                self.__raise("program cache not set")
            program = self.__program_cache.get(program_id)
            if program is None:
                self.__raise("program not found: {}".format(program_id))
            self.__running_programs.add(program_id)
            try:
                return self.__run(program, memory)
            finally:
                # always unregister, so a failed run is not reported as recursion later
                self.__running_programs.remove(program_id)
        else:
            self.__raise(
                "expected ID or program: {}".format(id_or_program))

    def __run(self, p: Program, mem: dict) -> int:
        """
        Execute the operations of `p` on a working copy of `mem`.

        `mem` is only overwritten after the program has finished without error.
        Returns the number of executed steps, including the steps of programs
        called via `seq` operations.
        """

        # remove nop operations
        ops = [op for op in p.operations if op.type != Operation.Type.NOP]

        # check for loops with fragments
        single_cell = Operand(Operand.Type.CONSTANT, 1)
        if any(op.type == Operation.Type.LPB and op.source != single_cell for op in ops):
            self.__raise("fragments not supported")

        # check for empty program
        if not ops:
            return 0

        # define stacks; all three grow and shrink together, one entry per open loop
        loop_stack = []     # program counters of the open `lpb` operations
        counter_stack = []  # loop counter value seen at the previous check
        mem_stack = []      # memory snapshot used to roll back the last iteration

        steps = 0
        num_ops = len(ops)
        mem_tmp = mem.copy()
        mem_seq = {}

        # start program execution
        pc = 0
        while pc < num_ops:
            op = ops[pc]
            pc_next = pc + 1

            if op.type == Operation.Type.LPB:
                if len(loop_stack) >= self.__max_loop_stack_size and self.__max_loop_stack_size >= 0:
                    self.__raise(
                        "maximum stack size exceeded: {}".format(len(loop_stack)))
                loop_stack.append(pc)
                mem_stack.append(mem_tmp.copy())
                counter = self.__get(op.target, mem_tmp, False)
                counter_stack.append(counter)

            elif op.type == Operation.Type.LPE:
                if not loop_stack:
                    # report malformed programs like every other interpreter error
                    # instead of failing with a bare IndexError on the empty stack
                    self.__raise(
                        "loop end without matching loop begin; last operation: {}".format(op))
                lpb = ops[loop_stack[-1]]
                counter = self.__get(lpb.target, mem_tmp, False)
                if 0 <= counter < counter_stack[-1]:
                    pc_next = loop_stack[-1] + 1  # jump back to begin
                    mem_stack[-1] = mem_tmp.copy()
                    counter_stack[-1] = counter
                else:
                    # counter did not decrease (or went negative): leave the loop and
                    # roll memory back to the snapshot taken before this iteration
                    mem_tmp = mem_stack.pop()
                    loop_stack.pop()
                    counter_stack.pop()

            elif op.type == Operation.Type.SEQ:
                argument = self.__get(op.target, mem_tmp)
                seq_id = self.__get(op.source, mem_tmp)
                key = (seq_id, argument)
                if key in self.__terms_cache:
                    seq_result, seq_steps = self.__terms_cache[key]
                else:
                    mem_seq.clear()
                    mem_seq[0] = argument
                    seq_steps = self.run(seq_id, mem_seq)
                    seq_result = mem_seq.get(0, 0)
                    # strict comparison: the cache never holds more than the configured maximum
                    if self.__max_terms_cache_size < 0 or len(self.__terms_cache) < self.__max_terms_cache_size:
                        self.__terms_cache[key] = (seq_result, seq_steps)
                self.__set(op.target, seq_result, mem_tmp, op)
                steps += seq_steps

            else:
                # arithmetic operation
                target = self.__get(op.target, mem_tmp)
                source = self.__get(op.source, mem_tmp)
                self.__set(op.target, exec_arithmetic(
                    op.type, target, source), mem_tmp, op)

            pc = pc_next

            # count execution steps
            steps += 1

            # check resource constraints
            if steps > self.__max_steps and self.__max_steps >= 0:
                self.__raise("exceeded maximum number of steps ({}); last operation: {}".format(
                    self.__max_steps, op))
            if len(mem_tmp) > self.__max_memory and self.__max_memory >= 0:
                self.__raise(
                    "exceeded maximum memory: {}; last operation: {} ".format(len(mem_tmp), op))

        # sanity check: every opened loop must have been closed
        if loop_stack or counter_stack or mem_stack:
            self.__raise("execution error")

        # update main memory and return steps
        mem.clear()
        mem.update(mem_tmp)
        return steps

    def __get(self, op: Operand, mem, get_address=False):
        """
        Read an operand.

        Returns the operand's value, or the memory index it refers to when
        `get_address` is set. Memory cells that were never written read as 0.
        """
        if op.type == Operand.Type.CONSTANT:
            if get_address:
                self.__raise("cannot get address of a constant")
            return op.value
        elif op.type == Operand.Type.DIRECT:
            return op.value if get_address else mem.get(op.value, 0)
        elif op.type == Operand.Type.INDIRECT:
            return mem.get(op.value, 0) if get_address else mem.get(mem.get(op.value, 0), 0)

    def __set(self, op: Operand, v, mem, last):
        """
        Store `v` in the memory cell addressed by operand `op`.

        `last` is the operation being executed and is only used in error messages.
        A value of `None` is reported as an overflow.
        """
        index = op.value
        if op.type == Operand.Type.CONSTANT:
            self.__raise("cannot set value of a constant")
        elif op.type == Operand.Type.INDIRECT:
            index = mem.get(index, 0)
        if index > self.__max_memory and self.__max_memory >= 0:
            self.__raise(
                "maximum memory exceeded: {}; last operation: {}".format(index, last))
        if v is None:
            self.__raise(
                "overflow in {}; last operation: {}".format(op, last))
        mem[index] = v

    def __raise(self, msg: str) -> None:
        """Raise a `ValueError` carrying `msg`; this method never returns normally."""
        raise ValueError(msg)

Methods

def clear_cache(self)

Clear the program and the terms cache. This is useful to free accumulated memory.

Expand source code
def clear_cache(self):
    """Drop all cached programs and cached sequence terms to release their memory."""
    program_cache = self.__program_cache
    # the program cache is optional, so it is only cleared when one is present
    if program_cache:
        program_cache.clear()
    self.__terms_cache.clear()
def eval_to_seq(self, program: Program, num_terms: int, use_steps=False)

Evaluate a program to an integer sequence.

Args

program
The program to be evaluated.
num_terms
The number of sequence terms to be computed.
use_steps
Flag indicating whether to return the number of steps as sequence.

Return

This function returns a pair. The first entry is the computed integer sequence as a list of ints. The second entry is the number of used computation steps.

Expand source code
def eval_to_seq(self, program: Program, num_terms: int, use_steps=False):
    """
    Compute the first terms of the integer sequence generated by a program.

    Args:
        program: The program to be evaluated.
        num_terms: How many sequence terms to compute.
        use_steps: When set, each term is the number of steps the run took
            instead of the program's result.
    Return:
        A pair: the list of computed terms, and the total number of
        computation steps used over all runs.
    """
    terms = []
    step_sum = 0
    for index in range(num_terms):
        # each run starts from a fresh memory holding only the argument in cell 0
        memory = {0: index}
        used = self.run(program, memory)
        if use_steps:
            terms.append(used)
        else:
            terms.append(memory[0])
        step_sum += used
    return terms, step_sum
def run(self, id_or_program, memory: dict)

Run a program.

Expand source code
def run(self, id_or_program, memory: dict):
    """
    Run a program given either directly or by its integer ID.

    A `Program` instance is executed as is. An integer is resolved through
    the program cache and executed, and is tracked while it runs so that a
    program calling itself is rejected. Returns the number of steps reported
    by the run; any other argument type is rejected.
    """
    if isinstance(id_or_program, Program):
        return self.__run(id_or_program, memory)

    if not isinstance(id_or_program, int):
        return self.__raise(
            "expected ID or program: {}".format(id_or_program))

    program_id = id_or_program
    if program_id in self.__running_programs:
        self.__raise("recursion detected")
    cache = self.__program_cache
    if cache is None:
        self.__raise("program cache not set")
    program = cache.get(program_id)
    if program is None:
        self.__raise("program not found: {}".format(program_id))

    self.__running_programs.add(program_id)
    try:
        return self.__run(program, memory)
    finally:
        # unregister even on failure so later runs are not flagged as recursion
        self.__running_programs.remove(program_id)