Module loda.ml.util
from loda.lang import Operand, Operation, Program
from loda.oeis import ProgramCache
import random
def program_to_tokens(program: Program):
    """
    Convert a program to tokens and vocabulary.
    Every operation is represented using three tokens:
    1. operation type
    2. target operand
    3. source operand
    Args:
        program: Program to be converted to tokens.
    Return:
        Pair of token list and vocabulary list.
    ## Example
    >>> program = Program("mov $1,5\\ndiv $1,$3")
    >>> program_to_tokens(program)
    (['mov', '$1', '5', 'div', '$1', '$3'], ['$1', '$3', '5', 'div', 'mov'])
    """
    tokens = []
    vocab = set()
    for op in program.operations:
        type = op.type.name.lower()
        target = str(op.target)
        source = str(op.source)
        tokens.append(type)
        tokens.append(target)
        tokens.append(source)
        vocab.add(type)
        vocab.add(target)
        vocab.add(source)
    return tokens, sorted(vocab)
def tokens_to_operation(tokens: list, start: int):
    try:
        type = Operation.Type[tokens[start].upper()]
        target = Operand(tokens[start+1])
        source = Operand(tokens[start+2])
        return Operation(type, target, source)
    except Exception:
        # Return None if the three tokens do not form a valid operation.
        return None
def tokens_to_program(tokens: list) -> Program:
    i = 0
    program = Program()
    while i+2 < len(tokens):
        operation = tokens_to_operation(tokens, i)
        # Skip token triples that could not be parsed into an operation.
        if operation is not None:
            program.operations.append(operation)
        i += 3
    return program
def append_nops(program: Program, num_nops: int):
    for _ in range(num_nops):
        program.operations.append(Operation())  # nop
def get_random_program_ids(program_cache: ProgramCache, num_programs: int = -1):
    # Get IDs of all existing programs. Shuffle them and reduce
    # the number of program IDs if requested.
    ids = program_cache.all_ids()
    random.shuffle(ids)
    if num_programs >= 0 and len(ids) > num_programs:
        ids = ids[0:num_programs]
    return ids
def merge_programs(program_cache: ProgramCache, program_ids: list,
                   num_ops_per_sample: int, num_nops_separator: int):
    # Merge all programs into one program. Individual programs are
    # separated by (multiple) nops. The number of nops equals the
    # number of operations per sample.
    merged = Program()
    num_loaded = 0
    for id in program_ids:
        program = program_cache.get(id)
        append_nops(merged, num_nops_separator)
        for op in program.operations:
            if op.type != Operation.Type.NOP:
                merged.operations.append(op)
        num_loaded += 1
        if num_loaded % 1000 == 0:
            program_cache.clear()
            # TODO: use logger
            print(num_loaded)
    program_cache.clear()
    # Calculate the sample size (includes input and label).
    # One operation is encoded using three tokens, plus one
    # token because we split into input and label.
    sample_size = (3 * num_ops_per_sample) + 1
    # Extend the training dataset with nops such that samples get
    # shifted by one when repeating the dataset. We later want
    # to repeat the dataset such that we cover all possible start
    # positions. This works because the sample size is 1 mod 3.
    shift = (3 * len(merged.operations)) % sample_size
    while shift != 1 and shift != (sample_size-1):
        append_nops(merged, 1)
        shift = (3 * len(merged.operations)) % sample_size
    num_samples = 3 * len(merged.operations)
    return merged, num_samples, sample_size
def split_sample(sample: list):
    # Split sample into (input, label) pair.
    return sample[:-1], sample[1:]
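Typical usage combines these functions to build a training token stream. The following is a minimal sketch, not part of the module: the ProgramCache constructor argument (a path to a local LODA programs directory), the choice of 1000 programs, and 8 operations per sample are illustrative assumptions; only the function calls themselves come from the source above.

from loda.oeis import ProgramCache

# Assumption: ProgramCache is constructed from the path of a local LODA
# programs directory; the exact constructor arguments may differ.
program_cache = ProgramCache("path/to/loda/programs/oeis")

num_ops_per_sample = 8
ids = get_random_program_ids(program_cache, num_programs=1000)
merged, num_samples, sample_size = merge_programs(
    program_cache, ids,
    num_ops_per_sample=num_ops_per_sample,
    num_nops_separator=num_ops_per_sample)

# Tokenize the merged program, take one window of sample_size tokens and
# split it into an (input, label) pair for next-token prediction.
tokens, vocab = program_to_tokens(merged)
sample = tokens[:sample_size]
model_input, label = split_sample(sample)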
Functions
def append_nops(program: Program, num_nops: int)
-
Append num_nops no-operation (nop) instructions to the given program.
def get_random_program_ids(program_cache: ProgramCache, num_programs: int = -1)
-
Return a shuffled list of program IDs from the program cache; if num_programs is non-negative, at most that many IDs are returned.
def merge_programs(program_cache: ProgramCache, program_ids: list, num_ops_per_sample: int, num_nops_separator: int)
-
Merge the given programs into a single program, separating them with nop operations, and return the merged program together with the number of samples and the sample size in tokens.
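To make the padding logic concrete, here is the arithmetic with illustrative numbers (4 operations per sample and a merged program of 20 operations); the values are assumptions, the formulas are the ones used in the source above.

num_ops_per_sample = 4
sample_size = 3 * num_ops_per_sample + 1   # 13 tokens per sample (input + label)

num_tokens = 3 * 20                        # 20 merged operations -> 60 tokens
print(num_tokens % sample_size)            # 8: neither 1 nor sample_size - 1

# merge_programs appends one nop (3 tokens) at a time until the remainder
# is 1 or sample_size - 1, so that repeating the token stream shifts every
# sample window by one token per pass over the data.
while num_tokens % sample_size not in (1, sample_size - 1):
    num_tokens += 3
print(num_tokens % sample_size)            # 1 (after appending two nops)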
def program_to_tokens(program: Program)
-
Convert a program to tokens and vocabulary.
Every operation is represented using three tokens:
- operation type
- target operand
- source operand
Args
program
- Program to be converted to tokens.
Return
Pair of token list and vocabulary list.
Example
>>> program = Program("mov $1,5\ndiv $1,$3")
>>> program_to_tokens(program)
(['mov', '$1', '5', 'div', '$1', '$3'], ['$1', '$3', '5', 'div', 'mov'])
def split_sample(sample: list)
-
Split a token sample into an (input, label) pair: the input drops the last token and the label drops the first token.
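For example, splitting a window of seven tokens (the sample size for two operations per sample) into the input and label sequences:

sample = ['mov', '$1', '5', 'div', '$1', '$3', 'mov']
model_input, label = split_sample(sample)
# model_input == ['mov', '$1', '5', 'div', '$1', '$3']
# label       == ['$1', '5', 'div', '$1', '$3', 'mov']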
def tokens_to_operation(tokens: list, start: int)
-
Parse the three tokens starting at index start into an Operation; return None if they do not form a valid operation.
def tokens_to_program(tokens: list) -> Program
-
Convert a token list back into a program by parsing consecutive triples of tokens.
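A round-trip sketch using the example program from program_to_tokens above: tokenize a program and parse the tokens back.

from loda.lang import Program

program = Program("mov $1,5\ndiv $1,$3")
tokens, vocab = program_to_tokens(program)
# tokens == ['mov', '$1', '5', 'div', '$1', '$3']
# vocab  == ['$1', '$3', '5', 'div', 'mov']

restored = tokens_to_program(tokens)
# restored contains the same two operations as the original program;
# token triples that cannot be parsed are skipped.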