Article
· Dec 14, 2024 7m read

A Semantic Code Search Solution for TrakCare Using IRIS Vector Search

This article presents a potential solution for semantic code search in TrakCare using IRIS Vector Search.

Here's a brief overview of results from the TrakCare Semantic code search for the query: "Validation before database object save".

 

  • Code Embedding model 

There are numerous embedding models designed for sentences and paragraphs, but they are not ideal for code-specific embeddings.

Three code-specific embedding models were evaluated: voyage-code-2, CodeBERT, and GraphCodeBERT. While none of these models were pre-trained on ObjectScript, they still outperformed general-purpose embedding models in this context.

CodeBERT was chosen as the embedding model for this solution, offering reliable performance without the need for an API key. 😁

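import torch
from transformers import RobertaTokenizer, RobertaModel

# Named after GraphCodeBERT, but loads CodeBERT (microsoft/codebert-base) by default
class GraphCodeBERTEmbeddingModel: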
    def __init__(self, model_name="microsoft/codebert-base"):
        self.tokenizer = RobertaTokenizer.from_pretrained(model_name)
        self.model = RobertaModel.from_pretrained(model_name)
    
    def get_embedding(self, text):
        """
        Generate a CodeBERT embedding for the given text.
        """
        inputs = self.tokenizer(text, return_tensors="pt", max_length=512, truncation=True, padding="max_length")
        with torch.no_grad():
            outputs = self.model(**inputs)
        # Use the [CLS] token embedding for the representation
        cls_embedding = outputs.last_hidden_state[:, 0, :].squeeze().numpy()
        return cls_embedding
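
To make concrete how two such embeddings are compared at search time, here is a minimal sketch of cosine similarity in plain Python. The function name and the toy 3-dimensional vectors are illustrative assumptions; real CodeBERT embeddings have 768 dimensions, and in this solution the comparison is done inside IRIS rather than in Python.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (hypothetical helper)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)

# Toy vectors standing in for 768-dimensional embeddings
query_vec = [1.0, 0.0, 1.0]
similar_vec = [2.0, 0.0, 2.0]    # same direction as the query, different length
unrelated_vec = [0.0, 1.0, 0.0]  # orthogonal to the query
```

Vectors pointing in the same direction score close to 1 regardless of their length, while orthogonal vectors score 0, which is why a higher score is treated as a better match.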

 

  • IRIS Vector database

A table is defined with a VECTOR-typed column to store the embeddings. Note that COLUMNAR indexes are not supported on VECTOR-typed columns.

CodeBERT embeddings have 768 dimensions, and the model can process inputs of at most 512 tokens.

CREATE TABLE TrakCareCodeVector (
    file VARCHAR(150),
    codes VARCHAR(2000),
    codes_vector VECTOR(DOUBLE, 768)
)
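
Because the column is declared with a fixed length of 768, it can help to check an embedding's dimension before it is sent to TO_VECTOR. The sketch below is one possible way to do that; `to_vector_literal` and its `expected_dim` parameter are hypothetical names, not part of the original code, and the comma-separated output format is an assumption about what TO_VECTOR is given here.

```python
def to_vector_literal(values, expected_dim=768):
    """Serialize numbers as a comma-separated string for use with TO_VECTOR.

    Hypothetical helper: raises ValueError when the vector length does not
    match the dimension declared on the VECTOR column.
    """
    values = [float(v) for v in values]
    if len(values) != expected_dim:
        raise ValueError(f"expected {expected_dim} dimensions, got {len(values)}")
    return ",".join(repr(v) for v in values)

# Small demonstration with a 3-dimensional vector
literal = to_vector_literal([0.5, 1, -2.25], expected_dim=3)
```

The helper accepts any iterable of numbers, so a NumPy array returned by the embedding model can be passed in directly.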

  

  • Python DB-API  

The Python DB-API is used to connect to the IRIS instance and execute the SQL statements. It serves two purposes:

  1. Build the vector database for the TrakCare source code.
  2. Retrieve the top_K code embeddings most similar to the query embedding from the IRIS vector database (the search code below ranks by VECTOR_COSINE).

# build IRIS vector database
import iris
import os
from dotenv import load_dotenv

load_dotenv()

class IrisConn:
    """Connection with IRIS instance to execute the SQL statements """
    def __init__(self) -> None:
        connection_string = os.getenv("CONNECTION_STRING")
        username = os.getenv("IRISUSERNAME")
        password = os.getenv("PASSWORD")

        self.connection = iris.connect(
            connectionstr=connection_string,
            username=username,
            password=password,
            timeout=10000,
        )
        self.cursor = self.connection.cursor()

    def insert(self, params: list):
        try:
            sql = "INSERT INTO TrakCareCodeVector (file, codes, codes_vector) VALUES (?, ?, TO_VECTOR(?,double))"
            self.cursor.execute(sql,params)
        except Exception as ex:
            print(ex)
    
    def fetch_query(self, query: str):
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def close_db(self):
        self.cursor.close()
        self.connection.close()
        
from transformers import AutoTokenizer, AutoModel, RobertaTokenizer, RobertaModel, logging
import torch
import numpy as np
import os
from db import IrisConn
from GraphcodebertEmbeddings import MethodEmbeddingGenerator
from IRISClassParser import parse_directory
import sys, getopt

class GraphCodeBERTEmbeddingModel:
    def __init__(self, model_name="microsoft/codebert-base"):
        self.tokenizer = RobertaTokenizer.from_pretrained(model_name)
        self.model = RobertaModel.from_pretrained(model_name)
    
    def get_embedding(self, text):
        """
        Generate a CodeBERT embedding for the given text.
        """
        inputs = self.tokenizer(text, return_tensors="pt", max_length=512, truncation=True, padding="max_length")
        with torch.no_grad():
            outputs = self.model(**inputs)
        # Use the [CLS] token embedding for the representation
        cls_embedding = outputs.last_hidden_state[:, 0, :].squeeze().numpy()
        return cls_embedding
        
class IrisVectorDB:
    def __init__(self, vector_dim):
        """
        Initialize the IRIS vector database.
        """
        self.conn = IrisConn()
        self.vector_dim = vector_dim

    def insert(self, description: str, codes: str, vector):
        params=[description, codes, f'{vector.tolist()}']
        self.conn.insert(params)

    def search(self, query_vector, top_k=5):
        query_vectorStr = query_vector.tolist()
        query = f"SELECT TOP {top_k} file,codes FROM TrakCareCodeVector ORDER BY VECTOR_COSINE(codes_vector, TO_VECTOR('{query_vectorStr}',double)) DESC"
        results = self.conn.fetch_query(query)
        return results
    
# Chatbot for code retrieval
class CodeRetrieveChatbot:
    def __init__(self, embedding_model, vector_db):
        self.embedding_model = embedding_model
        self.vector_db = vector_db
    
    def add_to_database(self, description, code_snippet, embedding = None):
        if embedding is None:
            embedding = self.embedding_model.get_embedding(code_snippet)
        self.vector_db.insert(description, code_snippet, embedding)
    
    def retrieve_code(self, query, top_k=5):
        """
        Retrieve the most relevant code snippets for the given query.
        """
        query_embedding = self.embedding_model.get_embedding(query)
        results = self.vector_db.search(query_embedding, top_k)
        return results
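
The search method above interpolates the query vector directly into the SQL text. As a possible alternative, the vector can be passed as a bound parameter, the same way the INSERT statement uses `TO_VECTOR(?, double)`. The following is a sketch only: `build_search_query` is a hypothetical helper, and it assumes the cursor accepts a parameter list in `execute(sql, params)` as in the insert method.

```python
def build_search_query(query_vector, top_k=5):
    """Return (sql, params) for a top-k cosine-similarity search (hypothetical helper)."""
    # top_k is validated and inlined; only the vector is sent as a bound parameter
    if isinstance(top_k, bool) or not isinstance(top_k, int) or top_k < 1:
        raise ValueError("top_k must be a positive integer")
    sql = (
        f"SELECT TOP {top_k} file, codes FROM TrakCareCodeVector "
        "ORDER BY VECTOR_COSINE(codes_vector, TO_VECTOR(?, double)) DESC"
    )
    vector_str = ",".join(str(float(v)) for v in query_vector)
    return sql, [vector_str]

# Example with a toy 2-dimensional vector
sql, params = build_search_query([0.1, 0.2], top_k=3)
```

The returned pair could then be run with `cursor.execute(sql, params)` followed by `cursor.fetchall()`. Keeping the vector out of the SQL text avoids building very long statements by hand.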

  • Code Chunks

Since CodeBERT can process at most 512 tokens per input, large classes and methods have to be split into smaller chunks. Each chunk is then embedded and stored in the vector database.

from transformers import AutoTokenizer, AutoModel, RobertaTokenizer, RobertaModel
import torch
from IRISClassParser import parse_directory

class MethodEmbeddingGenerator:
    def __init__(self, model_name="microsoft/codebert-base"):
        """
        Initialize the embedding generator with CodeBERT.

        :param model_name: The name of the pretrained CodeBERT model.
        """
        self.tokenizer = RobertaTokenizer.from_pretrained(model_name)
        self.model = RobertaModel.from_pretrained(model_name)
        self.max_tokens = self.tokenizer.model_max_length  # Typically 512 for CodeBERT

    def chunk_method(self, method_implementation):
        """
        Split method implementation into chunks based on lines of code that approximate the token limit.

        :param method_implementation: The method implementation as a string.
        :return: A list of chunks.
        """
        lines = method_implementation.splitlines()
        chunks = []
        current_chunk = []
        current_length = 0
        for line in lines:
            # Estimate tokens of the line
            line_token_estimate = len(self.tokenizer.tokenize(line))
            if current_length + line_token_estimate <= self.max_tokens - 2:
                current_chunk.append(line)
                current_length += line_token_estimate
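            else:
                # Add the current chunk to chunks and reset. Skip an empty chunk,
                # which occurs when the very first line already exceeds the limit.
                if current_chunk:
                    chunks.append("\n".join(current_chunk))
                current_chunk = [line]
                current_length = line_token_estimate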

        # Add the last chunk if it has content
        if current_chunk:
            chunks.append("\n".join(current_chunk))

        return chunks

    def get_embeddings(self, method_implementation):
        """
        Generate embeddings for a method implementation, handling large methods by chunking.

        :param method_implementation: The method implementation as a string.
        :return: A dictionary mapping each chunk's text to its embedding.
        """
        chunks = self.chunk_method(method_implementation)
        embeddings = {}

        for chunk in chunks:
            inputs = self.tokenizer(chunk, return_tensors="pt", truncation=True, padding=True, max_length=self.max_tokens)
            with torch.no_grad():
                outputs = self.model(**inputs)
                # Use the [CLS] token embedding (index 0) as the representation
                cls_embedding = outputs.last_hidden_state[:, 0, :].squeeze(0)
                embeddings[chunk] = cls_embedding.numpy()

        return embeddings

    def process_methods(self, methods):
        """
        Process a list of methods to generate embeddings for each.

        :param methods: A list of dictionaries with method names and implementations.
        :return: A dictionary with method names as keys and per-chunk embedding dictionaries as values.
        """
        method_embeddings = {}
        for method in methods:
            method_name = method["name"]
            implementation = method["implementation"]
            print(f"Processing method embedding: {method_name}")
            method_embeddings[method_name] = self.get_embeddings(implementation)
        return method_embeddings
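
The nested dictionary returned by `process_methods` (method name, then chunk text, then embedding) still has to be turned into one database row per chunk. A minimal sketch of that step follows; the function name and the `file:method` label format are assumptions made for illustration, not the author's actual loader.

```python
def flatten_method_embeddings(file_name, method_embeddings):
    """Turn {method: {chunk: vector}} into a list of (label, chunk, vector) rows.

    Hypothetical helper: the label combines file and method name so that a
    search hit can be traced back to its source.
    """
    rows = []
    for method_name, chunk_embeddings in method_embeddings.items():
        for chunk, vector in chunk_embeddings.items():
            label = f"{file_name}:{method_name}"
            # Convert to plain floats so NumPy arrays and lists are handled alike
            rows.append((label, chunk, [float(v) for v in vector]))
    return rows

# Toy input with 2-dimensional vectors standing in for real embeddings
sample = {"Save": {"chunk a": [0.1, 0.2], "chunk b": [0.3, 0.4]}}
rows = flatten_method_embeddings("User.Person.cls", sample)
```

Each resulting row lines up with the three columns of the TrakCareCodeVector table (file, codes, codes_vector).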

  • UI - The Angular App

The stack uses Angular as the frontend and Python (Flask) as the backend.
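
On the backend side, the rows returned by the vector search need to be shaped into JSON before the frontend can render them. The sketch below shows one way this could look; `format_results` and the response layout are hypothetical, and it assumes each fetched row can be indexed as (file, codes), matching the SELECT in the search method.

```python
import json

def format_results(rows):
    """Convert (file, codes) rows into JSON-serializable dictionaries (hypothetical helper)."""
    return [{"file": row[0], "codes": row[1]} for row in rows]

# Example payload built from a single made-up search hit
example_rows = [("User.Person.cls", "Method Save() { ... }")]
payload = json.dumps({"results": format_results(example_rows)})
```

A Flask route could return such a payload as the response body for a search request.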

 

  • Future Directions

The search results are not perfect because the embedding model was not pre-trained on ObjectScript.
