
Dask Dataframe: Can `set_index` Put A Single Index Into Multiple Partitions?

Empirically it seems that whenever you set_index on a Dask dataframe, Dask will always put rows with equal index values into a single partition, even if it results in wildly imbalanced partitions.

Solution 1:

is it the case that a single index can never be in two different partitions?

No, it's certainly allowed; Dask will even plan for it to happen. However, because of a bug in set_index, all of the data will nonetheless end up in one partition.

An extreme example (every row is the same value except one):

In [1]: import dask.dataframe as dd
In [2]: import pandas as pd
In [3]: df = pd.DataFrame({"A": [0] + [1] * 20})
In [4]: ddf = dd.from_pandas(df, npartitions=10)
In [5]: s = ddf.set_index("A")
In [6]: s.divisions
Out[6]: (0, 0, 0, 0, 0, 0, 0, 1)

As you can see, Dask intends for the 0s to be split up between multiple partitions. Yet when the shuffle actually happens, all the 0s still end up in one partition:

In [7]: import dask
In [8]: dask.compute(s.to_delayed())  # easy way to see the partitions separately
Out[8]: 
([Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]],)

This is because the code that decides which output partition a row belongs to doesn't account for duplicates in divisions. Treating divisions as a Series, it uses searchsorted with side="right", which is why all the data always ends up in the last partition.
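
For intuition, here is a rough sketch of that assignment logic (not Dask's actual implementation) using the divisions from Out[6] above; with duplicated divisions, searchsorted with side="right" gives the same answer for every duplicated key, so the duplicates can never be split across partitions:

import pandas as pd

divisions = pd.Series([0, 0, 0, 0, 0, 0, 0, 1])  # the divisions from Out[6]
zeros = pd.Series([0] * 20)                      # the duplicated index values

# side="right" returns the position after the *last* matching division, so
# every 0 maps to the final output partition instead of being spread across
# the seven partitions the divisions intended
print(divisions.searchsorted(zeros, side="right") - 1)  # [6 6 6 ... 6]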

I'll update this answer when the issue is fixed.

Solution 2:

Is it the case that a single index can never be in two different partitions?

IIUC, the answer for practical purposes is yes.

A dask dataframe will in general have multiple partitions, and dask may or may not know the index values associated with each partition (see the Partitions docs). If dask does know which partition covers which index range, this is reflected in df.divisions (if not, divisions will be a tuple of None values and df.known_divisions will be False).
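
A quick illustration of known vs. unknown divisions (a sketch; the example data and the exact division values shown in the comments are assumptions):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'x': range(10)}, index=[3, 1, 4, 1, 5, 9, 2, 6, 5, 3])

# with sort=False the unsorted index is kept, so dask cannot know the boundaries
ddf = dd.from_pandas(pdf, npartitions=2, sort=False)
print(ddf.known_divisions)  # False
print(ddf.divisions)        # (None, None, None)

# with a sorted index, dask records the partition boundaries as divisions
ddf_sorted = dd.from_pandas(pdf.sort_index(), npartitions=2)
print(ddf_sorted.known_divisions)  # True
print(ddf_sorted.divisions)        # e.g. (1, 4, 9)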

When running .set_index, dask will compute divisions, and it seems that in determining them it requires the divisions to be sorted and unique (except for the last element). The relevant code is here.

So there are two potential follow-up questions: why not allow non-sorted indexing at all, and, as a specific case of that, why not allow duplicate index values to span multiple partitions.

With regard to the first question: for smallish data a design that allows non-sorted indexing might be feasible, but you can imagine that general non-sorted indexing won't scale well, since dask would need to store the index values of every partition somehow.

With regard to the second question: it seems that this should be possible, but right now it's not implemented correctly. See the snippet below:

# use this to generate 10 indexed partitions
import pandas as pd

for user in range(10):
    df = pd.DataFrame({'user_col': [user // 3] * 100})
    df['user'] = df['user_col']
    df = df.set_index('user')
    df.index.name = 'user_index'
    df.to_parquet(f'test_{user}.parquet', index=True)


# now load them into a dask dataframe
import dask.dataframe as dd

ddf = dd.read_parquet('test_*.parquet')

# dask will know about the divisions
print(ddf.known_divisions)  # True

# further evidence
print(ddf.divisions)  # (0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3)

# this should show three partitions, but will show only one
print(ddf.loc[0].npartitions)  # 1
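
As a further check (using the same files generated above), one can look at the individual partitions directly; the first three each contain index value 0, even though .loc[0] only consults one of them:

for i in range(3):
    part = ddf.get_partition(i).compute()
    print(i, part.index.unique().tolist())  # [0] for each of the first three partitions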

Solution 3:

I have just noticed that Dask's documentation for shuffle says

After this operation, rows with the same value of on will be in the same partition.

This seems to confirm my empirical observation.
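
That behaviour is easy to check with shuffle directly (a sketch; the exact partition layout can vary between Dask versions, and some partitions may come back empty):

import dask
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'A': [0, 1, 2, 3] * 5})
ddf = dd.from_pandas(pdf, npartitions=4)

shuffled = ddf.shuffle(on='A', npartitions=4)
parts = dask.compute(shuffled.to_delayed())[0]
for i, part in enumerate(parts):
    print(i, sorted(part['A'].unique()))
# every distinct value of A appears in exactly one partition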
