Semantic Kernel Planner 101

Introduction

If you are a developer who wants to build AI-first apps with natural language processing and large language models, you might be interested in Semantic Kernel (SK), a lightweight and open-source SDK that aims to simplify the integration of AI with conventional programming languages.

SK is part of the CoPilot Stack and Microsoft is using it in its own CoPilots.

SK allows you to create and orchestrate semantic functions, native functions, memories, and connectors using C# or Python. Much like LangChain, it supports prompt templating, chaining, and memory with vectors (embeddings).

You can also use SK’s planner to automatically generate and execute complex tasks based on a user’s goals. This is similar to LangChain’s Agents & Tools capabilities. In this blog post, we will introduce some of the features and benefits of SK’s Planner, and show you how to use it in your own applications. I am still learning so I am going to stick to the basics! 😃

Source: Unlock the Potential of AI in Your Apps with Semantic Kernel: A Lightweight SDK for Large Language Models Integration (microsoft.com)

SK’s Planner allows you to create and execute plans based on semantic queries. You start by providing it a goal (an ask). The goal could be: “Create a photo of a meal with these ingredients: {list of ingredients}”. To achieve the goal, the planner can use plugins to generate and execute the plan. For the goal above, suppose we have two plugins:

  • Recipe plugin: creates a recipe based on starter ingredients
  • Image description plugin: creates an image description based on any input

The recipe plugin takes a list of ingredients as input while the image description plugin can take the recipe as input and generate an image description of it. That image description could be used by DALL-E to generate an actual image.

Note: at the time of writing, Microsoft was on the verge of using the word plugin instead of skill. In the code, you will see references to skills but that will go away. This post already the word plugins instead of skills.

Creating plugins

Plugins make expertise available to SK and consist of one or more functions. A function can be either:

  • an LLM prompt: a semantic function
  • native computer code: a native function

A plugin is a container where functions live. Think of it as a folder with each subfolder containing a function. For example:

Badly named 😃 MySkills plugin with two semantic functions

A semantic function is a prompt with placeholders for one or more input variables. The prompt is in skprompt.txt. The Recipe function uses the following prompt:

Write a recipe with the starter ingredients below and be specific about the steps to take and the amount of ingredients to use: 

{{$input}}

The file config.json contains metadata about the function and LLM completion settings such as max_tokens and temperature. For example:

{
    "schema": 1,
    "type": "completion",
    "description": "Creates a recipe from starting ingredients",
    "completion": {
        "max_tokens": 256,
        "temperature": 0,
        "top_p": 0,
        "presence_penalty": 0,
        "frequency_penalty": 0
    },
    "input": {
        "parameters": [
            {
                "name": "input",
                "description": "Input for this semantic function.",
                "defaultValue": ""
            }
        ]
    },
    "default_backends": []
}

From your code, you can simply run this function to create a recipe. The plugin above is similar to a PromptTemplate in LangChain that you can combine with an LLM into a chain. You would then simply run the chain to get the output (a recipe). SK supports creating functions inline in your code as well, similar to how LangChain works.

Using the Planner

As stated above, the Planner can use plugins to reach the goal provided by a user’s ask. It actually works its way backward from the goal to create the plan:

Source: https://learn.microsoft.com/en-us/semantic-kernel/create-chains/planner

There are different types of planners like a sequential planner, an action planner, a custom planner, and more. In our example, we will use a sequential planner and keep things as simple as possible. We will only use semantic functions, no native code functions.

Time for some code. We will build a small .NET Console App based on the example above: create a recipe and generate a photo description for this recipe. Here is the code:


using Microsoft.Extensions.Logging;
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.AI.ImageGeneration;
using System.Diagnostics;
using Microsoft.SemanticKernel.Planning;
using System.Text.Json;

var kernelSettings = KernelSettings.LoadSettings();

var kernelConfig = new KernelConfig();
kernelConfig.AddCompletionBackend(kernelSettings);

using ILoggerFactory loggerFactory = LoggerFactory.Create(builder =>
{
    builder
        .SetMinimumLevel(kernelSettings.LogLevel ?? LogLevel.Warning)
        .AddConsole()
        .AddDebug();
});

IKernel kernel = new KernelBuilder()
    .WithLogger(loggerFactory.CreateLogger<IKernel>())
    .WithConfiguration(kernelConfig)
    .Configure(c =>
    {
        c.AddOpenAIImageGenerationService(kernelSettings.ApiKey);

    })
    .Build();

// used later to generate image with dallE
var dallE = kernel.GetService<IImageGeneration>();

// use SKs planner
var planner = new SequentialPlanner(kernel);

// depends on MySkills skill which has two semantic fucntions
// skills will be renamed to plugins in the future
var skillsDirectory = Path.Combine(System.IO.Directory.GetCurrentDirectory(), "skills");
var skill = kernel.ImportSemanticSkillFromDirectory(skillsDirectory, "MySkills");

// ask for starter ingredients
Console.Write("Enter ingredients: ");
string? input = Console.ReadLine();


if (!string.IsNullOrEmpty(input))
{

    // define the ASK for the planner; the two semantic functions should be used by the plan
    // note that these functions are too simple to be useful in a real application
    // a single prompt to the model would be enough
    var ask = "Create a photo of a meal with these ingredients:" + input;

    // create the plan and print it to see if the functions are used correctly
    var newPlan = await planner.CreatePlanAsync(ask);

    Console.WriteLine("Updated plan:\n");
    Console.WriteLine(JsonSerializer.Serialize(newPlan, new JsonSerializerOptions { WriteIndented = true }));

    // run the plan; result should be an image description
    var newPlanResult = await newPlan.InvokeAsync();


    // generate the url to the images created by dalle
    Console.WriteLine("Plan result: " + newPlanResult.ToString());
    var imageURL = await dallE.GenerateImageAsync(newPlanResult.ToString(), 512, 512);

    // display image in browser (MacOS!!!)
    Process.Start("open", imageURL);

The code uses config/appsettings.json which contains settings like serviceType, serviceId, and the API key to use with either OpenAI or Azure OpenAI. In my case, serviceType is OpenAI and the serviceId is gpt-4. Ensure you have gpt-4 access in OpenAI’s API. I actually wanted to use Azure OpenAI but I do not have access to DALL-E and I do not think SK would support it anyway.

After loading the settings, a KernelConfig is created, and a completion backend gets added (using gpt-4 here). After setting up logging, a new kernel is created with a new KernelBuilder() with the following settings;

  • a logger
  • the configuration with the completion backend
  • an image generation service (DALL-E2 here) which needs the OpenAI API key (retrieved from kernelSettings)

We can now create the planner with var planner = new SequentialPlanner(kernel); and add skills (plugins) to the kernel. We add plugins from the skills/MySkills folder in our project.

Now it’s just a matter of asking the user for some ingredients (stored in input) and creating the plan based on the ask. The ask is “Create a photo of a meal with these ingredients: …”

var ask = "Create a photo of a meal with these ingredients:" + input;

var newPlan = await planner.CreatePlanAsync(ask);

Note that CreatePlanAsync does not execute the plan, it just creates it. We can look at the plan with the following code:

Console.WriteLine("Updated plan:\n");
    Console.WriteLine(JsonSerializer.Serialize(newPlan, new JsonSerializerOptions { WriteIndented = true }));

The output is something like this (note that there are typos in the ingredients but that’s ok, the model should understand):

{
  "state": [
    {
      "Key": "INPUT",
      "Value": ""
    }
  ],
  "steps": [
    {
      "state": [
        {
          "Key": "INPUT",
          "Value": ""
        }
      ],
      "steps": [],
      "parameters": [
        {
          "Key": "INPUT",
          "Value": "courgette, collieflower, steak, tomato"
        }
      ],
      "outputs": [
        "RECIPE_RESULT"
      ],
      "next_step_index": 0,
      "name": "Recipe",
      "skill_name": "MySkills",
      "description": "Creates a recipe from starting ingredients"
    },
    {
      "state": [
        {
          "Key": "INPUT",
          "Value": ""
        }
      ],
      "steps": [],
      "parameters": [
        {
          "Key": "INPUT",
          "Value": "$RECIPE_RESULT"
        }
      ],
      "outputs": [
        "RESULT__IMAGE_DESCRIPTION"
      ],
      "next_step_index": 0,
      "name": "ImageDesc",
      "skill_name": "MySkills",
      "description": "Generate image description for a photo of a recipe or meal"
    }
  ],
  "parameters": [
    {
      "Key": "INPUT",
      "Value": ""
    }
  ],
  "outputs": [
    "RESULT__IMAGE_DESCRIPTION"
  ],
  "next_step_index": 0,
  "name": "",
  "skill_name": "Microsoft.SemanticKernel.Planning.Plan",
  "description": "Create a photo of a meal with these ingredients:courgette, collieflower, steak, tomato"
}

The output shows that the two skills will be used in this case. The input to the ImageDesc plugin is the output from the Recipe plugin.

Note that if your ask has nothing to do with generating dishes and photos of a dish, the skills will still be used resulting in unexpected results.

If you do not provide any skills, the planner will just use itself as a skill and the result will be the original ask. That would still work in this case because the ask can be passed to Dall-E on its own!

With the plan created, we can now execute it and show the result:

var newPlanResult = await newPlan.InvokeAsync();
Console.WriteLine("Plan result: " + newPlanResult.ToString());

This should print the description of the photo. We can pass that result to DALL-E with:

var imageURL = await dallE.GenerateImageAsync(newPlanResult.ToString(), 512, 512);
Process.Start("open", imageURL);   // works on MacOS

If I provide carrots and meat, then I get the following description and photo.

Description: A steaming pot of hearty carrot and meat stew is pictured. The pot is filled with chunks of lean ground beef, diced carrots, diced onion, minced garlic, and a rich tomato paste. Aromatic herbs of oregano and thyme are sprinkled on top, and the stew is finished with a drizzle of olive oil. The stew is ready to be served and is sure to be a delicious and comforting meal.

The photo:

Testing skills and plans

The Semantic Kernel extension for VS Code will find your plugins (skills) and allow you to execute them:

Recipe skill in VS Code

When you click the play icon next to the skill, you will be asked for input. The prompt will then be run by your selected model, with output to the screen:

You can also create plans and execute them in VS Code:

Plan in VS Code

Above, a plan was created based on a goal. The plan included the two plugins and shows the inputs and outputs. By clicking on Execute Plan you can run it without having to write any code. The UI above allows you to inspect the generated plan and change it if it’s not performing as intended.

Conclusion

This concludes my quick look at the Planner functionality in Semantic Kernel with a simple plan and a couple of semantic skills. If you want to learn more, be sure to check these resources:

Enhancing Semantic Search with a Streamlit UI

In a previous blog post, we discussed two Python programs, upload_vectors.py and search_vectors.py. These programs were used to create and search vectors, respectively. The upload_vectors.py script created vectors from chunks of a larger text and stored them in Pinecone, while the search_vectors.py script enabled semantic search on the text. In this blog post, we will discuss how to create a user interface (UI) for these two programs using Streamlit.

🚀 I kickstarted the Streamlit app by handing over the text-based version to ChatGPT and asking it to work its magic ✨💻. Yes, it was that easy! Afterwards, I made several manual changes to make it look the way I wanted.

Pinecone, Vectors, Embeddings, and Semantic Search: What’s all that about?

Pinecone is a vector database service that allows for easy storage and retrieval of high-dimensional vectors. It is optimized for similarity search, which makes it a perfect fit for tasks like semantic search. Our script stores vectors in Pinecone by parsing an RSS feed, chunking the blog posts, and creating the vectors with OpenAI’s embedding APIs.

Vectors are mathematical representations of data in the form of an array of numbers. In our case, we use vectors to represent chunks of text retrieved from blog posts. These vectors are generated using a process called embedding, which is a way of representing complex data, like text, in a lower-dimensional space while preserving the essential information.

Semantic search is a type of search that goes beyond keyword matching to understand the meaning and context of the query. By using vector embeddings, we can compare the similarity between queries and stored texts to find the most relevant results. Pinecone does that search for us and simply returns a number of matching chunks (pieces of text).

What is Streamlit?

Streamlit is a Python library that makes it easy to create custom web apps for machine learning and data science projects. You can build interactive UIs with minimal code, allowing you to focus on the core logic of your application.

Here’s an example of creating an extremely simple Streamlit app:

import streamlit as st

st.title('Hello, Streamlit!')
st.write('This is a simple Streamlit app.')

This code would generate a web app with a title and a text output. You can also create more complex UIs with user input, like sliders, text inputs, and buttons.

Creating a Streamlit UI for Semantic Search

Now let’s examine the provided code for creating a Streamlit UI for the search_vectors.py program. The code can be broken down into the following sections:

  1. Import necessary libraries and check environment variables.
  2. Set up the tokenizer and define the tiktoken_len function.
  3. Create the UI elements, including the title, text input, dropdown, sliders, and buttons.
  4. Define the main search functionality that is triggered when the user clicks the “Search” button.

Here is the full code:

import os
import pinecone
import openai
import tiktoken
import streamlit as st

# check environment variables
if os.getenv('PINECONE_API_KEY') is None:
    st.error("PINECONE_API_KEY not set. Please set this environment variable and restart the app.")
if os.getenv('PINECONE_ENVIRONMENT') is None:
    st.error("PINECONE_ENVIRONMENT not set. Please set this environment variable and restart the app.")
if os.getenv('OPENAI_API_KEY') is None:
    st.error("OPENAI_API_KEY not set. Please set this environment variable and restart the app.")

# use cl100k_base tokenizer for gpt-3.5-turbo and gpt-4
tokenizer = tiktoken.get_encoding('cl100k_base')


def tiktoken_len(text):
    tokens = tokenizer.encode(
        text,
        disallowed_special=()
    )
    return len(tokens)

# create a title for the app
st.title("Search blog feed 🔎")

# create a text input for the user query
your_query = st.text_input("What would you like to know?")
model = st.selectbox("Model", ["gpt-3.5-turbo", "gpt-4"])

with st.expander("Options"):

    max_chunks = 5
    if model == "gpt-4":
        max_chunks = 15

    max_reply_tokens = 1250
    if model == "gpt-4":
        max_reply_tokens = 2000

    col1, col2 = st.columns(2)

    # model dropdown
    with col1:
        chunks = st.slider("Number of chunks", 1, max_chunks, 5)
        temperature = st.slider("Temperature", 0.0, 1.0, 0.0)

    with col2:
        reply_tokens = st.slider("Reply tokens", 750, max_reply_tokens, 750)
    

# create a submit button
if st.button("Search"):
    # get the Pinecone API key and environment
    pinecone_api = os.getenv('PINECONE_API_KEY')
    pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

    pinecone.init(api_key=pinecone_api, environment=pinecone_env)

    # set index
    index = pinecone.Index('blog-index')


    # vectorize your query with openai
    try:
        query_vector = openai.Embedding.create(
            input=your_query,
            model="text-embedding-ada-002"
        )["data"][0]["embedding"]
    except Exception as e:
        st.error(f"Error calling OpenAI Embedding API: {e}")
        st.stop()

    # search for the most similar vector in Pinecone
    search_response = index.query(
        top_k=chunks,
        vector=query_vector,
        include_metadata=True)

    # create a list of urls from search_response['matches']['metadata']['url']
    urls = [item["metadata"]['url'] for item in search_response['matches']]

    # make urls unique
    urls = list(set(urls))

    # create a list of texts from search_response['matches']['metadata']['text']
    chunk_texts = [item["metadata"]['text'] for item in search_response['matches']]

    # combine texts into one string to insert in prompt
    all_chunks = "\n".join(chunk_texts)

    # show urls of the chunks
    with st.expander("URLs", expanded=True):
        for url in urls:
            st.markdown(f"* {url}")
    

    with st.expander("Chunks"):
        for i, t in enumerate(chunk_texts):
            # remove newlines from chunk
            tokens = tiktoken_len(t)
            t = t.replace("\n", " ")
            st.write("Chunk ", i, "(Tokens: ", tokens, ") - ", t[:50] + "...")
    with st.spinner("Summarizing..."):
        try:
            prompt = f"""Answer the following query based on the context below ---: {your_query}
                                                        Do not answer beyond this context!
                                                        ---
                                                        {all_chunks}"""


            # openai chatgpt with article as context
            # chat api is cheaper than gpt: 0.002 / 1000 tokens
            response = openai.ChatCompletion.create(
                model=model,
                messages=[
                    { "role": "system", "content":  "You are a truthful assistant!" },
                    { "role": "user", "content": prompt }
                ],
                temperature=temperature,
                max_tokens=max_reply_tokens
            )

            st.markdown("### Answer:")
            st.write(response.choices[0]['message']['content'])

            with st.expander("More information"):
                st.write("Query: ", your_query)
                st.write("Full Response: ", response)

            with st.expander("Full Prompt"):
                st.write(prompt)

            st.balloons()
        except Exception as e:
            st.error(f"Error with OpenAI Completion: {e}")

A closer look

The code first imports the necessary libraries and checks if the required environment variables are set, displaying an error message if they are not. The libraries you need are in requirements.txt on GitHub. You can install them with:

pip3 install -r requirements.txt

ℹ️ I recommend using a Python virtual environment when you install these dependencies; see poetry (just one example)

The tiktoken_len function calculates the token length of a given text using the tokenizer. This is used to display the tokens of each chunk of text we set to the ChatCompletion API. Depending on the model, 4096 or 8192 tokens are supported.

The UI is built using Streamlit functions, such as st.title, st.text_input, st.selectbox, and st.columns. These functions create various UI elements that the user can interact with to input their query and set search parameters. If you look at the code, you will see how easy it is to add those elements.

With the UI elements, you can set:

  • the number of text chunks to return from Pinecone and to forward to the ChatCompletion API (using st.slider)
  • the number of tokens to reply with (using st.slider)
  • the model: gpt-3.5-turbo or gpt-4 (ensure you have access to the gpt-4 API)
  • the temperature (using st-slider)

The options are shown in two columns with st.columns.

The main search functionality is triggered when the user clicks the “Search” button. The code then vectorizes the query, searches for the most similar vectors in Pinecone, and displays the URLs and chunks found. Finally, the selected model is used to generate an answer based on the chunks found and the user’s query. Often, gpt-4 will provide the best answer. It seems to be able to better understand all the chunks of text thrown at it.

Running the code

To run the code you need the following:

  • A Pinecode API key and environment
  • An OpenAI API key

It is easiest to run the code with Docker. If you have it installed, run the following command:

docker run -p 8501:8501 -e OPENAI_API_KEY="YOURKEY" \
    -e PINECONE_API_KEY="YOURKEY" \
    -e PINECONE_ENVIRONMENT="YOURENV" gbaeke/blogsearch

The gbaeke/blogsearch image is available on Docker Hub. You can also build your own with the Dockerfile provided on GitHub.

After running the image, go to http://localhost:8501 and first use the Upload page to create your Pinecode index and store vectors in it. You can use my blog’s feed or any other feed. You can experiment with the chunk size and chunk overlap.

Upload to Pinecone

You can add multiple RSS feeds one-by-one as long as you turn off Recreate index before each new upload. After you have populated the index, use the Search page to start searching:

Searching

Above, we ask what we can do with Pinecone and let gpt-4 do the answering. The similarity search will search for 5 similar items and return them. We show the original URLs these results come from. In the Chunks section, you can see the original chunks because they are also in Pinecone as metadata. After the answer, you can find the full JSON returned by the ChatCompletion API and the full prompt we sent to that API.

Conclusion

In this blog post, we showed you how to create a Streamlit UI for the search_vectors.py script we talked about in a previous post. Streamlit allows you to easily build interactive web applications for your machine learning and data science projects. We also created a UI to upload posts to Pinecone. The full program allows you to add as much data as you want and query that data with semantic search, summarized and synthesized by the GPT model of choice. Give it a try and let me know what you think.

Enhancing Blog Post Search with Chunk-based Embeddings and Pinecone

In this blog post, we’ll show you a different approach to searching through a large database of blog posts. The previous approach involved creating a single embedding for the entire article and storing it in a vector database. The new approach is much more effective, and in this post, we’ll explain why and how to implement it.

The new approach involves the following steps:

  1. Chunk the article into pieces of about 400 tokens using LangChain
  2. Create an embedding for each chunk
  3. Store each embedding, along with its metadata such as the URL and the original text, in Pinecone
  4. Store the original text in Pinecone, but not indexed
  5. To search the blog posts, find the 5 best matching chunks and add them to the ChatCompletion prompt

We’ll explain each step in more detail below, but first, let’s start with a brief overview of the previous approach.

The previous approach used OpenAI’s embeddings API to vectorize the blog post articles and Pinecone, a vector database, to store and query the vectors. The article was vectorized as a whole, and the resulting vector was stored in Pinecone. To search the blog posts, cosine similarity was used to find the closest matching article, and the contents of the article were retrieved using the Python requests library and the BeautifulSoup library. Finally, a prompt was created for the ChatCompletion API, including the retrieved article.

The problem with this approach was that the entire article was vectorized as one piece. This meant that if the article was long, the vector might not represent the article accurately, as it would be too general. Moreover, if the article was too long, the ChatCompletion API call might fail because too many tokens were used.

The new approach solves these problems by chunking the article into smaller pieces, creating an embedding for each chunk, and storing each embedding in Pinecone. This way, we have a much more accurate representation of the article, as each chunk represents a smaller, more specific part of the article. And because each chunk is smaller, there is less risk of using too many tokens in the ChatCompletion API call.

To implement the new approach, we’ll use LangChain to chunk the article into pieces of about 400 tokens. LangChain is a library aimed at assisting in the development of applications that use LLMs, or large language models.

Next, we’ll create an embedding for each chunk using OpenAI’s embeddings API. As before, we will use the text-embedding-ada-002 model. And once we have the embeddings, we’ll store each one, along with its metadata, in Pinecone. The key for each embedding will be a hash of the URL, combined with the chunk number.

The original text will also be stored in Pinecone, but not indexed, so that it can be retrieved later. With this approach, we do not need to retrieve a blog article from the web. Instead, we just get the text from Pinecone directly.

To search the blog posts, we’ll use cosine similarity to find the 5 best-matching chunks. The 5 best matching chunks will be added to the ChatCompletion prompt, allowing us to ask questions based on the article’s contents.

Uploading the embeddings

The code to upload the embeddings is shown below. You will need to set the following environment variables:

export OPENAI_API_KEY=your_openai_api_key
export PINECONE_API_KEY=your_pinecone_api_key
export PINECONE_ENVIRONMENT=your_pinecone_environment
import feedparser
import os
import pinecone
import openai
import requests
from bs4 import BeautifulSoup
from retrying import retry
from langchain.text_splitter import RecursiveCharacterTextSplitter
import tiktoken
import hashlib

# use cl100k_base tokenizer for gpt-3.5-turbo and gpt-4
tokenizer = tiktoken.get_encoding('cl100k_base')

# create the length function used by the RecursiveCharacterTextSplitter
def tiktoken_len(text):
    tokens = tokenizer.encode(
        text,
        disallowed_special=()
    )
    return len(tokens)

@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000)
def create_embedding(article):
    # vectorize with OpenAI text-emebdding-ada-002
    embedding = openai.Embedding.create(
        input=article,
        model="text-embedding-ada-002"
    )

    return embedding["data"][0]["embedding"]

# OpenAI API key
openai.api_key = os.getenv('OPENAI_API_KEY')

# get the Pinecone API key and environment
pinecone_api = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

if "blog-index" not in pinecone.list_indexes():
    print("Index does not exist. Creating...")
    pinecone.create_index("blog-index", 1536, metadata_config= {"indexed": ["url", "chunk-id"]})
else:
    print("Index already exists. Deleting...")
    pinecone.delete_index("blog-index")
    print("Creating new index...")
    pinecone.create_index("blog-index", 1536, metadata_config= {"indexed": ["url", "chunk-id"]})

# set index; must exist
index = pinecone.Index('blog-index')

# URL of the RSS feed to parse
url = 'https://blog.baeke.info/feed/'

# Parse the RSS feed with feedparser
print("Parsing RSS feed: ", url)
feed = feedparser.parse(url)

# get number of entries in feed
entries = len(feed.entries)
print("Number of entries: ", entries)

# create recursive text splitter
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=400,
    chunk_overlap=20,  # number of tokens overlap between chunks
    length_function=tiktoken_len,
    separators=['\n\n', '\n', ' ', '']
)

pinecone_vectors = []
for i, entry in enumerate(feed.entries[:50]):
    # report progress
    print("Create embeddings for entry ", i, " of ", entries, " (", entry.link, ")")

    r = requests.get(entry.link)
    soup = BeautifulSoup(r.text, 'html.parser')
    article = soup.find('div', {'class': 'entry-content'}).text

    # create chunks
    chunks = text_splitter.split_text(article)

    # create md5 hash of entry.link
    url = entry.link
    url_hash = hashlib.md5(url.encode("utf-8"))
    url_hash = url_hash.hexdigest()
        
    # create embeddings for each chunk
    for j, chunk in enumerate(chunks):
        print("\tCreating embedding for chunk ", j, " of ", len(chunks))
        vector = create_embedding(chunk)

        # concatenate hash and j
        hash_j = url_hash + str(j)

        # add vector to pinecone_vectors list
        print("\tAdding vector to pinecone_vectors list for chunk ", j, " of ", len(chunks))
        pinecone_vectors.append((hash_j, vector, {"url": entry.link, "chunk-id": j, "text": chunk}))

        # upsert every 100 vectors
        if len(pinecone_vectors) % 100 == 0:
            print("Upserting batch of 100 vectors...")
            upsert_response = index.upsert(vectors=pinecone_vectors)
            pinecone_vectors = []

# if there are any vectors left, upsert them
if len(pinecone_vectors) > 0:
    print("Upserting remaining vectors...")
    upsert_response = index.upsert(vectors=pinecone_vectors)
    pinecone_vectors = []

print("Vector upload complete.")

Searching for blog posts

The code below is used to search blog posts:

import os
import pinecone
import openai
import tiktoken

# use cl100k_base tokenizer for gpt-3.5-turbo and gpt-4
tokenizer = tiktoken.get_encoding('cl100k_base')


def tiktoken_len(text):
    tokens = tokenizer.encode(
        text,
        disallowed_special=()
    )
    return len(tokens)

# get the Pinecone API key and environment
pinecone_api = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

# set index
index = pinecone.Index('blog-index')

while True:
    # set query
    your_query = input("\nWhat would you like to know? ")
    
    # vectorize your query with openai
    try:
        query_vector = openai.Embedding.create(
            input=your_query,
            model="text-embedding-ada-002"
        )["data"][0]["embedding"]
    except Exception as e:
        print("Error calling OpenAI Embedding API: ", e)
        continue

    # search for the most similar vector in Pinecone
    search_response = index.query(
        top_k=5,
        vector=query_vector,
        include_metadata=True)

    # create a list of urls from search_response['matches']['metadata']['url']
    urls = [item["metadata"]['url'] for item in search_response['matches']]

    # make urls unique
    urls = list(set(urls))

    # create a list of texts from search_response['matches']['metadata']['text']
    chunks = [item["metadata"]['text'] for item in search_response['matches']]

    # combine texts into one string to insert in prompt
    all_chunks = "\n".join(chunks)

    # print urls of the chunks
    print("URLs:\n\n", urls)

    # print the text number and first 50 characters of each text
    print("\nChunks:\n")
    for i, t in enumerate(chunks):
        print(f"\nChunk {i}: {t[:50]}...")

    try:
        # openai chatgpt with article as context
        # chat api is cheaper than gpt: 0.002 / 1000 tokens
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                { "role": "system", "content":  "You are a thruthful assistant!" },
                { "role": "user", "content": f"""Answer the following query based on the context below ---: {your_query}
                                                    Do not answer beyond this context!
                                                    ---
                                                    {all_chunks}""" }
            ],
            temperature=0,
            max_tokens=750
        )

        print(f"\n{response.choices[0]['message']['content']}")
    except Exception as e:
        print(f"Error with OpenAI Completion: {e}")

In Action

Below, we ask if Redis supports storing vectors and what version of Redis we need in Azure. The Pinecone vector search found 5 chunks, all from the same blog post (there is only one URL). The five chunks are combined and sent to ChatGPT, together with the original question. The response from the ChatCompletion API is clear!

Example question and response

Conclusion

In conclusion, the “chunked” approach to searching through a database of blog posts is much more effective and solves many of the problems associated with the previous approach. We hope you found this post helpful, and we encourage you to try out the new approach in your own projects!

Storing and querying for embeddings with Redis

In a previous post, we wrote about using vectorized search and cosine similarity to quickly query a database of blog posts and retrieve the most relevant content to a natural language query. This is achieved using OpenAI’s embeddings API, Pinecone (a vector database), and OpenAI ChatCompletions. For reference, here’s the rough architecture:

Vectorized search with Pinecone

The steps above do the following:

  1. A console app retrieves blog post URLs from an RSS feed and reads all the posts one by one
  2. For each post, create an embedding with OpenAI which results in a vector of 1536 dimensions to store in Pinecone
  3. After the embedding is created, store the embedding in a Pinecone index; we created the index from the Pinecone portal
  4. A web app asks the user for a query (e.g., “How do I create a chat bot?”) and creates an embedding for the query
  5. Perform a vectorized search, finding the closest post vectors to the query vector using cosine similarity and keep the one with the highest score
  6. Use the ChatCompletion API and submit the same query but add the highest scoring post as context to the user question. The post text is injected into the prompt

ℹ️ See Pinecone and OpenAI magic: A guide to finding your long lost blog posts with vectorized search and ChatGPT – baeke.info for more information.

We can replace Pinecone with Redis, a popular open-source, in-memory data store that can be used as a database, cache, and message broker. Redis is well-suited for this task as it can also store vector representations of our blog posts and has the capability to perform vector queries efficiently.

You can easily run Redis with Docker for local development. In addition, Redis is available in Azure, although you will need the Enterprise version. Only Azure Cache for Redis Enterprise supports the RediSearch functionality and that’s what we need here! Note that the Enterprise version is quite costly.

By leveraging Redis for vector storage and querying, we can harness its high performance, flexibility, and reliability in our solution while maintaining the core functionality of quickly querying and retrieving the most relevant blog post content using vectorized search and similarity queries.

ℹ️ The code below shows snippets. Full samples (yes, samples 😀) are on GitHub: check upload_vectors_redis.py to upload posts to a local Redis instance and search_vectors_redis.py to test the query functionality.

Run Redis with Docker

If you have Docker on your machine, use the following command:

docker run --name redis-stack-server -p 6380:6379 redis/redis-stack-server:latest

ℹ️ I already had another instance of Redis running on port 6379 so I mapped port 6380 on localhost to port 6379 of the redis-stack-server container.

If you want a GUI to explore your Redis instance, install RedisInsight. The screenshot below shows the blog posts after uploading them as Redis hashes.

RedisInsight in action

Let’s look at creating the hashes next!

Storing post data in Redis hashes

We will create several Redis hashes, one for each post. Hashes are records structured as collections of field-value pairs. Each hash we store, has the following fields:

  • url: url to the blog post
  • embedding: embedding of the blog post (a vector), created with the OpenAI embeddings API and the text-embedding-ada-002 model

We need the URL to retrieve the entire post after a closest match has been found. In Pinecone, the URL would be metadata to the vector. In Redis, it’s just a field in a hash, just like the vector itself.

In RedisInsight, a hash is shown as below:

Redis hash for post 0 with url and embedding fields

The embedding field in the hash has no special properties. The vector is simply stored as a series of bytes. To store the urls and embeddings of posts, we can use the following code:

import redis
import openai
import os
import requests
from bs4 import BeautifulSoup
import feedparser


# OpenAI API key
openai.api_key = os.getenv('OPENAI_API_KEY')

# Redis connection details
redis_host = os.getenv('REDIS_HOST')
redis_port = os.getenv('REDIS_PORT')
redis_password = os.getenv('REDIS_PASSWORD')

# Connect to the Redis server
conn = redis.Redis(host=redis_host, port=redis_port, password=redis_password, encoding='utf-8', decode_responses=True)

# URL of the RSS feed to parse
url = 'https://blog.baeke.info/feed/'

# Parse the RSS feed with feedparser
feed = feedparser.parse(url)

p = conn.pipeline(transaction=False)
for i, entry in enumerate(feed.entries[:50]):
    # report progress
    print("Create embedding and save for entry ", i, " of ", entries)

    r = requests.get(entry.link)
    soup = BeautifulSoup(r.text, 'html.parser')
    article = soup.find('div', {'class': 'entry-content'}).text

    # vectorize with OpenAI text-emebdding-ada-002
    embedding = openai.Embedding.create(
        input=article,
        model="text-embedding-ada-002"
    )

    # print the embedding (length = 1536)
    vector = embedding["data"][0]["embedding"]

    # convert to numpy array and bytes
    vector = np.array(vector).astype(np.float32).tobytes()

    # Create a new hash with url and embedding
    post_hash = {
        "url": entry.link,
        "embedding": vector
    }

    # create hash
    conn.hset(name=f"post:{i}", mapping=post_hash)

p.execute()

In the above code, note the following:

  • The OpenAI embeddings API returns a JSON document that contains the embedding for each post; the embedding is retrieved with vector = embedding["data"][0]["embedding"]
  • The resulting vector is converted to bytes with vector = np.array(vector).astype(np.float32).tobytes(); serializing the vector this way is required to store the vector in the Redis hash
  • the Redis hset command is used to store the field-value pairs (these pairs are in a Python dictionary called post_hash) with a key that is prefixed with post: followed by the document number. The prefix will be used later by the search index we will create

Now we have our post information in Redis hashes, we want to use RediSearch functionality to match an input query with one or more of our posts. RediSearch supports vector similarity semantic search. For such a search to work, we will need to create an index that knows there is a vector field. On such indexes, we can perform vector similarity searches.

Creating an index

To create an index with Python code, check the code below:

import redis
from redis.commands.search.field import VectorField, TextField
from redis.commands.search.query import Query
from redis.commands.search.indexDefinition import IndexDefinition, IndexType

# Redis connection details
redis_host = os.getenv('REDIS_HOST')
redis_port = os.getenv('REDIS_PORT')
redis_password = os.getenv('REDIS_PASSWORD')

# Connect to the Redis server
conn = redis.Redis(host=redis_host, port=redis_port, password=redis_password, encoding='utf-8', decode_responses=True)


SCHEMA = [
    TextField("url"),
    VectorField("embedding", "HNSW", {"TYPE": "FLOAT32", "DIM": 1536, "DISTANCE_METRIC": "COSINE"}),
]

# Create the index
try:
    conn.ft("posts").create_index(fields=SCHEMA, definition=IndexDefinition(prefix=["post:"], index_type=IndexType.HASH))
except Exception as e:
    print("Index already exists")


When creating an index, you define the fields to index based on a schema. Above, we include both the text field (url) and the vector field (embedding). The VectorField class is used to construct the vector field and takes several parameters:

  • Name: the name of the field (“embedding” here but could be anything)
  • Algorithm: “FLAT” or “HNSW”; use “FLAT” when search quality is of high priority and search speed is less important; “HNSW” gives you faster querying; for more information see this article
  • Attributes: a Python dictionary that specifies the data type, the number of dimensions of the vector (1536 for text-embedding-ada-002) and the distance metric; here we use COSINE for cosine similarity, which is recommended by OpenAI with their embedding model

ℹ️ It’s important to get the dimensions right or your index will fail to build properly. It will not be immediately clear that it failed, unless you run FT.INFO <indexname> with redis-cli.

With the schema out of the way, we can now create the index with:

conn.ft("posts").create_index(fields=SCHEMA, definition=IndexDefinition(prefix=["post:"], index_type=IndexType.HASH))

The index we create is called posts. We index the fields defined in SCHEMA and only index hashes with a key prefix of post:. The hashes we created earlier, all have this prefix. With the index created and our existing hashes, the index should be populated with them. Ensure you can see that in RedisInsight:

posts index populated with hashes that were added earlier

Redis vector queries

With the hashes and the index created, we can now perform a similarity search. We will ask the user for a query string (use natural language) and then check the posts that are similar to the query string. The query string will need to be vectorized as well. We will return several post and rank them.

import numpy as np
from redis.commands.search.query import Query
import redis
import openai
import os

openai.api_key = os.getenv('OPENAI_API_KEY')

def search_vectors(query_vector, client, top_k=5):
    base_query = "*=>[KNN 5 @embedding $vector AS vector_score]"
    query = Query(base_query).return_fields("url", "vector_score").sort_by("vector_score").dialect(2)    

    try:
        results = client.ft("posts").search(query, query_params={"vector": query_vector})
    except Exception as e:
        print("Error calling Redis search: ", e)
        return None

    return results

# Redis connection details
redis_host = os.getenv('REDIS_HOST')
redis_port = os.getenv('REDIS_PORT')
redis_password = os.getenv('REDIS_PASSWORD')

# Connect to the Redis server
conn = redis.Redis(host=redis_host, port=redis_port, password=redis_password, encoding='utf-8', decode_responses=True)

if conn.ping():
    print("Connected to Redis")

# Enter a query
query = input("Enter your query: ")

# Vectorize the query using OpenAI's text-embedding-ada-002 model
print("Vectorizing query...")
embedding = openai.Embedding.create(input=query, model="text-embedding-ada-002")
query_vector = embedding["data"][0]["embedding"]

# Convert the vector to a numpy array
query_vector = np.array(query_vector).astype(np.float32).tobytes()

# Perform the similarity search
print("Searching for similar posts...")
results = search_vectors(query_vector, conn)

if results:
    print(f"Found {results.total} results:")
    for i, post in enumerate(results.docs):
        score = 1 - float(post.vector_score)
        print(f"\t{i}. {post.url} (Score: {round(score ,3) })")
else:
    print("No results found")

In the above code, the following happens:

  • Set OpenAI API key: needed to create the embedding for the query typed by the user
  • Connect to Redis based on the environment variables and check the connection with ping().
  • Ask the user for a query
  • Create the embedding from the query string and convert the array to bytes
  • Call the search_vectors function with the vectorized query string and Redis connection as parameters

The search_vectors function uses RediSearch capabilities to query over our hashes and calculate the 5 nearest neighbors to our query vector. Querying is explained in detail in the Redis documentation but it can be a bit dense. You start with the base query:

 base_query = "*=>[KNN 5 @embedding $vector AS vector_score]"

This is just a string with the query format that Redis expects to pass to the Query class in the next step. We are looking for the 5 nearest neighbors of $vector in the embedding fields of the hashes. You use @ to denote the embedding field and $ to denote the vector we will pass in later. That vector is our vectorized query string. With AS vector_score, we add the score to later rank the results from high to low.

The actual query is built with the Query class (one line):

query = Query(base_query).return_fields("url", "vector_score").sort_by("vector_score").dialect(2)    

We return the url and the vector_score and sort on this score. Dialect is just the version of the query language. Here we use dialect 2 as that matches the query syntax. Using an earlier dialect would not work here.

Of course, this still does not pass the query vector to the query. That only happens when we run the query in Redis with:

results = client.ft("posts").search(query, query_params={"vector": query_vector})

The above code performs a search query on the posts index. In the call to the search method, we pass the query we built earlier and a list of query parameters. We only have one parameter, the vector parameter ($vector in base_query) and the value for this parameter is the embedding created from the user query string.

When I query for bot, I get the following results:

Our 5 query results

The results are ranked with the closest match first. We could use that match to grab the post from the URL and send the query to OpenAI ChatCompletion API to answer the question more precisely. For better results, use a better query like “How do I build a chat bot in Python with OpenAI?”. To get an idea of how to do that, check my previous post.

Conclusion

In this post we discussed storing embeddings in Redis and querying embeddings with a similarity search. If you combine this with my previous post, you can use Redis instead of Pinecone as the vector database and query engine. This can be useful for Azure customers because Azure has Azure Cache for Redis Enterprise, a fully managed service that supports the functionality discussed in this post. In addition, it is useful for local development purposes because you can easily run Redis with Docker.

Pinecone and OpenAI magic: A guide to finding your long lost blog posts with vectorized search and ChatGPT

Searching through a large database of blog posts can be a daunting task, especially if there are thousands of articles. However, using vectorized search and cosine similarity, you can quickly query your blog posts and retrieve the most relevant content.

In this blog post, we’ll show you how to query a list of blog posts (from this blog) using a combination of vectorized search with cosine similarity and OpenAI ChatCompletions. We’ll be using OpenAI’s embeddings API to vectorize the blog post articles and Pinecone, a vector database, to store and query the vectors. We’ll also show you how to retrieve the contents of the article, create a prompt using the ChatCompletion API, and return the result to a web page.

ℹ️ Sample code is on GitHub: https://github.com/gbaeke/gpt-vectors

ℹ️ If you want an introduction to embeddings and cosine similarity, watch the video on YouTube by Part Time Larry.

Setting Up Pinecone

Before we can start querying our blog posts, we need to set up Pinecone. Pinecone is a vector database that makes it easy to store and query high-dimensional data. It’s perfect for our use case since we’ll be working with high-dimensional vectors.

ℹ️ Using a vector database is not strictly required. The GitHub repo contains app.py, which uses scikit-learn to create the vectors and perform a cosine similarity search. Many other approaches are possible. Pinecone just makes storing and querying the vectors super easy.

ℹ️ If you want more information about Pinecone and the concept of a vector database, watch this introduction video.

First, we’ll need to create an account with Pinecone and get the API key and environment name. In the Pinecone UI, you will find these as shown below. There will be a Show Key and Copy Key button in the Actions section next to the key.

Key and environment for Pinecone

Once we have an API key and the environment, we can use the Pinecone Python library to create and use indexes. Install the Pinecone library with pip install pinecone-client.

Although you can create a Pinecone index from code, we will create the index in the Pinecone portal. Go to Indexes and select Create Index. Create the index using cosine as metric and 1536 dimensions:

blog-index in Pinecone

The embedding model we will use to create the vectors, text-embedding-ada-002, outputs vectors with 1536 dimensions. For more info see OpenAI’s blog post of December 15, 2022.

To use the Pinecode index from code, look at the snippet below:

import pinecone

pinecone_api = "<your_api_key>"
pinecone_env = "<your_environment>"

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

index = pinecone.Index('blog-index')

We create an instance of the Index class with the name “blog-index” and store this in index. This index will be used to store our blog post vectors or to perform searches on.

Vectorizing Blog Posts with OpenAI’s Embeddings API

Next, we’ll need to vectorize our blog post articles. We’ll be using OpenAI’s embeddings API to do this. The embeddings API takes a piece of text and returns a high-dimensional vector representation of that text. Here’s an example of how to do that for one article or string:

import openai

openai.api_key = "<your_api_key>"

article = "Some text from a blog post"

vector = openai.Embedding.create(
    input=article,
    model="text-embedding-ada-002"
)["data"][0]["embedding"]

We create a vector representation of our blog post article by calling the Embedding class’s create method. We pass in the article text as input and the text-embedding-ada-002 model, which is a pre-trained language model that can generate high-quality embeddings.

Storing Vectors in Pinecone

Once we have the vector representations of our blog post articles, we can store them in Pinecone. Instead of storing vector per vector, we can use upsert to store a list of vectors. The code below uses the feed of this blog to grab the URLs for 50 posts, every post is vectorized and the vector is added to a Python list of tuples, as expected by the upsert method. The list is then added to Pinecone at once. The tuple that Pinecone expects is:

(id, vector, metadata dictionary)

e.g. (0, vector for post 1, {"url": url to post 1}

Here is the code that uploads the first 50 posts of baeke.info to Pinecone. You need to set the Pinecone key and environment and the OpenAI key as environment variables. The code uses feedparser to grab the blog feed, and BeatifulSoup to parse the retrieved HTML. The code serves as an example only. It is not very robust when it comes to error checking etc…

import feedparser
import os
import pinecone
import numpy as np
import openai
import requests
from bs4 import BeautifulSoup

# OpenAI API key
openai.api_key = os.getenv('OPENAI_API_KEY')

# get the Pinecone API key and environment
pinecone_api = os.getenv('PINECONE_API_KEY')
pinecone_env = os.getenv('PINECONE_ENVIRONMENT')

pinecone.init(api_key=pinecone_api, environment=pinecone_env)

# set index; must exist
index = pinecone.Index('blog-index')

# URL of the RSS feed to parse
url = 'https://blog.baeke.info/feed/'

# Parse the RSS feed with feedparser
feed = feedparser.parse(url)

# get number of entries in feed
entries = len(feed.entries)
print("Number of entries: ", entries)

post_texts = []
pinecone_vectors = []
for i, entry in enumerate(feed.entries[:50]):
    # report progress
    print("Processing entry ", i, " of ", entries)

    r = requests.get(entry.link)
    soup = BeautifulSoup(r.text, 'html.parser')
    article = soup.find('div', {'class': 'entry-content'}).text

    # vectorize with OpenAI text-emebdding-ada-002
    embedding = openai.Embedding.create(
        input=article,
        model="text-embedding-ada-002"
    )

    # print the embedding (length = 1536)
    vector = embedding["data"][0]["embedding"]

    # append tuple to pinecone_vectors list
    pinecone_vectors.append((str(i), vector, {"url": entry.link}))

# all vectors can be upserted to pinecode in one go
upsert_response = index.upsert(vectors=pinecone_vectors)

print("Vector upload complete.")

Querying Vectors with Pinecone

Now that we have stored our blog post vectors in Pinecone, we can start querying them. We’ll use cosine similarity to find the closest matching blog post. Here is some code that does just that:

query_vector = <vector representation of query>  # vector created with OpenAI as well

search_response = index.query(
    top_k=5,
    vector=query_vector,
    include_metadata=True
)

url = get_highest_score_url(search_response['matches'])

def get_highest_score_url(items):
    highest_score_item = max(items, key=lambda item: item["score"])

    if highest_score_item["score"] > 0.8:
        return highest_score_item["metadata"]['url']
    else:
        return ""

We create a vector representation of our query (you don’t see that here but it’s the same code used to vectorize the blog posts) and pass it to the query method of the Pinecone Index class. We set top_k=5 to retrieve the top 5 matching blog posts. We also set include_metadata=True to include the metadata associated with each vector in our response. That way, we also have the URL of the top 5 matching posts.

The query method returns a dictionary that contains a matches key. The matches value is a list of dictionaries, with each dictionary representing a matching blog post. The score key in each dictionary represents the cosine similarity score between the query vector and the blog post vector. We use the get_highest_score_url function to find the blog post with the highest cosine similarity score.

The function contains some code to only return the highest scoring URL if the score is > 0.8. It’s of course up to you to accept lower matching results. There is a potential for the vector query to deliver an article that’s not highly relevant which results in an irrelevant context for the OpenAI ChatCompletion API call we will do later.

Retrieving the Contents of the Blog Post

Once we have the URL of the closest matching blog post, we can retrieve the contents of the article using the Python requests library and the BeautifulSoup library.

import requests
from bs4 import BeautifulSoup

r = requests.get(url)
soup = BeautifulSoup(r.text, 'html.parser')

article = soup.find('div', {'class': 'entry-content'}).text

We send a GET request to the URL of the closest matching blog post and retrieve the HTML content. We use the BeautifulSoup library to parse the HTML and extract the contents of the <div> element with the class “entry-content”.

Creating a Prompt for the ChatCompletion API

Now that we have the contents of the blog post, we can create a prompt for the ChatCompletion API. The crucial part here is that our OpenAI query should include the blog post we just retrieved!

response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[
        { "role": "system", "content": "You are a polite assistant" },
        { "role": "user", "content": "Based on the article below, answer the following question: " + your_query +
            "\nAnswer as follows:" +
            "\nHere is the answer directly from the article:" +
            "\nHere is the answer from other sources:" +
             "\n---\n" + article }
           
    ],
    temperature=0,
    max_tokens=200
)

response_text=f"\n{response.choices[0]['message']['content']}"

We use the ChatCompletion API with the gpt-3.5-turbo model to ask our question. This is the same as using ChatGPT on the web with that model. At this point in time, the GPT-4 model was not available yet.

Instead of one prompt, we send a number of dictionaries in a messages list. The first item in the list sets the system message. The second item is the actual user question. We ask to answer the question based on the blog post we stored in the article variable and we provide some instructions on how to answer. We add the contents of the article to our query.

If the article is long, you run the risk of using too many tokens. If that happens, the ChatCompletion call will fail. You can use the tiktoken library to count the tokens and prevent the call to happen in the first place. Or you can catch the exception and tell the user. In the above code, there is no error handling. We only include the core code that’s required.

Returning the Result to a Web Page

If you are running the search code in an HTTP handler as the result of the user typing a query in a web page, you can return the result to the caller:

return jsonify({
    'url': url,
    'response': response_text
})

The full example, including an HTML page and Flask code can be found on GitHub.

The result could look like this:

Query results in the closest URL using vectorized search and ChatGPT answering the question based on the contents the URL points at

Conclusion

Using vectorized search and cosine similarity, we can quickly query a database of blog posts and retrieve the most relevant post. By combining OpenAI’s embeddings API, Pinecone, and the ChatCompletion API, we can create a powerful tool for searching and retrieving blog post content using natural language.

Note that there are some potential issues as well. The code we show is merely a starting point:

  • Limitations of cosine similarity: it does not take into account all properties of the vectors, which can lead to misleading results
  • Prompt engineering: the prompt we use works but there might be prompts that just work better. Experimentation with different prompts is crucial!
  • Embeddings: OpenAI embeddings are trained on a large corpus of text, which may not be representative of the domain-specific language in the posts
  • Performance might not be sufficient if the size of the database grows large. For my blog, that’s not really an issue. 😀

Step-by-Step Guide: How to Build Your Own Chatbot with the ChatGPT API

In this blog post, we will be discussing how to build your own chat bot using the ChatGPT API. It’s worth mentioning that we will be using the OpenAI APIs directly and not the Azure OpenAI APIs, and the code will be written in Python. A crucial aspect of creating a chat bot is maintaining context in the conversation, which we will achieve by storing and sending previous messages to the API at each request. If you are just starting with AI and chat bots, this post will guide you through the step-by-step process of building your own simple chat bot using the ChatGPT API.

Python setup

Ensure Python is installed. I am using version 3.10.8. For editing code, I am using Visual Studio code as the editor. For the text-based chat bot, you will need the following Python packages:

  • openai: make sure the version is 0.27.0 or higher; earlier versions do not support the ChatCompletion APIs
  • tiktoken: a library to count the number of tokens of your chat bot messages

Install the above packages with your package manager. For example: pip install openai.

All code can be found on GitHub.

Getting an account at OpenAI

We will write a text-based chat bot that asks for user input indefinitely. The first thing you need to do is sign up for API access at https://platform.openai.com/signup. Access is not free but for personal use, while writing and testing the chat bot, the price will be very low. Here is a screenshot from my account:

Oh no, $0.13 dollars

When you have your account, generate an API key from https://platform.openai.com/account/api-keys. Click the Create new secret key button and store the key somewhere.

Writing the bot

Now create a new Python file called app.py and add the following lines:

import os
import openai
import tiktoken

openai.api_key = os.getenv("OPENAI_KEY")

We already discussed the openai and tiktoken libraries. We will also use the builtin os library to read environment variables.

In the last line, we read the environment variable OPENAI_KEY. If you use Linux, in your shell, use the following command to store the OpenAI key in an environment variable: export OPENAI_KEY=your-OpenAI-key. We use this approach to avoid storing the API key in your code and accidentally uploading it to GitHub.

To implement the core chat functionality, we will use a Python class. I was following a Udemy course about ChatGPT and it used a similar approach, which I liked. By the way, I can highly recommend that course. Check it out here.

Let’s start with the class constructor:

class ChatBot:

    def __init__(self, message):
        self.messages = [
            { "role": "system", "content": message }
        ]

In the constructor, we define a messages list and set the first item in that list to a configurable dictionary: { "role": "system", "content": message }. In the ChatGPT API calls, the messages list provides context to the API because it contains all the previous messages. With this initial system message, we can instruct the API to behave in a certain way. For example, later in the code, you will find this code to create an instance of the ChatBot class:

bot = ChatBot("You are an assistant that always answers correctly. If not sure, say 'I don't know'.")

But you could also do:

bot = ChatBot("You are an assistant that always answers wrongly.Always contradict the user")

In practice, ChatGPT does not follow the system instruction to strongly. User messages are more important. So it could be that, after some back and forth, the answers will not follow the system instruction anymore.

Let’s continue with another method in the class, the chat method:

def chat(self):
        prompt = input("You: ")
        
        self.messages.append(
            { "role": "user", "content": prompt}
        )
        
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages = self.messages,
            temperature = 0.8
        )
        
        answer = response.choices[0]['message']['content']
        
        print(answer)
        
        self.messages.append(
           { "role": "assistant", "content": answer} 
        )

        tokens = self.num_tokens_from_messages(self.messages)
        print(f"Total tokens: {tokens}")

        if tokens > 4000:
            print("WARNING: Number of tokens exceeds 4000. Truncating messages.")
            self.messages = self.messages[2:]

The chat method is where the action happens. It does the following:

  • It prompts the user to enter some input.
  • The user’s input is stored in a dictionary as a message with a “user” role and appended to a list of messages called self.messages. If this is the first input, we now have two messages in the list, a system message and a user message.
  • It then creates a response using OpenAI’s gpt-3.5-turbo model, passing in the self.messages list and a temperature of 0.8 as parameters. We use the ChatCompletion API versus the Completion API that you use with other models such as text-davinci-003.
  • The generated response is stored in a variable named answer. The full response contains a lot of information. We are only interested in the first response (there is only one) and grab the content.
  • The answer is printed to the console.
  • The answer is also added to the self.messages list as a message with an “assistant” role. If this is the first input, we now have three messages in the list: a system message, the first user message (the input) and the assistant’s response.
  • The total number of tokens in the self.messages list is computed using a separate function called num_tokens_from_messages() and printed to the console.
  • If the number of tokens exceeds 4000, a warning message is printed and the self.messages list is truncated to remove the first two messages. We will talk about these tokens later.

It’s important to realize we are using the Chat completions here. You can find more information about Chat completions here.

If you did not quite get how the text response gets extracted, here is an example of a full response from the Chat completion API:

{
 'id': 'chatcmpl-6p9XYPYSTTRi0xEviKjjilqrWU2Ve',
 'object': 'chat.completion',
 'created': 1677649420,
 'model': 'gpt-3.5-turbo',
 'usage': {'prompt_tokens': 56, 'completion_tokens': 31, 'total_tokens': 87},
 'choices': [
   {
    'message': {
      'role': 'assistant',
      'content': 'The 2020 World Series was played in Arlington, Texas at the Globe Life Field, which was the new home stadium for the Texas Rangers.'},
    'finish_reason': 'stop',
    'index': 0
   }
  ]
}

The response is indeed in choices[0][‘message’][‘content’].

To make this rudimentary chat bot work, we will repeatedly call the chat method like so:

bot = ChatBot("You are an assistant that always answers correctly. If not sure, say 'I don't know'.")
    while True:
        bot.chat()

Every time you input a question, the API answers and both the question and answer is added to the messages list. Of course, that makes the messages list grow larger and larger, up to a point where it gets to large. The question is: “What is too large?”. Let’s answer that in the next section.

Counting tokens

A language model does not work with text as humans do. Instead, they use tokens. It’s not important how this exactly works but it is important to know that you get billed based on these tokens. You pay per token.

In addition, the model we use here (gpt-3.5-turbo) has a maximum limit of 4096 tokens. This might change in the future. With our code, we cannot keep adding messages to the messages list because, eventually, we will pass the limit and the API call will fail.

To have an idea about the tokens in our messages list, we have this function:

def num_tokens_from_messages(self, messages, model="gpt-3.5-turbo"):
        try:
            encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            encoding = tiktoken.get_encoding("cl100k_base")
        if model == "gpt-3.5-turbo":  # note: future models may deviate from this
            num_tokens = 0
            for message in messages:
                num_tokens += 4  # every message follows <im_start>{role/name}\n{content}<im_end>\n
                for key, value in message.items():
                    num_tokens += len(encoding.encode(value))
                    if key == "name":  # if there's a name, the role is omitted
                        num_tokens += -1  # role is always required and always 1 token
            num_tokens += 2  # every reply is primed with <im_start>assistant
            return num_tokens
        else:
            raise NotImplementedError(f"""num_tokens_from_messages() is not presently implemented for model {model}.""")

The above function comes from the OpenAI cookbook on GitHub. In my code, the function is used to count tokens in the messages list and, if the number of tokens is above a certain limit, we remove the first two messages from the list. The code also prints the tokens so you now how many you will be sending to the API.

The function contains references to <im_start> and <im_end>. This is ChatML and is discussed here. Because you use the ChatCompletion API, you do not have to worry about this. You just use the messages list and the API will transform it all to ChatML. But when you count tokens, ChatML needs to be taken into account for the total token count.

Note that Microsoft examples for Azure OpenAI, do use ChatML in the prompt, in combination with the default Completion APIs. See Microsoft Learn for more information. You will quickly see that using the ChatCompletion API with the messages list is much simpler.

To see, and download, the full code, see GitHub.

Running the code

To run the code, just run app.py. On my system, I need to use python3 app.py. I set the system message to You are an assistant that always answers wrongly. Contradict the user. 😀

Here’s an example conversation:

Although, at the start, the responses follow the system message, the assistant starts to correct itself and answers correctly. As stated, user messages eventually carry more weight.

Summary

In this post, we discussed how to build a chat bot using the ChatGPT API and Python. We went through the setup process, created an OpenAI account, and wrote the chat bot code using the OpenAI API. The bot used the ChatCompletion API and maintained context in the conversation by storing and sending previous messages to the API at each request. We also discussed counting tokens and truncating the message list to avoid exceeding the maximum token limit for the model. The full code is available on GitHub, and we provided an example conversation between the bot and the user. The post aimed to guide both beginning developers and beginners in AI and chat bot development through the step-by-step process of building their chat bot using the ChatGPT API and keep it as simple as possible.

Hope you liked it!

Authenticate to Azure Resources with Azure Managed Identities

In this post, we will take a look at managed identities in general and system-assigned managed identity in particular. Managed identities can be used by your code to authenticate to Azure AD resources from Azure compute resources that support it, like virtual machines and containers.

But first, let’s look at the other option and why you should avoid it if you can: service principals.

Service Principals

If you have code that needs to authenticate to Azure AD-protected resources such as Azure Key Vault, you can always create a service principal. It’s the option that always works. It has some caveats that will be explained further in this post.

The easiest way to create a service principal is with the single Azure CLI command below:

az ad sp create-for-rbac

The command results in the following output:

{
  "appId": "APP_ID",
  "displayName": "azure-cli-2023-01-06-11-18-45",
  "password": "PASSWORD",
  "tenant": "TENANT_ID"
}

If the service principal needs access to, let’s say, Azure Key Vault, you could use the following command to grant that access:

APP_ID="appId from output above"
$SUBSCRIPTION_ID="your subscription id"
$RESOURCE_GROUP="your resource group"
$KEYVAULT_NAME="short name of your key vault"

az role assignment create --assignee $APP_ID \
  --role "Key Vault Secrets User" \
  --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.KeyVault/vaults/$KEYVAULT_NAME"

The next step is to configure your application to use the service principal and its secret to obtain an Azure AD token (or credential) that can be passed to Azure Key Vault to retrieve secrets or keys. That means you need to find a secure way to store the service principal secret with your application, which is something you want to avoid.

In a Python app, you can use the ClientSecretCredential class and pass your Azure tenant id, the service principal appId (or client Id) and the secret. You can then use the secret with a SecretClient like in the snippet below.

# Create a credential object
credential = ClientSecretCredential(tenant_id, client_id, client_secret)

# Create a SecretClient using the credential
client = SecretClient(vault_url=VAULT_URL, credential=credential)

Other languages and frameworks have similar libraries to reach the same result. For instance JavaScript and C#.

This is quite easy to do but again, where do you store the service principal’s secret securely?

The command az ad sp create-for-rbac also creates an App Registration (and Enterprise application) in Azure AD:

Azure AD App Registration

The secret (or password) for our service principal is partly displayed above. As you can see, it expires a year from now (blog post written on January 6th, 2023). You will need to update the secret and your application when that time comes, preferably before that. We all know what expiring secrets and certificates give us: an app that’s not working because we forgot to update the secret or certificate!

💡 Note that one year is the default. You can set the number of years with the --years parameter in az ad sp create-for-rbac.

💡 There will always be cases where managed identities are not supported such as connecting 3rd party systems to Azure. However, it should be clear that whenever managed identity is supported, use it to provide your app with the credentials it needs.

In what follows, we will explain managed identities in general, and system-assigned managed identity in particular. Another blog post will discuss user-assigned managed identity.

Managed Identities Explained

Azure Managed Identities allow you to authenticate to Azure resources without the need to store credentials or secrets in your code or configuration files.

There are two types of Managed Identities:

  • system-assigned
  • user-assigned

System-assigned Managed Identities are tied to a specific Azure resource, such as a virtual machine or Azure Container App. When you enable a system-assigned identity for a resource, Azure creates a corresponding identity in the Azure Active Directory (AD) for that resource, similar to what you have seen above. This identity can be used to authenticate to any service that supports Azure AD authentication. The lifecycle of a system-assigned identity is tied to the lifecycle of the Azure resource. When the resource is deleted, the corresponding identity is also deleted. Via a special token endpoint, your code can request an access token for the resource it wants to access.

User-assigned Managed Identities, on the other hand, are standalone identities that can be associated with one or more Azure resources. This allows you to use the same identity across multiple resources and manage the identity’s lifecycle independently from the resources it is associated with. In your code, you can request an access token via the same special token endpoint. You will have to specify the appId (client Id) of the user-managed identity when you request the token because multiple identities could be assigned to your Azure resource.

In summary, system-assigned Managed Identities are tied to a specific resource and are deleted when the resource is deleted, while user-assigned Managed Identities are standalone identities that can be associated with multiple resources and have a separate lifecycle.

System-assigned managed identity

Virtual machines support system and user-assigned managed identity and make it easy to demonstrate some of the internals.

Let’s create a Linux virtual machine and enable a system-assigned managed identity. You will need an Azure subscription and be logged on with the Azure CLI. I use a Linux virtual machine here to demonstrate how it works with bash. Remember that this also works on Windows VMs and many other Azure resources such as App Services, Container Apps, and more.

Run the code below. Adapt the variables for your environment.

RG="rg-mi"
LOCATION="westeurope"
PASSWORD="oE2@pl9hwmtM"

az group create --name $RG --location $LOCATION

az vm create \
  --name vm-demo \
  --resource-group $RG \
  --image UbuntuLTS \
  --size Standard_B1s \
  --admin-username azureuser \
  --admin-password $PASSWORD \
  --assign-identity


After the creation of the resource group and virtual machine, the portal shows the system assigned managed identity in the virtual machine’s Identity section:

System assigned managed identity

We can now run some code on the virtual machine to obtain an Azure AD token for this identity that allows access to a Key Vault. Key Vault is just an example here.

We will first need to create a Key Vault and a secret. After that we will grant the managed identity access to this Key Vault. Run these commands on your own machine, not the virtual machine you just created:

# generate a somewhat random name for the key vault
KVNAME=kvdemo$RANDOM

# create with vault access policy which grants creator full access
az keyvault create --name $KVNAME --resource-group $RG

# with full access, current user can create a secret
az keyvault secret set --vault-name $KVNAME --name mysecret --value "TOPSECRET"

# show the secret; should reveal TOPSECRET
az keyvault secret show --vault-name $KVNAME --name mysecret

# switch the Key Vault to AAD authentication
az keyvault update --name $KVNAME --enable-rbac-authorization

Now we can grant the system assigned managed identity access to Key Vault via Azure RBAC. Let’s look at the identity with the command below:

az vm identity show --resource-group $RG --name vm-demo

This returns the information below. Note that principalId was also visible in the portal as Object (principal) ID. Yes, not confusing at all… 🤷‍♂️

{
  "principalId": "YOUR_PRINCIPAL_ID",
  "tenantId": "YOUR_TENANT_ID",
  "type": "SystemAssigned",
  "userAssignedIdentities": null
}

Now assign the Key Vault Secrets User role to this identity:

PRI_ID="principal ID above"
SUB_ID="Azure subscription ID"

# below, scope is the Azure Id of the Key Vault 

az role assignment create --assignee $PRI_ID \
  --role "Key Vault Secrets User" \
  --scope "/subscriptions/$SUB_ID/resourceGroups/$RG/providers/Micr
osoft.KeyVault/vaults/$KVNAME"

If you check the Key Vault in the portal, in IAM, you should see:

System assigned identity of VM has Secrets User role

Now we can run some code on the VM to obtain an Azure AD token to read the secret from Key Vault. SSH into the virtual machine using its public IP address with ssh azureuser@IPADDRESS. Next, use the commands below:

# install jq on the vm for better formatting; you will be asked for your password
sudo snap install jq

curl 'http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fvault.azure.net' -H Metadata:true | jq

It might look weird but by sending the curl request to that special IP address on the VM, you actually request an access token to access Key Vault resources (in this case, it could also be another type of resource). There’s more to know about this special IP address and the other services it provides. Check Microsoft Learn for more information.

The result of the curl command is JSON below (nicely formatted with jq):

{
  "access_token": "ACCESS_TOKEN",
  "client_id": "CLIENT_ID",
  "expires_in": "86038",
  "expires_on": "1673095093",
  "ext_expires_in": "86399",
  "not_before": "1673008393",
  "resource": "https://vault.azure.net",
  "token_type": "Bearer"
}

Note that you did not need any secret to obtain the token. Great!

Now run the following code but first replace <YOUR VAULT NAME> with the short name of your Key Vault:

# build full URL to your Key Vault
VAULTURL="https://<YOUR VAULT NAME>.vault.azure.net"

ACCESS_TOKEN=$(curl 'http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fvault.azure.net' -H Metadata:true | jq -r .access_token)

curl -s "$VAULTURL/secrets/mysecret?api-version=2016-10-01" -H "Authorization: Bearer $ACCESS_TOKEN" | jq -r .value

First, we set the vault URL to the full URL including https://. Next, we retrieve the full JSON token response but use jq to only grab the access token. The -r option strips the " from the response. Next, we use the Azure Key Vault REST API to read the secret with the access token for authorization. The result should be TOPSECRET! 😀

Instead of this raw curl code, which is great for understanding how it works under the hood, you can use Microsoft’s identity libraries for many popular languages. For example in Python:

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

# Authenticate using a system-assigned managed identity
credential = DefaultAzureCredential()

# Create a SecretClient using the credential and the key vault URL
secret_client = SecretClient(vault_url="https://YOURKVNAME.vault.azure.net", credential=credential)

# Retrieve the secret
secret = secret_client.get_secret("mysecret")

# Print the value of the secret
print(secret.value)

If you are somewhat used to Python, you know you will need to install azure-identity and azure-keyvault-secrets with pip. The DefaultAzureCredential class used in the code automatically works with system managed identity in virtual machines but also other compute such as Azure Container Apps. The capabilities of this class are well explained in the docs: https://learn.microsoft.com/en-us/python/api/overview/azure/identity-readme?view=azure-python. The identity libraries for other languages work similarly.

What about Azure Arc-enabled servers?

Azure Arc-enabled servers also have a managed identity. It is used to update the properties of the Azure Arc resource in the portal. You can grant this identity access to other Azure resources such as Key Vault and then grab the token in a similar way. Similar but not quite identical. The code with curl looks like this (from the docs):

ChallengeTokenPath=$(curl -s -D - -H Metadata:true "http://127.0.0.1:40342/metadata/identity/oauth2/token?api-version=2019-11-01&resource=https%3A%2F%2Fvault.azure.net" | grep Www-Authenticate | cut -d "=" -f 2 | tr -d "[:cntrl:]")

ChallengeToken=$(cat $ChallengeTokenPath)

if [ $? -ne 0 ]; then
    echo "Could not retrieve challenge token, double check that this command is run with root privileges."
else
    curl -s -H Metadata:true -H "Authorization: Basic $ChallengeToken" "http://127.0.0.1:40342/metadata/identity/oauth2/token?api-version=2019-11-01&resource=https%3A%2F%2Fvault.azure.net"
fi

On an Azure Arc-enabled machine that runs on-premises or in other clouds, the special IP address 169.254.169.254 is not available. Instead, the token request is sent to http://localhost:40342. The call is designed to fail and respond with a Www-Authenticate header that contains the path to a file on the machine (created dynamically). Only specific users and groups on the machine are allowed to read the contents of that file. This step was added for extra security so that not every process can read the contents of this file.

The second command retrieves the contents of the file and uses it for basic authentication purposes in the second curl request. It’s the second curl request that will return the access token.

Note that this works for both Linux and Windows Azure Arc-enabled systems. It is further explained here: https://learn.microsoft.com/en-us/azure/azure-arc/servers/managed-identity-authentication.

In contrast with managed identity on Azure compute, I am not aware of support for Azure Arc in the Microsoft identity libraries. To obtain a token with Python, check the following gist with some sample code: https://gist.github.com/gbaeke/343b14305e468aa433fe90441da0cabd.

The great thing about this is that managed identity can work on servers not in Azure as long if you enable Azure Arc on them! 🎉

Conclusion

In this post, we looked at what managed identities are and zoomed in on system-assigned managed identity. Azure Managed Identities are a secure and convenient way to authenticate to Azure resources without having to store credentials in code or configuration files. Whenever you can, use managed identity instead of service principals. And as you have seen, it even works with compute that’s not in Azure, such as Azure Arc-enabled servers.

Stay tuned for the next post about user-assigned managed identity.

AKS Workload Identity Revisited

A while ago, I blogged about Workload Identity. Since then, Microsoft simplified the configuration steps and enabled Managed Identity, in addition to app registrations.

But first, let’s take a step back. Why do you need something like workload identity in the first place? Take a look at the diagram below.

Workloads (deployed in a container or not) often need to access Azure AD protected resources. In the diagram, the workload in the container wants to read secrets from Azure Key Vault. The recommended option is to use managed identity and grant that identity the required role in Azure Key Vault. Now your code just needs to obtain credentials for that managed identity.

In Kubernetes, that last part presents a challenge. There needs to be a mechanism to map such a managed identity to a pod and allow code in the container to obtain an Azure AD authentication token. The Azure AD Pod Identity project was a way to solve this but as of 24/10/2022, AAD Pod Identity is deprecated. It is now replaced by Workload Identity. It integrates with native Kubernetes capabilities to federate with external identity providers such as Azure AD. It has the following advantages:

  • Not an AKS feature, it’s a Kubernetes feature (other cloud, on-premises, edge); similar functionality exists for GKE for instance
  • Scales better than AAD Pod Identity
  • No need for custom resource definitions
  • No need to run pods that intercept IMDS (instance metadata service) traffic; instead, there are webhook pods that run when pods are created/updated

If the above does not make much sense, check https://learn.microsoft.com/en-us/azure/aks/use-azure-ad-pod-identity. But don’t use it OK? 😉

At a basic level, Workload Identity works as follows:

  • Your AKS cluster is configured to issue tokens. Via an OIDC (OpenID Connect) discovery document, published by AKS, Azure AD can validate the tokens it receives from the cluster.
  • A Kubernetes service account is created and properly annotated and labeled. Pods are configured to use the service account via the serviceAccount field.
  • The Azure Managed Identity is configured with Federated credentials. The federated credential contains a link to the OIDC discovery document (Cluster Issuer URL) and configures the namespace and service account used by the Kubernetes pod. That generates a subject identifier like system:serviceaccount:namespace_name:service_account_name.
  • Tokens can now be generated for the configured service account and swapped for an Azure AD token that can be picked up by your workload.
  • A Kubernetes mutating webhook is the glue that makes all of this work. It ensures the token is mapped to a file in your container and sets needed environment variables.

Creating a cluster with OIDC and Workload Identity

Create a basic cluster with one worker node and both features enabled. You need an Azure subscription and the Azure CLI. Ensure the prerequisites are met and that you are logged in with az login. Run the following in a Linux shell:

RG=your_resource_group
CLUSTER=your_cluster_name

az aks create -g $RG -n $CLUSTER --node-count 1 --enable-oidc-issuer \
  --enable-workload-identity --generate-ssh-keys

After deployment, find the OIDC Issuer URL with:

export AKS_OIDC_ISSUER="$(az aks show -n $CLUSTER -g $RG --query "oidcIssuerProfile.issuerUrl" -otsv)"

When you add /.well-known/openid-configuration to that URL, you will see something like:

OIDC discovery document

The field jwks_uri contains a link to key information, used by AAD to verify the tokens issued by Kubernetes.

In earlier versions of Workload Identity, you had to install a mutating admission webhook to project the Kubernetes token to a volume in your workload. In addition, the webhook also injected several environment variables:

  • AZURE_CLIENT_ID: client ID of an AAD application or user-assigned managed identity
  • AZURE_TENANT_ID: tenant ID of Azure subscription
  • AZURE_FEDERATED_TOKEN_FILE: the path to the federated token file; you can do cat $AZURE_FEDERATED_TOKEN_FILE to see the token. Note that this is the token issued by Kubernetes, not the exchanged AAD token (exchanging the token happens in your code). The token is a jwt. You can use https://jwt.io to examine it:
Decoded jwt issued by Kubernetes

But I am digressing… In the current implementation, you do not have to install the mutating webhook yourself. When you enable workload identity with the CLI, the webhook is installed automatically. In kube-system, you will find pods starting with azure-wi-webhook-controller-manager. The webhook kicks in whenever you create or update a pod. The end result is the same. You get the projected token + the environment variables.

Creating a service account

Ok, now we have a cluster with OIDC and workload identity enabled. We know how to retrieve the issuer URL and we learned we do not have to install anything else to make this work.

You will have to configure the pods you want a token for. Not every pod has containers that need to authenticate to Azure AD. To configure your pods, you first create a Kubernetes service account. This is a standard service account. To learn about service accounts, check my YouTube video.

apiVersion: v1
kind: ServiceAccount
metadata:
  annotations:
    azure.workload.identity/client-id: CLIENT ID OF MANAGED IDENTITY
  labels:
    azure.workload.identity/use: "true"
  name: sademo
  namespace: default

The label ensures that the mutating webhook will do its thing when a pod uses this service account. We also indicate the managed identity we want a token for by specifying its client ID in the annotation.

Note: you need to create the managed identity yourself and grab its client id. Use the following commands:

RG=your_resource_group
IDENTITY=your_chosen_identity_name
LOCATION=your_azure_location (e.g. westeurope)

export SUBSCRIPTION_ID="$(az account show --query "id" -otsv)"

az identity create --name $IDENTITY --resource-group $RG \
  --location $LOCATION --subscription $SUBSCRIPTION_ID

export USER_ASSIGNED_CLIENT_ID="$(az identity show -n $IDENTITY -g $RG --query "clientId" -otsv)"

echo $USER_ASSIGNED_CLIENT_ID

The last command prints the id to use in the service account azure.workload.identity/client-id annotation.

Creating a pod that uses the service account

Let’s create a deployment that deploys pods with an Azure CLI image:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: azcli-deployment
  namespace: default
  labels:
    app: azcli
spec:
  replicas: 1
  selector:
    matchLabels:
      app: azcli
  template:
    metadata:
      labels:
        app: azcli
    spec:
      # needs to refer to service account used with federation
      serviceAccount: sademo
      containers:
        - name: azcli
          image: mcr.microsoft.com/azure-cli:latest
          command:
            - "/bin/bash"
            - "-c"
            - "sleep infinity"

Above, the important line is serviceAccount: sademo. When the pod is created or modified, the mutating webhook will check the service account and its annotations. If it is configured for workload identity, the webhook will do its thing: projecting the Kubernetes token file and setting the environment variables:

The webhook did its work 😉

How to verify it works?

We can use the Azure CLI support for federated tokens as follows:

az login --federated-token "$(cat $AZURE_FEDERATED_TOKEN_FILE)" \
--service-principal -u $AZURE_CLIENT_ID -t $AZURE_TENANT_ID

After running the command, the error below appears:

Oh no…

Clearly, something is wrong and there is. We have forgotten to configure the managed identity for federation. In other words, when we present our Kubernetes token, Azure AD needs information to validate it and return an AAD token.

Use the following command to create a federated credential on the user-assigned managed identity you created earlier:

RG=your_resource_group
IDENTITY=your_chosen_identity_name
AKD_OIDC_ISSUER=your_oidc_issuer
SANAME=sademo

az identity federated-credential create --name fic-sademo \
  --identity-name $IDENTITY \
  --resource-group $RG --issuer ${AKS_OIDC_ISSUER} \
  --subject system:serviceaccount:default:$SANAME

After running the above command, the Azure Managed Identity has the following configuration:

Federated credentials on the Managed Identity

More than one credential is possible. Click on the name of the federated credential. You will see:

Details of the federated credential

Above, the OIDC Issuer URL is set to point to our cluster. We expect a token with a subject identifier (sub) of system:serviceaccount:default:sademo. You can check the decoded jwt earlier in this post to see that the sub field in the token issued by Kubernetes matches the one above. It needs to match or the process will fail.

Now we can run the command again:

az login --federated-token "$(cat $AZURE_FEDERATED_TOKEN_FILE)" \
--service-principal -u $AZURE_CLIENT_ID -t $AZURE_TENANT_ID

You will be logged in to the Azure CLI with the managed identity credentials:

But what about your own apps?

Above, we used the Azure CLI. The most recent versions (>= 2.30.0) support federated credentials and use MSAL. But what about your custom code?

The code below is written in Python and uses the Python Azure identity client library with DefaultAzureCredential. This code works with managed identity in Azure Container Apps or Azure App Service and was not modified. Here’s the code for reference:

import threading
import os
import logging
import time
import signal
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential

from azure.appconfiguration.provider import (
    AzureAppConfigurationProvider,
    SettingSelector,
    AzureAppConfigurationKeyVaultOptions
)

logging.basicConfig(encoding='utf-8', level=logging.WARNING)

def get_config(endpoint):
  selects = {SettingSelector(key_filter=f"myapp:*", label_filter="prd")}
  trimmed_key_prefixes = {f"myapp:"}
  key_vault_options = AzureAppConfigurationKeyVaultOptions(secret_resolver=retrieve_secret)
  app_config = {}
  try:
    app_config = AzureAppConfigurationProvider.load(
            endpoint=endpoint, credential=CREDENTIAL, selects=selects, key_vault_options=key_vault_options, 
            trimmed_key_prefixes=trimmed_key_prefixes)
  except Exception as ex:
    logging.error(f"error loading app config: {ex}")

  return app_config

def run():
    try:
      global CREDENTIAL 
      CREDENTIAL = DefaultAzureCredential(exclude_visual_studio_code_credential=True)
    except Exception as ex:
      logging.error(f"error setting credentials: {ex}")

    endpoint = os.getenv('AZURE_APPCONFIGURATION_ENDPOINT')

    if not endpoint:
        logging.error("Environment variable 'AZURE_APPCONFIGURATION_ENDPOINT' not set")

    app_config =  {}
    while True:
        if not app_config:
            logging.warning("trying to load app config")
            app_config = get_config(endpoint)
        else:
            config_value=app_config['appkey']
            logging.warning(f"doing useful work with {config_value}")
            # if key exists in app_config, do something with it
            if 'mysecret' in app_config:
                logging.warning(f"and hush hush, there's a secret: {app_config['mysecret']}")
        time.sleep(5)


class GracefulKiller:
  kill_now = False
  def __init__(self):
    signal.signal(signal.SIGINT, self.exit_gracefully)
    signal.signal(signal.SIGTERM, self.exit_gracefully)

  def exit_gracefully(self, *args):
    self.kill_now = True


def retrieve_secret(uri):
    try:
        # uri is in format: https://<keyvaultname>.vault.azure.net/secrets/<secretname>
        # retrieve key vault uri and secret name from uri
        vault_uri = "https://" + uri.split('/')[2]
        secret_name = uri.split('/')[-1]
        logging.warning(f"Retrieving secret {secret_name} from {vault_uri}...")

        # retrieve the secret from Key Vault; CREDENTIAL was set globally
        secret_client = SecretClient(vault_url=vault_uri, credential=CREDENTIAL)

        # get secret value from Key Vault
        secret_value = secret_client.get_secret(secret_name).value

    except Exception as ex:
        print(f"retrieving secret: {ex}")
    
    return secret_value

# main function
def main():
    # create a Daemon tread
    t = threading.Thread(daemon=True, target=run, name="worker")
    t.start()
    

    killer = GracefulKiller()
    while not killer.kill_now:
        time.sleep(1)

    logging.info("Doing some important cleanup before exiting")
    logging.info("Gracefully exiting")


if __name__ == "__main__":
    main()

On Docker Hub, the gbaeke/worker:1.0.0 image runs this code. The following manifest runs the code on Kubernetes with the same managed identity as the Azure CLI example (same service account):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: worker
  namespace: default
  labels:
    app: worker
spec:
  replicas: 1
  selector:
    matchLabels:
      app: worker
  template:
    metadata:
      labels:
        app: worker
    spec:
      # needs to refer to service account used with federation
      serviceAccount: sademo
      containers:
        - name: worker
          image: gbaeke/worker:1.0.0
          env:
            - name: AZURE_APPCONFIGURATION_ENDPOINT
              value: https://ac-appconfig-vr6774lz3bh4i.azconfig.io

Note that the code tries to connect to Azure App Configuration. The managed identity has been given the App Configuration Data Reader role on a specific instance. The code tries to read the value of key myapp:appkey with label prd from that instance:

App Config key and values

To make the code work, the environment variable AZURE_APPCONFIGURATION_ENDPOINT is set to the URL of the App Config instance.

In the container logs, we can see that the value was successfully retrieved:

Log stream of worker

And yes, the code just works! It successfully connected to App Config and retrieved the value. The environment variables, set by the webhook discussed earlier, make this work, together with the Python Azure identity library!

Conclusion

Workload Identity works like a charm and is relatively easy to configure. At the time of writing (end of November 2022), I guess we are pretty close to general availability and we finally will have a fully supported managed identity solution for AKS and beyond!

A quick look at Azure App Configuration and the Python Provider

When developing an application, it is highly likely that it needs to be configured with all sorts of settings. A simple list of key/value pairs is usually all you need. Some of the values can be read by anyone (e.g., a public URL) while some values should be treated as secrets (e.g., a connection string).

Azure App Configuration is a service to centrally manage these settings in addition to feature flags. In this post, we will look at storing and retrieving application settings and keeping feature flags for another time. I will also say App Config instead of App Configuration to save some keystrokes. 😉

We will do the following:

  • Retrieve key-value pairs for multiple applications and environments from one App Config instance
  • Use Key Vault references in App Config and retrieve these from Key Vault directly
  • Use the Python provider client to retrieve key-value pairs and store them in a Python dictionary

Why use App Configuration at all?

App Configuration helps by providing a fully managed service to store configuration settings for your applications separately from your code. Storing configuration separate from code is a best practice that most developers should follow.

Although you could store configuration values in files, using a service like App Config provides some standardization within or across developer teams.

Some developers store both configuration values and secrets in Key Vault. Although that works, App Config is way more flexible in organizing the settings and retrieving lists of settings with key and label filters. If you need to work with more than a few settings, I would recommend using a combination of App Config and Key Vault.

In what follows, I will show how we store settings for multiple applications and environments in the same App Config instance. Some of these settings will be Key Vault references.

Read https://learn.microsoft.com/en-us/azure/azure-app-configuration/overview before continuing to know more about App Config.

Provisioning App Config

Provisioning App Configuration is very easy from the portal or the Azure CLI. With the Azure CLI, use the following commands to create a resource group and an App Configuration instance in that group:

az group create -n RESOURCEGROUP -l LOCATION
az appconfig create -g RESOURCEGROUP  -n APPCONFIGNAME -l LOCATION

After deployment, we can check the portal and navigate to Configuration Explorer.

App Configuration in the Azure Portal

In Configuration Explorer, you can add the configuration values for your apps. They are just key/value pairs but they can be further enriched with labels, content types, and tags.

Note that there is a Free and a Standard tier of App Config. See https://azure.microsoft.com/en-us/pricing/details/app-configuration/ for more information. In production, you should use the Standard tier.

Storing configuration and secrets for multiple apps and environments

To store configuration values for multiple applications, you will have to identify the application in the key. App Configuration, oddly, has no knowledge of applications. For example, a key could be app1:setting1. You decide on the separator between the app name (app1 here) and its setting (setting1). In your code, you can easily query all settings for your app with a key filter (e.g. “app1:”. I will show an example of using a key filter later with the Python provider.

If you want to have different values for a key per environment (like dev, prd, etc…), you can add a label for each environment. To retrieve all settings for an environment, you can use a label filter. I will show an example of using a label filter later.

Suppose you want to use app1:setting1 in two environments: dev and prd. How do you create the key-value pairs? One way is to use the Azure CLI. You can also create them with the portal or from Python, C#, etc… With the CLI:

az appconfig kv set --name APPCONFIGNAME  --key app1:setting1 --value "value1" --label dev

APPCONFIG name is the name of your App Config instance. Just the name, not the full URL. For the prd environment:

az appconfig kv set --name APPCONFIGNAME  --key app1:setting1 --value "value2" --label prd

In Configuration Explorer, you will now see:

app1:setting1 for two environments (via labels)

For more examples of using the Azure CLI, see https://learn.microsoft.com/en-us/azure/azure-app-configuration/scripts/cli-work-with-keys.

In addition to these plain key-value pairs, you can also create Key Vault references. Let’s create one from the portal. In Configuration Explorer, click + Create and select Key Vault reference. You will get the following UI that allows you to create the reference. Make sure you have a Key Vault with a secret called dev-mysecret if you want to follow along. Below, set the label to dev. I forgot that in the screenshot below:

Creating a Key Vault Reference

Above, I am using the same naming convention for the key in App Config: app1:mysecret. Notice though that the secret I am referencing in Key Vault contains the environment and a dash (-) before the actual secret name. If you use one Key Vault per app instead of a Key Vault per app and environment, you will have to identify the environment in the secret name in some way.

After creating the reference, you will see the following in Configuration explorer:

Configuration explorer with one Key Vault reference

Note that the Key Vault reference has a content type. The content type is application/vnd.microsoft.appconfig.keyvaultref+json;charset=utf-8. You can use the content type in your code to know if the key contains a reference to a Key Vault secret. That reference will be something like https://kv-app1-geba.vault.azure.net/secrets/dev-mysecret. You can then use the Python SDK for Azure Key Vault to retrieve the secret from your code. Azure App Config will not do that for you.

You can use content types in other ways as well. For example, you could store a link to a storage account blob and use a content type that informs your code it needs to retrieve the blob from the account. Of course, you will need to write code to retrieve the blob. App Config only contains the reference.

Reading settings

There are many ways to read settings from App Config. If you need them in an Azure Pipeline, for instance, you can use the Azure App Configuration task to pull keys and values from App Config and set them as Azure pipeline variables.

If you deploy your app to Kubernetes and you do not want to read the settings from your code, you can integrate App Configuration with Helm. See https://learn.microsoft.com/en-us/azure/azure-app-configuration/integrate-kubernetes-deployment-helm for more information.

In most cases though, you will want to read the settings directly from your code. There is an SDK for several languages, including Python. The SDK has all the functionality you need to read and write settings.

Next to the Python SDK, there is also a Python provider which is optimized to read settings from App Config and store them in a Python dictionary. The provider has several options to automatically trim app names from keys and to automatically retrieve a secret from Key Vault if the setting in App Config is a Key Vault reference.

To authenticate to App Config, the default is access keys with a connection string. You can find the connection string in the Portal:

App Config Connection string for read/write or just read

You can also use Azure AD (it’s always enabled) and disable access keys. In this example, I will use a connection string to start with:

Before we connect and retrieve the values, ensure you install the provider first:

pip install azure-appconfiguration-provider

Above, use pip or pip3 depending on your installation of Python.

In your code, ensure the proper imports:

from azure.appconfiguration.provider import (
    AzureAppConfigurationProvider,
    SettingSelector,
    AzureAppConfigurationKeyVaultOptions
)
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential

To authenticate to Azure Key Vault with Azure AD, we can use DefaultAzureCredential():

try:
    CREDENTIAL = DefaultAzureCredential(exclude_visual_studio_code_credential=True)
except Exception as ex:
    print(f"error setting credentials: {ex}")

Note: on my machine, I had an issue with the VS Code credential feature so I turned that off.

Next, use a SettingSelector from the provider to provide a key filter and label filter. I want to retrieve key-value pairs for an app called app1 and an environment called dev:

app = 'app1'
env = 'dev'
selects = {SettingSelector(key_filter=f"{app}:*", label_filter=env)}

Next, when I retrieve the key-value pairs, I want to strip app1: from the keys:

trimmed_key_prefixes = {f"{app}:"}

In addition, I want the provider to automatically go to Key Vault and retrieve the secret:

key_vault_options = AzureAppConfigurationKeyVaultOptions(secret_resolver=retrieve_secret)

retrieve_secret refers to a function you need to write to retrieve the secret and add custom logic. There are other options as well.

def retrieve_secret(uri):
    try:
        # uri is in format: https://<keyvaultname>.vault.azure.net/secrets/<secretname>
        # retrieve key vault uri and secret name from uri
        vault_uri = "https://" + uri.split('/')[2]
        secret_name = uri.split('/')[-1]
        print(f"Retrieving secret {secret_name} from {vault_uri}...")
 
        # retrieve the secret from Key Vault; CREDENTIAL was set globally
        secret_client = SecretClient(vault_url=vault_uri, credential=CREDENTIAL)
 
        # get secret value from Key Vault
        secret_value = secret_client.get_secret(secret_name).value
 
    except Exception as ex:
        print(f"retrieving secret: {ex}", 1)

    return secret_value

Now that we have all the options, we can retrieve the key-value pairs.

connection_string = 'YOURCONNSTR'
app_config = AzureAppConfigurationProvider.load(
    connection_string=connection_string, selects=selects, key_vault_options=key_vault_options, 
    trimmed_key_prefixes=trimmed_key_prefixes)

print(app_config)

Now we have a Python dictionary app_config with all key-value pairs for app1 and environment dev. The key-value pairs are a mix of plain values from App Config and Key Vault.

You can now use this dictionary in your app in whatever way you like.

If you would like to use the same CREDENTIAL to connect to App Config, you can also use:

endpoint = 'APPCONFIGNAME.azconfig.io' # no https://
app_config = AzureAppConfigurationProvider.load(
    endpoint=endpoint, credential=CREDENTIAL, selects=selects, key_vault_options=key_vault_options, 
    trimmed_key_prefixes=trimmed_key_prefixes)

Ensure the credential you use has the App Configuration Data Reader role to read the key-value pairs.

Here’s all the code in a gist: https://gist.github.com/gbaeke/9b075a87a1198cdcbcc2b2028492085b. Ensure you have the key-value pairs as above and provide the connection string to the connection_string variable.

Conclusion

In this post, we showed how to retrieve key-value pairs with the Python provider from one App Config instance for multiple applications and environments.

The application is stored as a prefix in the key (app1:). The environment is a label (e.g., dev), allowing us to have the same setting with different values per environment.

Some keys can contain a reference to Key Vault to allow your application to retrieve secrets from Key Vault as well. I like this approach to have a list of all settings for an app and environment, where the value of the key can be an actual value or a reference to some other entity like a secret, a blob, or anything else.

First steps with Crossplane

Image Source: crossplane.io

Although Crossplane has been around for a while, I never got around to trying it. Crossplane has many capabilities. However, in this post, I will focus on the following aspects:

  • Installing Crossplane on a Kubernetes cluster (AKS); you can install on a local cluster as well (e.g., k3s, kind, minikube, …) but then you would need Azure Arc for Kubernetes to install the microsoft.flux extension (I will be using GitOps with Flux via that extension)
  • Adding and configuring providers for Azure and Kubernetes: providers allow you to deploy to Azure and Kubernetes (and much more) from Crossplane
  • Deploying Azure infrastructure with Crossplane using a fully declarative GitOps approach

Introduction

Crossplane basically allows you to build a control plane that you or your teams can use to deploy infrastructure and applications. This control plane is built on Kubernetes. In short, suppose I want to deploy an Azure resource group with Crossplane, I would create the below YAML file and apply it with kubectl apply -f filename.yaml.

This is, in essence, a fully declarative approach to deploying Azure infrastructure using Kubernetes. There are other projects, such as the Azure Service Operator v2, that do something similar.

apiVersion: azure.jet.crossplane.io/v1alpha2
kind: ResourceGroup
metadata:
  name: rg-crossplane
spec:
  forProvider:
    location: "westeurope"
    tags:
      provisioner: crossplane
  providerConfigRef:
    name: default

In order to enable this functionality, you need the following:

  • Install Crossplane on your Kubernetes cluster
  • Add a provider that can create Azure resources; above the jet provider for Azure is used; more about providers later
  • Configure the provider with credentials; in this case Azure credentials

In a diagram:

Install Crossplane from git with Flux on AKS; deploy an Azure resource group and another AKS cluster from Crossplane; create a namespace on that new cluster

Combination with GitOps

Although you can install and configure Crossplane manually and just use kubectl to add custom resources, I wanted to add Crossplane and custom resources using GitOps. To that end, I am using Azure Kubernetes Service (AKS) with the microsoft.flux extension. For more information to enable and install the extension, see my Flux v2 quick guide.

⚠️ The git repository I am using with Flux v2 and Crossplane is here: https://github.com/gbaeke/crossplane/tree/blogpost. This refers to the blogpost branch, which should match the content of this post. Tbe main branch might be different.

The repo contains several folders that match Flux kustomizations:

  • infra folder: installs Crossplane and Azure Key Vault to Kubernetes; an infra kustomization will point to this folder
  • secrets folder: creates a secret with Azure Key Vault to Kubernetes from Azure Key Vault; the secrets kustomization will point to this folder
  • crossplane-apps folder: installs Azure resources and Kubernetes resources with the respective Crossplane providers; the apps kustomization will point to this folder

Note: if you do not know what Flux kustomizations are and how Flux works, do check my Flux playlist: https://www.youtube.com/playlist?list=PLG9qZAczREKmCq6on_LG8D0uiHMx1h3yn. The videos look at the open source version of Flux and not the microsoft.flux extension. To learn more about that extension, see https://www.youtube.com/watch?v=w_eoJbgDs3g.

Installing Crossplane

The infra customization installs Crossplane and Azure Key Vault to Kubernetes. The latter is used to sync a secret from Key Vault that contains credentials for the Crossplane Azure provider. More details are in the diagram below:

As noted above, the installation of Crossplane is done with Flux. First, there is the HelmRepository resource that adds the Crossplane Helm repository to Flux.

apiVersion: source.toolkit.fluxcd.io/v1beta2
kind: HelmRepository
metadata:
  namespace: config-infra
  name: crossplane
spec:
  interval: 1m0s
  url: https://charts.crossplane.io/stable

Next, there is the HelmRelease that installs Crossplane. Important: target namespace is crossplane-system (bottom line):

apiVersion: helm.toolkit.fluxcd.io/v2beta1
kind: HelmRelease
metadata:
  name: crossplane
  namespace: config-infra
spec:
  chart:
    spec:
      chart: crossplane
      reconcileStrategy: ChartVersion
      sourceRef:
        kind: HelmRepository
        name: crossplane
        namespace: config-infra
  install:
    createNamespace: true
  interval: 1m0s
  targetNamespace: crossplane-system

For best results, in the YAML above, set the namespace of the resource to the namespace you use with the AKS k8s-configuration. The resources to install Azure Key Vault to Kubernetes are similar.

To install the Crossplane jet provider for Azure:

---
apiVersion: pkg.crossplane.io/v1alpha1
kind: ControllerConfig
metadata:
  name: jet-azure-config
  labels:
    app: crossplane-provider-jet-azure
spec:
  image: crossplane/provider-jet-azure-controller:v0.9.0
  args: ["-d"]
---
apiVersion: pkg.crossplane.io/v1
kind: Provider
metadata:
  name: crossplane-provider-jet-azure
spec:
  package: crossplane/provider-jet-azure:v0.9.0
  controllerConfigRef:
    name: jet-azure-config

Above, debugging is turned on for the provider. This is optional. The provider actually runs in the crossplane-system namespace:

jet provider

The provider is added via the Provider resource (second resource in the YAML manifest).

We can now create the AKS k8s-configuration, which creates a Flux source and a kustomization:

RG=your AKS resource group
CLUSTER=your AKS cluster name (to install Crossplane to)

az k8s-configuration flux create -g $RG -c $CLUSTER \
  -n cluster-config --namespace config-infra -t managedClusters \
  --scope cluster -u https://github.com/gbaeke/crossplane \
  --branch main  \
  --kustomization name=infra path=./infra prune=true

The Flux source will be the repo specified with -u. There is one kustomization: infra. Pruning is turned on. With pruning, removing manifests from the repo results is removing them from Kubernetes.

The k8s-configuration should result in:

Don’t mind the other Kustomizations; will be added later; this is the GitOps view in the properties of the cluster in the Azure Portal

Crossplane is now installed with two providers. We can now configure the Azure provider with credentials.

Configuring Azure Credentials

You need to create a service principal by following the steps in https://crossplane.io/docs/v1.9/cloud-providers/azure/azure-provider.html. I compacted the resulting JSON with:

cat <path-to-JSON> | jq -c

The output of the above command was added to Key Vault:

Azure creds in Key Vault

The Key Vault I am using uses the Azure RBAC permission model. Ensure that the AKS cluster’s kubelet identity has at least the Key Vault Secrets User role. It is a user-assigned managed identity with a name like clustername-agentpool.

To actually create a Kubernetes secret from this Key Vault secret, the secrets folder in the git repo contains the manifest below:

apiVersion: spv.no/v2beta1
kind: AzureKeyVaultSecret
metadata:
  name: azure-creds 
  namespace: crossplane-system
spec:
  vault:
    name: kvgebadefault # name of key vault
    object:
      name: azure-creds # name of the akv object
      type: secret # akv object type
  output: 
    secret: 
      name: azure-creds # kubernetes secret name
      dataKey: creds # key to store object value in kubernetes secret

This creates a Kubernetes secret in the crossplane-system namespace with name azure-creds and a key creds that holds the credentials JSON.

Secret as seen in k9s
the decoded secret as shown in k9s

To add the secret(s) as an extra kustomization, run:

RG=your AKS resource group
CLUSTER=your AKS cluster name

az k8s-configuration flux create -g $RG -c $CLUSTER \
  -n cluster-config --namespace config-infra -t managedClusters \
  --scope cluster -u https://github.com/gbaeke/crossplane \
  --branch main  \
  --kustomization name=infra path=./infra prune=true \
  --kustomization name=secrets path=./secrets prune=true dependsOn=["infra"]

Note that the secrets kustomization is dependent on the infra kustomization. After running this command, ensure the secret is in the crossplane-system namespace. The k8s-configuration uses the same source but now has two kustomizations.

Deploying resources with the Jet provider for Azure

Before explaining how to create Azure resources, a note on providers. As a novice Crossplane user, I started with the following Azure provider: https://github.com/crossplane-contrib/provider-azure. This works well but it is not so simple for contributors to ensure the provider is up-to-date with the latest and greatest Azure features. For example, if you deploy AKS, you cannot use managed identity, the cluster uses availability sets etc…

To improve this, Terrajet was created. It is a code generation framework that can generate Crossplane CRDs (custom resource definitions) and sets up the provider to use Terraform. Building on top of Terraform is an advantage because it is more up-to-date with new cloud features. That is the reason why this post uses the jet provider. When we later create an AKS cluster, it will take advantage of managed identity and other newer features.

Note: there is also a Terraform provider that can take Terraform HCL to do anything you want; we are not using that in this post

Ok, let’s create a resource group and deploy AKS. First, we have to configure the provider with Azure credentials. The crossplane-apps folder contains a file called jet-provider-config.yaml:

apiVersion: azure.jet.crossplane.io/v1alpha1
kind: ProviderConfig
metadata:
  name: default
spec:
  credentials:
    source: Secret
    secretRef:
      namespace: crossplane-system
      name: azure-creds
      key: creds

The above ProviderConfig tells the provider to use the credentials in the Kubernetes secret we created earlier. We know we are configuring the jet provider from the apiVersion: azure.jet.crossplane.io/v1alpha1.

With that out of the way, we can create the resource group and AKS cluster. Earlier in this post, the YAML to create the resource group was already shown. To create a basic AKS cluster called clu-cp in this group, aks.yaml is used:

apiVersion: containerservice.azure.jet.crossplane.io/v1alpha2
kind: KubernetesCluster
metadata:
  name: clu-cp
spec:
  writeConnectionSecretToRef:
    name: example-kubeconfig
    namespace: crossplane-system
  forProvider:
    location: "westeurope"
    resourceGroupNameRef:
      name: rg-crossplane
    dnsPrefix: "clu-cp"
    defaultNodePool:
      - name: default
        nodeCount: 1
        vmSize: "Standard_D2_v2"
    identity:
      - type: "SystemAssigned"
    tags:
      environment: dev
  providerConfigRef:
    name: default

Above, we refer to our resource group by name (resourceGroupNameRef) and we write the credentials to our cluster to a secret (writeConnectionSecretToRef). That secret will contain keys with the certificate and private key, but also a kubeconfig key with a valid kubeconfig file. We can use that later to connect and deploy to the cluster.

To see an example of connecting to the deployed cluster and creating a namespace, see k8s-provider-config.yaml and k8s-namespace.yaml in the repo. The resource k8s-provider-config.yaml will use the example-kubeconfig secret created above to connect to the AKS cluster that we created in the previous steps.

To create a kustomization for the crossplane-apps folder, run the following command:

RG=your AKS resource group
CLUSTER=your AKS cluster name

az k8s-configuration flux create -g $RG -c $CLUSTER \
  -n cluster-config --namespace config-infra -t managedClusters \
  --scope cluster -u https://github.com/gbaeke/crossplane \
  --branch main  \
  --kustomization name=infra path=./infra prune=true \
  --kustomization name=secrets path=./secrets prune=true dependsOn=["infra"] \
  --kustomization name=apps path=./crossplane-apps prune=true dependsOn=["secrets"]

This folder does not contain a kustomization.yaml file. Any manifest you drop in it will be applied to the cluster! The k8s-kustomization now has the same source but three kustomizations:

infra, secrets and apps kustomizations

After a while, an AKS cluster clu-cp should be deployed to resource group rg-crossplane:

AKS deployed by Crossplane running on another AKS cluster

To play around with this, I recommend using Visual Studio Code and the GitOps extension. When you make a change locally and push to main, to speed things up, you can reconcile the git repository and the apps kustomization manually:

Reconcile the GitRepository source and kustomization from the GitOps extension for Visual Studio Code

Conclusion

In this post, we looked at installing and configuring Crossplane on AKS via GitOps and the microsoft.flux extension. In addition, we deployed a few Azure resources with Crossplane and its jet provider for Azure. We only scratched the surface here but I hope this gets you started quickly when evaluating Crossplane for yourself.

%d bloggers like this: