Dask Dataframe: Can `set_index` Put A Single Index Into Multiple Partitions?
Solution 1:
is it the case that a single index can never be in two different partitions?
No, it's certainly allowed. Dask even intends for this to happen. However, because of a bug in `set_index`, all the data will still end up in one partition.
An extreme example (every row is the same value except one):
In [1]: import dask.dataframe as dd
In [2]: import pandas as pd
In [3]: df = pd.DataFrame({"A": [0] + [1] * 20})
In [4]: ddf = dd.from_pandas(df, npartitions=10)
In [5]: s = ddf.set_index("A")
In [6]: s.divisions
Out[6]: (0, 0, 0, 0, 0, 0, 0, 1)
As you can see, Dask intends for the `0`s to be split up between multiple partitions. Yet when the shuffle actually happens, all the `0`s still end up in one partition:
In [7]: import dask
In [8]: dask.compute(s.to_delayed()) # easy way to see the partitions separately
Out[8]:
([Empty DataFrame
Columns: []
Index: [],
Empty DataFrame
Columns: []
Index: [],
Empty DataFrame
Columns: []
Index: [],
Empty DataFrame
Columns: []
Index: [],
Empty DataFrame
Columns: []
Index: [],
Empty DataFrame
Columns: []
Index: [],
Empty DataFrame
Columns: []
Index: [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]],)
This is because the code deciding which output partition a row belongs to doesn't consider duplicates in `divisions`. Treating `divisions` as a Series, it uses `searchsorted` with `side="right"`, which is why all the data always ends up in the last partition.
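The effect of that logic can be reproduced directly with pandas. This is a sketch of the behaviour described above, not Dask's actual code:

```python
# Sketch of the buggy assignment described above (not Dask's actual code):
# placing each index value into divisions via searchsorted with side="right"
# sends every duplicate boundary value to the last matching partition.
import pandas as pd

divisions = pd.Series([0, 0, 0, 0, 0, 0, 0, 1])  # divisions from the example
values = pd.Series([0] + [1] * 20)               # the index values to place

# partition i covers [divisions[i], divisions[i+1]); side="right" returns the
# insertion point *after* any run of duplicates, so every 0 maps past all the
# duplicated boundaries
partitions = divisions.searchsorted(values, side="right").clip(0, len(divisions) - 2)
print(set(int(p) for p in partitions))  # every row maps to partition 6, the last one
```

With 8 division boundaries there are 7 partitions (indices 0 through 6), and every value, `0`s included, is routed to partition 6, matching the output above.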
I'll update this answer when the issue is fixed.
Solution 2:
Is it the case that a single index can never be in two different partitions?
IIUC, the answer for practical purposes is yes.
A dask dataframe will in general have multiple partitions, and dask may or may not know about the index values associated with each partition (see Partitions). If dask does know which partition contains which index range, then this will be reflected in `df.divisions` (if not, the result of this call will be `None`).
When running `.set_index`, dask will compute divisions, and it seems that in determining the divisions it will require that divisions are sequential and unique (except for the last element). The relevant code is here.
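Assuming divisions really are sorted and unique, the lookup they enable can be sketched with `bisect` (my own illustration of the scheme, not the code linked above):

```python
# Sketch of how sorted, unique divisions let dask route an index value to a
# partition (my illustration, not dask's code): partition i covers
# [divisions[i], divisions[i+1]), with the final boundary inclusive.
import bisect

divisions = (0, 3, 6, 9)  # 4 boundaries -> 3 partitions

def partition_for(value, divisions):
    # first boundary strictly greater than value, minus one; clamp so the
    # last division value falls into the final partition
    return min(bisect.bisect_right(divisions, value) - 1, len(divisions) - 2)

print(partition_for(0, divisions))  # -> 0
print(partition_for(5, divisions))  # -> 1
print(partition_for(9, divisions))  # -> 2 (last boundary is inclusive)
```

With unique, sorted boundaries this lookup is a binary search, which is why dask can serve `.loc` on an indexed dataframe without scanning every partition.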
So there are two potential follow-up questions: why not allow non-sequential indexing at all, and, as a specific case of that, why not allow duplicate indexes across partitions.
With regard to the first question: for smallish data it might be feasible to design something that allows non-sorted indexing, but you can imagine that general non-sorted indexing won't scale well, since dask would need to store the indexes for each partition somehow.
With regard to the second question: it seems that this should be possible, but it also seems that right now it's not implemented correctly. See the snippet below:
# use this to generate 10 indexed partitions
import pandas as pd

for user in range(10):
    df = pd.DataFrame({'user_col': [user // 3] * 100})
    df['user'] = df['user_col']
    df = df.set_index('user')
    df.index.name = 'user_index'
    df.to_parquet(f'test_{user}.parquet', index=True)

# now load them into a dask dataframe
import dask.dataframe as dd

ddf = dd.read_parquet('test_*.parquet')

# dask will know about the divisions
print(ddf.known_divisions)  # True
# further evidence
print(ddf.divisions)  # (0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3)
# this should show three partitions, but will show only one
print(ddf.loc[0].npartitions)  # 1
Solution 3:
I have just noticed that Dask's documentation for `shuffle` says:
"After this operation, rows with the same value of `on` will be in the same partition."
This seems to confirm my empirical observation.