Connect the Pipelines SDK to Kubeflow Pipelines
How to connect Pipelines SDK to Kubeflow Pipelines will depend on what kind of Kubeflow deployment you have, and from where you are running your code.
- Full Kubeflow (from inside cluster)
 - Full Kubeflow (from outside cluster)
 - Standalone Kubeflow Pipelines (from inside cluster)
 - Standalone Kubeflow Pipelines (from outside cluster)
 
Full Kubeflow (from inside cluster)
Click to expand
When running the Pipelines SDK inside a multi-user Kubeflow cluster, a ServiceAccount token volume can be mounted to the Pod, the Kubeflow Pipelines SDK can use this token to authenticate itself with the Kubeflow Pipelines API.
The following code creates a kfp.Client() using a ServiceAccount token for authentication.
import kfp
# the namespace in which you deployed Kubeflow Pipelines
namespace = "kubeflow"
# the KF_PIPELINES_SA_TOKEN_PATH environment variable is used when no `path` is set
# the default KF_PIPELINES_SA_TOKEN_PATH is /var/run/secrets/kubeflow/pipelines/token
credentials = kfp.auth.ServiceAccountTokenVolumeCredentials(path=None)
client = kfp.Client(host=f"http://ml-pipeline-ui.{namespace}", credentials=credentials)
print(client.list_experiments())
The following Pod demonstrates mounting a ServiceAccount token volume.
apiVersion: v1
kind: Pod
metadata:
  name: access-kfp-example
spec:
  containers:
  - image: hello-world:latest
    name: hello-world
    env:
      - ## this environment variable is automatically read by `kfp.Client()`
        ## this is the default value, but we show it here for clarity
        name: KF_PIPELINES_SA_TOKEN_PATH
        value: /var/run/secrets/kubeflow/pipelines/token
    volumeMounts:
      - mountPath: /var/run/secrets/kubeflow/pipelines
        name: volume-kf-pipeline-token
        readOnly: true
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org      
You may use Kubeflow’s PodDefaults to inject the required ServiceAccount token volume into your Pods:
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: access-ml-pipeline
  namespace: "<YOUR_USER_PROFILE_NAMESPACE>"
spec:
  desc: Allow access to Kubeflow Pipelines
  selector:
    matchLabels:
      access-ml-pipeline: "true"
  env:
    - ## this environment variable is automatically read by `kfp.Client()`
      ## this is the default value, but we show it here for clarity
      name: KF_PIPELINES_SA_TOKEN_PATH
      value: /var/run/secrets/kubeflow/pipelines/token
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org      
  volumeMounts:
    - mountPath: /var/run/secrets/kubeflow/pipelines
      name: volume-kf-pipeline-token
      readOnly: true
Tip
PodDefaultsare namespaced resources, so you need to create one inside each of your KubeflowProfilenamespaces.- The Notebook Spawner UI will be aware of any 
PodDefaultsin the user’s namespace (they are selectable under the “configurations” section). 
RBAC Authorization
The Kubeflow Pipelines API respects Kubernetes RBAC, and will check RoleBindings assigned to the ServiceAccount before allowing it to take Pipelines API actions.
For example, this RoleBinding allows Pods with the default-editor ServiceAccount in namespace-2 to manage Kubeflow Pipelines in namespace-1:
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: allow-namespace-2-kubeflow-edit
  ## this RoleBinding is in `namespace-1`, because it grants access to `namespace-1`
  namespace: namespace-1
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kubeflow-edit
subjects:
  - kind: ServiceAccount
    name: default-editor
    ## the ServiceAccount lives in `namespace-2`
    namespace: namespace-2
Tip
- Review the ClusterRole called 
aggregate-to-kubeflow-pipelines-editfor a list of some importantpipelines.kubeflow.orgRBAC verbs. - Kubeflow Notebooks pods run as the 
default-editorServiceAccount by default, so the RoleBindings fordefault-editorapply to them and give them access to submit pipelines in their own namespace. 
Full Kubeflow (from outside cluster)
Click to expand
The process to authenticate the Pipelines SDK from outside the cluster in multi-user mode will vary by distribution:
Example for Dex
For the deployments that use Dex as their identity provider, this example demonstrates how to authenticate the Pipelines SDK from outside the cluster.
Step 1: expose your istio-ingressgateway service locally (if your Kubeflow Istio gateway is not already exposed on a domain)
# `svc/istio-ingressgateway` may be called something else, or use different ports
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 8080:80
Step 2: this Python code defines a get_istio_auth_session() function that returns a session cookie by authenticating with dex
import re
import requests
from urllib.parse import urlsplit
def get_istio_auth_session(url: str, username: str, password: str) -> dict:
    """
    Determine if the specified URL is secured by Dex and try to obtain a session cookie.
    WARNING: only Dex `staticPasswords` and `LDAP` authentication are currently supported
             (we default default to using `staticPasswords` if both are enabled)
    :param url: Kubeflow server URL, including protocol
    :param username: Dex `staticPasswords` or `LDAP` username
    :param password: Dex `staticPasswords` or `LDAP` password
    :return: auth session information
    """
    # define the default return object
    auth_session = {
        "endpoint_url": url,    # KF endpoint URL
        "redirect_url": None,   # KF redirect URL, if applicable
        "dex_login_url": None,  # Dex login URL (for POST of credentials)
        "is_secured": None,     # True if KF endpoint is secured
        "session_cookie": None  # Resulting session cookies in the form "key1=value1; key2=value2"
    }
    # use a persistent session (for cookies)
    with requests.Session() as s:
        ################
        # Determine if Endpoint is Secured
        ################
        resp = s.get(url, allow_redirects=True)
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {url}"
            )
        auth_session["redirect_url"] = resp.url
        # if we were NOT redirected, then the endpoint is UNSECURED
        if len(resp.history) == 0:
            auth_session["is_secured"] = False
            return auth_session
        else:
            auth_session["is_secured"] = True
        ################
        # Get Dex Login URL
        ################
        redirect_url_obj = urlsplit(auth_session["redirect_url"])
        # if we are at `/auth?=xxxx` path, we need to select an auth type
        if re.search(r"/auth$", redirect_url_obj.path): 
            
            #######
            # TIP: choose the default auth type by including ONE of the following
            #######
            
            # OPTION 1: set "staticPasswords" as default auth type
            redirect_url_obj = redirect_url_obj._replace(
                path=re.sub(r"/auth$", "/auth/local", redirect_url_obj.path)
            )
            # OPTION 2: set "ldap" as default auth type 
            # redirect_url_obj = redirect_url_obj._replace(
            #     path=re.sub(r"/auth$", "/auth/ldap", redirect_url_obj.path)
            # )
            
        # if we are at `/auth/xxxx/login` path, then no further action is needed (we can use it for login POST)
        if re.search(r"/auth/.*/login$", redirect_url_obj.path):
            auth_session["dex_login_url"] = redirect_url_obj.geturl()
        # else, we need to be redirected to the actual login page
        else:
            # this GET should redirect us to the `/auth/xxxx/login` path
            resp = s.get(redirect_url_obj.geturl(), allow_redirects=True)
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {redirect_url_obj.geturl()}"
                )
            # set the login url
            auth_session["dex_login_url"] = resp.url
        ################
        # Attempt Dex Login
        ################
        resp = s.post(
            auth_session["dex_login_url"],
            data={"login": username, "password": password},
            allow_redirects=True
        )
        if len(resp.history) == 0:
            raise RuntimeError(
                f"Login credentials were probably invalid - "
                f"No redirect after POST to: {auth_session['dex_login_url']}"
            )
        # store the session cookies in a "key1=value1; key2=value2" string
        auth_session["session_cookie"] = "; ".join([f"{c.name}={c.value}" for c in s.cookies])
    return auth_session
Step 3: this Python code uses the above get_istio_auth_session() function to create a kfp.Client()
import kfp
KUBEFLOW_ENDPOINT = "http://localhost:8080"
KUBEFLOW_USERNAME = "user@example.com"
KUBEFLOW_PASSWORD = "12341234"
auth_session = get_istio_auth_session(
    url=KUBEFLOW_ENDPOINT,
    username=KUBEFLOW_USERNAME,
    password=KUBEFLOW_PASSWORD
)
client = kfp.Client(host=f"{KUBEFLOW_ENDPOINT}/pipeline", cookies=auth_session["session_cookie"])
print(client.list_experiments())
Standalone Kubeflow Pipelines (from inside cluster)
Click to expand
Warning
This information only applies to Standalone Kubeflow Pipelines.When running inside the Kubernetes cluster, you may connect Pipelines SDK directly to the ml-pipeline-ui service via cluster-internal service DNS resolution.
Tip
In standalone deployments of Kubeflow Pipelines, there is no authentication enforced on theml-pipeline-ui service.For example, when running in the same namespace as Kubeflow:
import kfp
client = kfp.Client(host="http://ml-pipeline-ui:80")
print(client.list_experiments())
For example, when running in a different namespace to Kubeflow:
import kfp
# the namespace in which you deployed Kubeflow Pipelines
namespace = "kubeflow" 
client = kfp.Client(host=f"http://ml-pipeline-ui.{namespace}")
print(client.list_experiments())
Standalone Kubeflow Pipelines (from outside cluster)
Click to expand
Warning
This information only applies to Standalone Kubeflow Pipelines.When running outside the Kubernetes cluster, you may connect Pipelines SDK to the ml-pipeline-ui service by using kubectl port-forwarding.
Tip
In standalone deployments of Kubeflow Pipelines, there is no authentication enforced on theml-pipeline-ui service.Step 1: run the following command on your external system to initiate port-forwarding:
# change `--namespace` if you deployed Kubeflow Pipelines into a different namespace
kubectl port-forward --namespace kubeflow svc/ml-pipeline-ui 3000:80
Step 2: the following code will create a kfp.Client() against your port-forwarded ml-pipeline-ui service:
import kfp
client = kfp.Client(host="http://localhost:3000")
print(client.list_experiments())
Next Steps
- Using the Kubeflow Pipelines SDK
 - Kubeflow Pipelines SDK Reference
 - Experiment with the Kubeflow Pipelines API
 
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.