Home

Published

- 5 min read

Gateway API for Trino Part 5

img of Gateway API for Trino Part 5

In this article we’ll walk through the results of our experiment. We hypothesized that the Kubernetes Gateway API would maintain session persistence during route mutations thanks to its atomic update model, while Nginx Ingress reloads would sever TCP connections, leading to query failures.

Experiment Design

We developed a Python script to simulate flapping infrastructure while executing complex SQL queries on a Trino cluster.

We issued asynchronous Trino queries via threading to simulate active user sessions. We used subprocess to flap a legacy dummyingress.yaml and the NetworkingV1Api to delete it mid-query. We used the CustomObjectsApi to perform JSON patches on HTTPRoutes, which allowed us to add and remove backendRefs dynamically to test delta updates.

import trino
import time
import threading


from kubernetes import client, config
import requests
import statistics
from trino.dbapi import connect
import kubernetes.client
import subprocess
import time


from kubernetes.client.rest import ApiException


# Load the local kubeconfig once at import time so the API clients below
# share it. NOTE: load_kube_config() returns None, so the original
# `config = config.load_kube_config()` rebound the `config` module to None
# and then passed that None to NetworkingV1Api. Calling the loader for its
# side effect and letting each client use the default ApiClient fixes both.
config.load_kube_config()
custom_api = client.CustomObjectsApi()
networking = client.NetworkingV1Api()




def configureConnection(host):
    """Open a Trino DBAPI connection to *host* and return a cursor.

    Connects on port 80 as the ``admin`` user against the ``tpch.sf1``
    catalog/schema with a 60-second request timeout. The caller owns the
    returned cursor.
    """
    connection = connect(
        host=host,
        port="80",
        user="admin",
        catalog="tpch",
        schema="sf1",
        request_timeout=60.0,
    )
    return connection.cursor()


def executeQuery(host, i):
    """Run a long TPC-H Q9-style query against *host* and record the outcome.

    Appends True to the module-global ``test_results`` list when the query
    completes, False when the session is severed mid-flight; timing and
    status are printed either way.

    Args:
        host: Hostname of the Ingress / Gateway endpoint to route through.
        i: Query index, used only for log output.
    """
    start_time = time.time()
    # TPC-H Q9: deliberately heavy multi-way join so the query stays
    # in flight long enough for the route flap to happen underneath it.
    query = """
        SELECT
        nation,
        o_year,
        sum(amount) AS sum_profit
    FROM
        (
            SELECT
                n_name AS nation,
                extract(YEAR FROM o_orderdate) AS o_year,
                l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity AS amount
            FROM
                part,
                supplier,
                lineitem,
                partsupp,
                orders,
                nation
            WHERE
                s_suppkey = l_suppkey
                AND ps_suppkey = l_suppkey
                AND ps_partkey = l_partkey
                AND p_partkey = l_partkey
                AND o_orderkey = l_orderkey
                AND s_nationkey = n_nationkey
                AND p_name LIKE '%green%'
        ) AS profit
    GROUP BY
        nation,
        o_year
    ORDER BY
        nation,
        o_year DESC;
        """
    cursor = None
    try:
        cursor = configureConnection(host)
        cursor.execute(query)
        rows = cursor.fetchall()
        end_time = time.time()
        print(f"SUCCESS: In-flight query: {i} survived! (Took  {end_time - start_time:.2f}s)")
        test_results.append(True)
    except Exception as e:
        fail_time = time.time()
        print(f"FAILURE: Query:{i} severed at {fail_time - start_time:.2f}s. Error: {e}")
        test_results.append(False)
    finally:
        # Fix: the original leaked the connection on every call; close the
        # cursor so each iteration starts from a fresh TCP session.
        if cursor is not None:
            try:
                cursor.close()
            except Exception:
                pass
  
def delete_ingress():
    """Delete the legacy test Ingress to simulate infrastructure flapping.

    Uses a Foreground propagation policy with a 5-second grace period.
    Best-effort by design: deleting an already-deleted Ingress must not
    abort the experiment loop.

    Returns:
        The API response on success, or None when the delete fails.
    """
    try:
        api_response = networking.delete_namespaced_ingress(
            name="minimal-ingress",
            namespace="default",
            body=client.V1DeleteOptions(
                propagation_policy='Foreground',
                grace_period_seconds=5
            )
        )
        print("deleting ingress")
        return api_response
    except Exception as e:
        # Fix: the original `pass` hid every failure (including typos and
        # auth errors). Keep best-effort semantics but surface the cause.
        print(f"delete_ingress failed (ignored): {e}")
        return None


def create_Ingress():
    """(Re)create the legacy dummy Ingress by applying its manifest.

    Equivalent to ``kubectl apply -f dummyingress.yaml``; apply is
    idempotent, so this is safe to call at the top of every flap iteration.
    """
    # Fix: argument-list form instead of shell=True with a command string —
    # avoids shell parsing entirely (safer, same behavior here).
    subprocess.run(["kubectl", "apply", "-f", "dummyingress.yaml"])




def add_route():
    """Add the trino-worker-service backendRef back to the HTTPRoute.

    Performs a read-modify-write: fetch the current route, append the worker
    backend only if a prior delete_route() actually removed it, then patch
    the object back. Retries with a backoff while the backend is still
    present and on 409 conflicts (stale resourceVersion).

    Returns:
        The patch API response once the backend has been added.

    Raises:
        ApiException: for any API error other than a 409 conflict.
    """
    group = "gateway.networking.k8s.io"
    version = "v1"
    plural = "httproutes"
    name = 'trino-api-route'
    namespace = "gateway-system"
    while True:
        try:
            route = custom_api.get_namespaced_custom_object(
                group, version, namespace, plural, name
            )
            rule = route['spec']['rules'][0]
            backends = rule.get('backendRefs', [])

            if not any(backend.get('name') == 'trino-worker-service' for backend in backends):
                backends.append({"name": "trino-worker-service", "port": 8081, "kind": "Service"})
                # Fix: if 'backendRefs' was absent, .get() returned a detached
                # default list; re-attach it so the patch actually carries it.
                rule['backendRefs'] = backends
                api_response = custom_api.patch_namespaced_custom_object(
                    group, version, namespace, plural, name, body=route
                )
                return api_response
            else:
                print("Backend still exists meaning the delete didn't work")
                # Fix: the original re-looped immediately, busy-waiting and
                # hammering the API server; back off before re-checking.
                time.sleep(10)
        except ApiException as e:
            if e.status == 409:
                # Conflict: the object changed between GET and PATCH; re-read.
                print(f"Exception when calling Doing Add operation CustomObjectsApi: {e}")
                time.sleep(10)
            else:
                raise e


def delete_route():
    """Remove the trino-worker-service backendRef from the HTTPRoute.

    Performs a read-modify-write: fetch the route, filter the worker backend
    out of the first rule's backendRefs, and patch the result back. Retries
    on 409 conflicts (stale resourceVersion).

    Returns:
        The patch API response.

    Raises:
        ApiException: for any API error other than a 409 conflict.
    """
    group = "gateway.networking.k8s.io"
    version = "v1"
    plural = "httproutes"
    name = 'trino-api-route'
    namespace = "gateway-system"
    while True:
        try:
            current_route = custom_api.get_namespaced_custom_object(
                group, version, namespace, plural, name
            )
            current_list = current_route['spec']['rules'][0].get('backendRefs', [])
            new_list = [
                backend for backend in current_list
                if backend.get('name') != 'trino-worker-service'
            ]
            current_route['spec']['rules'][0]['backendRefs'] = new_list

            api_response = custom_api.patch_namespaced_custom_object(
                group, version, namespace, plural, name, current_route
            )
            return api_response
        except ApiException as e:
            if e.status == 409:
                # Conflict: the object changed between GET and PATCH; re-read.
                print(f"Exception when doing delete operation: {e}")
                time.sleep(10)
            else:
                # Fix: the original printed and looped forever on permanent
                # errors (a busy-loop); re-raise like add_route does.
                print(f"Exception when doing delete operation: {e}")
                raise e


def Ingress_Stability():
    """Flap the legacy Ingress ten times while queries are in flight.

    Each iteration recreates the dummy Ingress, launches a long-running
    query in a background thread, lets it get underway, then deletes the
    Ingress mid-query. Outcomes accumulate in the module-global
    ``test_results`` list and a summary is printed at the end.
    """
    global test_results
    test_results = []
    for iteration in range(10):
        create_Ingress()
        time.sleep(30)

        worker = threading.Thread(
            target=executeQuery, args=("trino-legacy.example.com", iteration)
        )
        worker.start()

        # Give the query time to reach the cluster before pulling the rug.
        time.sleep(30)

        print(f"Starting Flap iteration: {iteration + 1}")
        delete_ingress()

        worker.join()
    success = test_results.count(True)
    print(f"final results -  Success:{success}/{len(test_results)}")


def Gateway_Stability():
    """Flap the Gateway API HTTPRoute ten times while queries are in flight.

    Each iteration re-adds the worker backendRef, launches a long-running
    query in a background thread, lets it get underway, then removes the
    backendRef mid-query. Outcomes accumulate in the module-global
    ``test_results`` list and a summary is printed at the end.
    """
    global test_results
    test_results = []
    for iteration in range(10):
        add_route()
        time.sleep(30)

        worker = threading.Thread(
            target=executeQuery, args=("trino.example.com", iteration)
        )
        worker.start()

        # Give the query time to reach the cluster before mutating the route.
        time.sleep(30)

        print(f"Starting Flap iteration: {iteration + 1}")
        delete_route()

        worker.join()
    success = test_results.count(True)
    print(f"final results -  Success:{success}/{len(test_results)}")


def main():
    """Run both experiments back to back: Gateway API first, then Ingress."""
    print("--- Starting Gateway Stability Test ---")
    Gateway_Stability()

    print("\n--- Starting Ingress Stability Test ---")
    Ingress_Stability()




# Run the experiments only when executed as a script, not on import.
if __name__ == "__main__":
   main()

Results of our experiment

Let’s talk about what we saw during our experiment.

Data

K8s networking Object

Ingress (Nginx): P99 latency spikes during reloads; config size grew linearly and reparsing took longer as we added routes; workers weren’t updated atomically.

Gateway API: zero downtime; updates were incremental and only the delta was sent, so the config size of the HTTPRoute didn’t grow linearly; updates were atomic across the fleet.

Interpreting the Results

Our data shows that the scalability of Ingress — or lack thereof — is a liability for high-performance workloads running on Kubernetes. A simple routing change for one service shouldn’t cause a P99 spike for another service. This severely disrupts the experience of the end users who depend on that service.

Our data also shows that the Gateway API allows us to decouple the growth of our infrastructure from the latency of our traffic by doing incremental updates. This helps in situations where we’re doing real-time analytics that rely on high-performance writes; a 200 ms reload spike caused by updating a route would be unacceptable there.

Conclusion

Our data shows that the lack of scalability of Ingress is a liability for high-performance workloads. By decoupling the growth of the infrastructure from the latency of our traffic, we avoid causing P99 spikes for the query engine.