Add possibility to provide cred #11

Open

wants to merge 18 commits into base: main
Conversation

ncclementi
Contributor

No description provided.

@ncclementi ncclementi changed the title Add possibility to provide cred via filename path Add possibility to provide cred via filename path (WIP) Sep 30, 2021
@ncclementi ncclementi changed the title Add possibility to provide cred via filename path (WIP) Add possibility to provide cred (WIP) Sep 30, 2021
@ncclementi
Contributor Author

Passing the file name only works locally, but it won't work if I want to run this on a cluster: the file lives on my laptop, so when the workers go over the graph and look for that filename it won't be available.

I attempted to send the credentials directly (see 7b874e8) but it seems we can't pickle the credentials object.

I wonder if there is another way to be able to send the credentials. @tswast any suggestions on how to approach this?

@tswast left a comment

I'd discourage the use of service account keys (Google is recommending https://cloud.google.com/iam/docs/workload-identity-federation where possible), though obviously the service account key file flow should be supported. Left a couple of comments pointing at other kinds of credentials you'll want to consider.

One thing I might recommend is using the pydata-google-auth package. https://pydata-google-auth.readthedocs.io/en/latest/intro.html#default-credentials That's what we do in pandas-gbq to fall back to browser-based end user credentials.
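For reference, a minimal sketch of that fallback, assuming pydata-google-auth is installed (the scope shown is illustrative):

import pydata_google_auth

# Try Application Default Credentials first; if none are found, fall back to a
# browser-based end-user login, the same way pandas-gbq does.
credentials, project_id = pydata_google_auth.default(
    scopes=["https://www.googleapis.com/auth/bigquery"],
)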

@@ -53,6 +57,7 @@ def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs):
def bigquery_read(
make_create_read_session_request: callable,
project_id: str,
credentials: Credentials,

Ideally you'd allow a wider set of credentials than this. google.auth.credentials.Credentials is going to be the most flexible.

There are two types of credentials that come to mind that you'll want to support that aren't service account credentials:

  • External Account Credentials -- These are useful for folks running on other clouds.
  • OAuth 2.0 Credentials -- Also known as end-user credentials. These are useful for folks running locally. It's the "open a browser window and use your Google account" flow.
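A sketch of the more permissive annotation (remaining parameters elided; the Optional default is an assumption, not part of this PR):

from typing import Optional

import google.auth.credentials


def bigquery_read(
    make_create_read_session_request: callable,
    project_id: str,
    # The base class covers service account, external account (workload identity
    # federation), and end-user OAuth 2.0 credentials alike.
    credentials: Optional[google.auth.credentials.Credentials] = None,
    *args,
    **kwargs,
):
    ...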

@@ -89,6 +94,8 @@ def read_gbq(
dataset_id: str,
table_id: str,
row_filter="",
*,
cred_fpath: str = None,

A credentials object is going to be the most flexible. Not everyone has access to the local file system, such as if they're running from a Colab notebook.

@tswast

tswast commented Sep 30, 2021

Regarding CLIENT_ID and CLIENT_SECRETS from pandas-gbq, technically you should create some to represent Dask/Dask-BigQuery, but it does take some red tape. (See: ibis-project/ibis#2179 for some of the struggles I had with a similar request with Ibis.)

I'm not sure how GCSFS identifies itself, but it might be possible to share some code with them? https://github.com/dask/gcsfs/blob/main/gcsfs/credentials.py

@tswast

tswast commented Sep 30, 2021

Re: getting credentials to the worker "can't pickle the credentials object."

Technically, all that's needed is an access token. Once you have a credentials object and have refreshed it, you can construct a credentials object on the worker with just this access token.

A couple of caveats:

  • access token can be "short lived", meaning it will expire after a configurable amount of time (such as 1 hour).
  • some APIs (I'm not sure if BigQuery is among them) support JWT-based credentials where a token is generated locally with a key file instead of requesting an access token via the auth APIs. I'm not sure if that kind of credential would work with just an access token.

@tswast

tswast commented Sep 30, 2021

There is a to_json function which you can use instead of pickling: https://googleapis.dev/python/google-auth/latest/reference/google.oauth2.credentials.html#google.oauth2.credentials.Credentials.to_json, but again it looks like only some credential types support this. Sorry this is so complicated!

Edit: Looks like https://github.com/dask/gcsfs/blob/main/gcsfs/credentials.py maintains their own GoogleCredentials class, which is pickleable. Looks like an opportunity for shared Google credentials package across all of Dask to me.
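For the credential types that do support it, the to_json round trip might look roughly like this (a sketch; shown for end-user OAuth 2.0 credentials only):

import json

import google.oauth2.credentials

# On the client: serialize authorized-user credentials to a JSON string.
creds_json = credentials.to_json()

# On the worker: rebuild equivalent credentials from the parsed payload
# (requires the refresh token, client ID, and client secret to be present).
worker_credentials = google.oauth2.credentials.Credentials.from_authorized_user_info(
    json.loads(creds_json)
)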

@quasiben

quasiben commented Oct 5, 2021

In dask-cloudprovider we did the following for handling a number of gcp auth methods: https://github.com/dask/dask-cloudprovider/blob/69897be0a9a9cc1ee49e961048c94019f23db3fc/dask_cloudprovider/gcp/instances.py#L606-L638

@ncclementi
Contributor Author

@quasiben Thanks for pointing that out, I'll take a look. I've been trying multiple things to pass the credentials, and currently the main issue is that when we try to build the graph here

layer = DataFrameIOLayer(
    output_name,
    meta.columns,
    [stream.name for stream in session.streams],
    partial(
        bigquery_read,
        make_create_read_session_request,
        project_id,
        credentials,
        read_kwargs,
    ),
    label=label,
)

It seems it can't pickle the credentials object <google.oauth2.service_account.Credentials at 0x1593b9610>. I'm getting this traceback:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/worker.py in dumps_function(func)
   3802         with _cache_lock:
-> 3803             result = cache_dumps[func]
   3804     except KeyError:

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/utils.py in __getitem__(self, key)
   1369     def __getitem__(self, key):
-> 1370         value = super().__getitem__(key)
   1371         self.data.move_to_end(key)

~/mambaforge/envs/test_gbq/lib/python3.8/collections/__init__.py in __getitem__(self, key)
   1009             return self.__class__.__missing__(self, key)
-> 1010         raise KeyError(key)
   1011     def __setitem__(self, key, item): self.data[key] = item

KeyError: subgraph_callable-e1125c9b-d48d-4891-8fbc-22331e8422ad

During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     48         buffers.clear()
---> 49         result = pickle.dumps(x, **dump_kwargs)
     50         if len(result) < 1000:

AttributeError: Can't pickle local object 'read_gbq.<locals>.make_create_read_session_request'

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
/var/folders/1y/ydztfpnd11b6qmvbb8_x56jh0000gn/T/ipykernel_57662/2803347007.py in <module>
----> 1 ddf_ts.map_partitions(len).compute()

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    284         dask.base.compute
    285         """
--> 286         (result,) = compute(self, traverse=False, **kwargs)
    287         return result
    288 

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2649         Client.compute : Compute asynchronous collections
   2650         """
-> 2651         futures = self._graph_to_futures(
   2652             dsk,
   2653             keys=set(flatten([keys])),

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
   2574             # Pack the high level graph before sending it to the scheduler
   2575             keyset = set(keys)
-> 2576             dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
   2577 
   2578             # Create futures before sending graph (helps avoid contention)

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/highlevelgraph.py in __dask_distributed_pack__(self, client, client_keys, annotations)
   1053                     "__module__": layer.__module__,
   1054                     "__name__": type(layer).__name__,
-> 1055                     "state": layer.__dask_distributed_pack__(
   1056                         self.get_all_external_keys(),
   1057                         self.key_dependencies,

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/blockwise.py in __dask_distributed_pack__(self, all_hlg_keys, known_key_dependencies, client, client_keys)
    552             to_serialize(dsk[0])
    553             if (self.concatenate or inline_tasks)
--> 554             else dumps_function(dsk[0])
    555         )
    556         func_future_args = dsk[1:]

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/worker.py in dumps_function(func)
   3803             result = cache_dumps[func]
   3804     except KeyError:
-> 3805         result = pickle.dumps(func, protocol=4)
   3806         if len(result) < 100000:
   3807             with _cache_lock:

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     58         try:
     59             buffers.clear()
---> 60             result = cloudpickle.dumps(x, **dump_kwargs)
     61         except Exception as e:
     62             logger.info("Failed to serialize %s. Exception: %s", x, e)

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
     71                 file, protocol=protocol, buffer_callback=buffer_callback
     72             )
---> 73             cp.dump(obj)
     74             return file.getvalue()
     75 

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    600     def dump(self, obj):
    601         try:
--> 602             return Pickler.dump(self, obj)
    603         except RuntimeError as e:
    604             if "recursion" in e.args[0]:

TypeError: cannot pickle '_cffi_backend.FFI' object

@ncclementi ncclementi changed the title Add possibility to provide cred (WIP) Add possibility to provide cred Oct 5, 2021
@tswast

tswast commented Oct 7, 2021

Good point re: pickling. Seems like gcsfs worked around this by defining their own pickleable class. https://github.com/dask/gcsfs/blob/main/gcsfs/credentials.py#L40

I'll chat with our core library Python devs to see if it's feasible to pickle Credentials objects or if they recommend another way of cross-process sharing.

@tswast

tswast commented Oct 7, 2021

Looks like the pickling error is actually happening on the closure function:

AttributeError: Can't pickle local object 'read_gbq.<locals>.make_create_read_session_request'

Though I don't think proto-plus objects are picklable, either. If you can pass the result of make_create_read_session_request instead of the actual function, maybe that would help?

@tswast

tswast commented Oct 7, 2021

Re: "proto-plus objects aren't picklable"

I was referring to the CreateReadSessionRequest object.

For that, you'll want to manually call

https://proto-plus-python.readthedocs.io/en/latest/reference/message.html#proto.message.Message.serialize

and

https://proto-plus-python.readthedocs.io/en/latest/reference/message.html#proto.message.Message.deserialize

to get to/from the bytes for that protocol buffer.
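A sketch of that round trip for the read-session request (assumes the message class from google.cloud.bigquery_storage_v1.types; the request variable is illustrative):

from google.cloud.bigquery_storage_v1 import types

# On the client: proto-plus messages expose serialize()/deserialize() class methods,
# and the resulting bytes can be shipped to workers without pickling the message itself.
payload = types.CreateReadSessionRequest.serialize(request)

# On the worker: rebuild the message from the raw protobuf bytes.
request = types.CreateReadSessionRequest.deserialize(payload)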

@ncclementi
Contributor Author

Thanks for the comments @tswast, I'll take a look into these options.
We found a temporary workaround by pickling the dictionary you get from the JSON file, but this is not ideal. I've been exploring a bit to see if we can use tokens; I just landed on pyjwt, which seems like it might do the job, but I haven't tried it yet.

@tswast

tswast commented Oct 7, 2021

My teammates did some investigation and verified that Credentials are not picklable due to the use of the cryptography library.

Objects are not guaranteed to be pickleable, and pickled objects from one version of cryptography may not be loadable in future versions.

https://cryptography.io/en/latest/api-stability/?highlight=pickle#what-doesn-t-this-policy-cover

The object is picklable if cryptography isn't installed and google-auth uses the pure-Python rsa library, but that's also the slow path so I wouldn't recommend that.

I'm still investigating other options.

@ncclementi
Contributor Author

I gave pyjwt a try, so right now we are passing a token to the worker and decoding it on the workers instead of passing the dictionary with the information, but this is still not great. Will keep looking into solutions too.

@fjetter fjetter added the enhancement New feature or request label Oct 8, 2021
@ncclementi
Contributor Author

@tswast In a previous comment, you mention the possibility of using access tokens. Could you expand on this? I wonder if we are missing something.

Currently, the best solution we have is creating a token from the content of the key JSON file with pyjwt and "unloading" the token on the workers. But this is not ideal.

Do you know of any access token that we could create with google-auth or similar that we can pass to the workers and authenticate directly without having to go through an "unload" process?

@bnaul
Contributor

bnaul commented Oct 14, 2021

@ncclementi just curious, what's the environment that you're trying to get auth working in? For our GKE setup (client + scheduler + workers all on different GKE k8s pods), everything just works out of the box using the default credentials that are inferred from the GCP VM environment. I can definitely imagine cases where only the client has the auth info, but it seems like maybe something of an edge case...?

@tswast

tswast commented Oct 14, 2021

In a previous comment, you mention the possibility of using access tokens. Could you expand on this? I wonder if we are missing something.

There are a few dozen ways to get an access token. The auth library calls it a "bearer token": https://googleapis.dev/python/google-auth/latest/reference/google.auth.credentials.html#google.auth.credentials.Credentials.token

For a few of the common ways, I've got some slides and code samples here from an old presentation: https://github.com/tswast/code-snippets/blob/main/2019/python-auth-samples/slides.pdf

@tswast

tswast commented Oct 14, 2021

I can definitely imagine cases where only the client has the auth info but it seems like maybe something of an edge case...?

Non-GCP isn't exactly an edge case. I could imagine folks not wanting to distribute key files to all the worker nodes.

Edit: But it might make sense to allow for a simpler code path (no passing of keys/access token) when running on GCP or with an environment where the workers are configured for Google Application Default Credentials.

@ncclementi
Contributor Author

@bnaul I was trying to use it with Coiled, where the client is on my laptop but the cluster is on the cloud (in particular I had an AWS instance).

@tswast

tswast commented Oct 29, 2021

@ncclementi You're almost there! Just need to construct one of these: https://googleapis.dev/python/google-auth/latest/reference/google.oauth2.credentials.html#google.oauth2.credentials.Credentials

import google.auth.transport.requests
import google.oauth2.credentials

from google.cloud import bigquery

# Refresh the client-side credentials object to obtain a bearer token.
auth_req = google.auth.transport.requests.Request()
credentials.refresh(auth_req)
cred_token = credentials.token

# On the worker, build lightweight credentials from just the access token.
worker_credentials = google.oauth2.credentials.Credentials(cred_token)

bq_client = bigquery.Client("swast-scratch", credentials=worker_credentials)

# It works!
print(list(bq_client.query("SELECT COUNT(*) FROM `bigquery-public-data.usa_names.usa_1910_current`").result()))

@ncclementi
Contributor Author

Thanks @tswast this is promising. I will give this a try in the dask-bigquery code.

@ncclementi
Contributor Author

I just took this approach for a spin using Coiled with an AWS backend, and it works. 🎉

@ncclementi
Contributor Author

@tswast and @bnaul It seems we found a way to pass the credentials to the workers in a safe way. Now the question is whether we want this to be the default behavior or not. I understand there are many cases where the workers have access to credentials without the need to explicitly pass them (like Brett's workflow explained here), in which case having to pass the credentials is not desirable.

Tim, you mentioned that a simpler code path could be a possible way to go (see #11 (comment)).

In a conversation with @jrbourbeau we were thinking something like a fwd_creds flag, which by default is False (current behavior), and having the possibility that if fwd_creds=True then the behavior would be this PR.

What do you think?
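In terms of usage, the proposal would look roughly like this (a sketch; project/dataset/table values are placeholders and the other arguments follow the existing read_gbq signature):

import dask_bigquery

# Default: current behavior, workers rely on their own Application Default Credentials.
ddf = dask_bigquery.read_gbq(
    project_id="my-project",
    dataset_id="my_dataset",
    table_id="my_table",
)

# Opt in: authenticate on the client and forward a short-lived token to the workers.
ddf_fwd = dask_bigquery.read_gbq(
    project_id="my-project",
    dataset_id="my_dataset",
    table_id="my_table",
    fwd_creds=True,
)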

@tswast

tswast commented Nov 2, 2021

In a conversation with @jrbourbeau we were thinking something like a fwd_creds flag, which by default is False (current behavior), and having the possibility that if fwd_creds=True then the behavior would be this PR.

This sounds reasonable to me. I definitely want to avoid having to create a service account key file when running on GCP.

@ncclementi
Contributor Author

I added a fwd_creds flag to give the option to forward credentials if desired, and I added a parameterized option to the tests to check this path. However, the test path for fwd_creds=True is not entirely isolated, since the way the tests are currently designed both the client and the workers have access to the credentials.

@bnaul @tswast if you have an idea on how to test this path better, let us know.

if creds_path is None:
    raise ValueError("No credentials found")

credentials = service_account.Credentials.from_service_account_file(

You might want to call

https://googleapis.dev/python/google-auth/latest/reference/google.auth.html#google.auth.default

instead. That'll find the GOOGLE_APPLICATION_CREDENTIALS environment variable if available, but also allow for other authentication methods such as gcloud auth application-default login.
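For example, a sketch of resolving credentials through the default chain (the scope shown matches the read-only BigQuery scope used elsewhere in this thread):

import google.auth

# google.auth.default() checks GOOGLE_APPLICATION_CREDENTIALS, gcloud's
# application-default credentials, and the GCE/GKE metadata server, in that order.
credentials, default_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/bigquery.readonly"],
)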


I believe any kind of credentials it returns should have a refresh function, just like the service account credentials.

Contributor Author

Thanks Tim, I will take a look at this.

@tswast

tswast commented Nov 4, 2021

@bnaul @tswast if you have an idea on how to test this path better, let us know.

It's a bit tough to test from an environment where credentials are available. I tend to resort to some mocks https://github.com/pydata/pydata-google-auth/blob/b23bc5e83ba26a19972d0610bd4865dbcac91d30/tests/unit/test_auth.py#L34

Along similar lines, you might try monkeypatching the workers to set os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "" when testing fwd_creds=True. If I remember correctly, that'll cause the default auth paths to fail fast.

@ncclementi
Contributor Author

It's a bit tough to test from an environment where credentials are available. I tend to resort to some mocks https://github.com/pydata/pydata-google-auth/blob/b23bc5e83ba26a19972d0610bd4865dbcac91d30/tests/unit/test_auth.py#L34

Along similar lines, you might try monkeypatching the workers to set os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "" when testing fwd_creds=True. If I remember correctly, that'll cause the default auth paths to fail fast.

@tswast We thought of this, but I'm not sure it'll work given that we read the credentials initially from that environment variable,

if fwd_creds:
    creds_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")

If I set GOOGLE_APPLICATION_CREDENTIALS="", then how can I test that I can authenticate by forwarding credentials?

@tswast

tswast commented Nov 17, 2021

@ncclementi Do we have a way of injecting code into the worker processes when they start up in your test setup? Monkey patching google.auth.default to throw google.auth.exceptions.DefaultCredentialsError would simulate an environment with no credentials.

@tswast

tswast commented Nov 17, 2021

Or are the workers in-process threads? In which case, maybe patch google.auth.default to work correctly once and then throw after that?
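A sketch of that "succeed once, then fail" patch (the helper name is made up; assumes an in-process test cluster so the patched function is visible to both the client and the workers):

import google.auth
import google.auth.exceptions


def default_that_fails_after_first_call(real_default=google.auth.default):
    # Let google.auth.default() work once (the client call), then raise on every
    # later call to simulate workers that have no credentials available.
    state = {"calls": 0}

    def fake_default(*args, **kwargs):
        state["calls"] += 1
        if state["calls"] == 1:
            return real_default(*args, **kwargs)
        raise google.auth.exceptions.DefaultCredentialsError("simulated missing credentials")

    return fake_default


# In a test body:
# monkeypatch.setattr(google.auth, "default", default_that_fails_after_first_call())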

@ncclementi
Contributor Author

I modified core.py to use google.auth.default and added a test using monkeypatch to check that we can't authenticate when we have no credentials, but I'm still not sure how I can use monkeypatch to authenticate on the client, forward the credentials, and not have them accessible to the workers. Meaning, how to do this

    def mock_auth(scopes=["https://www.googleapis.com/auth/bigquery.readonly"]):
        raise google.auth.exceptions.DefaultCredentialsError()

    monkeypatch.setattr(google.auth, "default", mock_auth)

only on the scheduler/workers but not on the client.

@jrbourbeau Do you have any ideas?

@ncclementi
Contributor Author

ncclementi commented Nov 18, 2021

It also looks like the most recent update of pandas-gbq might have broken our tests. When writing to BigQuery, this

pd.DataFrame.to_gbq(
    df,
    destination_table=f"{dataset_id}.{table_id}",
    project_id=project_id,
    chunksize=5,
    if_exists="append",
)

with pandas-gbq=0.15 and reading it back with dask_bigquery.read_gbq returns 2 dask partitions, while if the writing is done with pandas-gbq=0.16, reading it back with dask_bigquery.read_gbq returns only 1 dask partition.

@tswast do you know what might be happening here, and how should we adapt the tests to represent what we had before with the chunking?

@bnaul
Contributor

bnaul commented Nov 18, 2021

when reading back with dask_bigquery.read_gbq returns only 1 dask partition

I think the number of partitions should always be treated as random/arbitrary; BigQuery is free to change their internal chunking heuristics and break everything if we rely on any specific partitioning.

also do we need to still be using pandas-gbq or should we just use the built-in bigquery.Client.load_table_from_dataframe ?
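For reference, the equivalent with the core client would be roughly this sketch (it assumes the same df/project_id/dataset_id/table_id variables as the test snippet above, and load_table_from_dataframe needs pyarrow installed):

from google.cloud import bigquery

client = bigquery.Client(project=project_id)

# Append the test DataFrame to the destination table; result() blocks until the load job finishes.
job = client.load_table_from_dataframe(
    df,
    f"{project_id}.{dataset_id}.{table_id}",
    job_config=bigquery.LoadJobConfig(write_disposition="WRITE_APPEND"),
)
job.result()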

@tswast

tswast commented Nov 18, 2021

pandas-gbq 0.16 changed the default intermediate data serialization format to parquet instead of CSV.

Likely this means the backend loader required fewer workers and wrote it to fewer files behind the scenes

@tswast

tswast commented Nov 18, 2021

You can pass api_method="load_csv" for the old behavior, but we might want to look into manually splitting streams like folks are doing in the Apache Beam BigQuery connector
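Applied to the test above, that would be a one-argument change (a sketch calling pandas_gbq directly, since api_method is a pandas-gbq argument):

import pandas_gbq

pandas_gbq.to_gbq(
    df,
    destination_table=f"{dataset_id}.{table_id}",
    project_id=project_id,
    chunksize=5,
    if_exists="append",
    api_method="load_csv",  # keep the pre-0.16 CSV-based load behavior
)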

@ncclementi
Contributor Author

also do we need to still be using pandas-gbq or should we just use the built-in bigquery.Client.load_table_from_dataframe ?

We are only using it for testing purposes, so I guess the answer is no: it was a simple workaround, but it's probably a good idea to not rely on it and maybe just use plain bigquery. I wonder how we can recreate a simple example where reading back with dask_bigquery.read_gbq gives us more than 1 partition that we can assert on.

Is there a way to control the partitions we read? For example, if we have a BigQuery partitioned table that has N partitions, will reading the table with dask_bigquery give me N partitions? (Trying to test for this case.)

@ncclementi
Contributor Author

Status update

  • Up to this moment, we have not found a way to create a test that isolates the Client (having credentials) from the Scheduler/workers (not having credentials), which is a crucial test for this implementation.
  • We will pause merging this to give people more time to use the package as is and see if there is a need for "forwarding credentials". If that is the case we will get back to it, and try to figure out such tests.

cc: @jrbourbeau
