Skip to content

Commit db1f1ef

Browse files
committed
wip
1 parent 99be618 commit db1f1ef

15 files changed

+176
-42
lines changed

src/java/org/apache/cassandra/db/ColumnFamilyStore.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1831,17 +1831,19 @@ public CompactionManager.AllSSTableOpStatus verify(IVerifier.Options options) th
18311831
/**
18321832
* Rewrites all SSTables according to specified parameters
18331833
*
1834-
* @param skipIfCurrentVersion - if {@link true}, will rewrite only SSTables that have version older than the current one ({@link SSTableFormat#getLatestVersion()})
1834+
* @param skipIfCurrentVersion - if {@link true}, will rewrite only SSTables that have version older than the current one ({@link SSTableFormat#getLatestVersion()})
18351835
* @param skipIfNewerThanTimestamp - max timestamp (local creation time) for SSTable; SSTables created _after_ this timestamp will be excluded from compaction
18361836
* @param skipIfCompressionMatches - if {@link true}, will rewrite only SSTables whose compression parameters are different from {@code TableMetadata#params#getCompressionParameters()}
1837-
* @param jobs number of jobs for parallel execution
1837+
* @param latestColumnsOnly
1838+
* @param jobs number of jobs for parallel execution
18381839
*/
18391840
public CompactionManager.AllSSTableOpStatus sstablesRewrite(final boolean skipIfCurrentVersion,
18401841
final long skipIfNewerThanTimestamp,
18411842
final boolean skipIfCompressionMatches,
1842-
final int jobs) throws ExecutionException, InterruptedException
1843+
final int jobs,
1844+
boolean latestColumnsOnly) throws ExecutionException, InterruptedException
18431845
{
1844-
return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, skipIfCurrentVersion, skipIfNewerThanTimestamp, skipIfCompressionMatches, jobs);
1846+
return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, skipIfCurrentVersion, skipIfNewerThanTimestamp, skipIfCompressionMatches, jobs, latestColumnsOnly);
18451847
}
18461848

18471849
public CompactionManager.AllSSTableOpStatus relocateSSTables(int jobs) throws ExecutionException, InterruptedException

src/java/org/apache/cassandra/db/SerializationHeader.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public static SerializationHeader makeWithoutStats(TableMetadata metadata)
7676
return new SerializationHeader(true, metadata, metadata.regularAndStaticColumns(), EncodingStats.NO_STATS);
7777
}
7878

79-
public static SerializationHeader make(TableMetadata metadata, Collection<SSTableReader> sstables)
79+
public static SerializationHeader make(TableMetadata metadata, Collection<SSTableReader> sstables, boolean latestColumnsOnly)
8080
{
8181
// The serialization header has to be computed before the start of compaction (since it's used to write)
8282
// the result. This means that when compacting multiple sources, we won't have perfectly accurate stats
@@ -91,16 +91,33 @@ public static SerializationHeader make(TableMetadata metadata, Collection<SSTabl
9191
EncodingStats.Collector stats = new EncodingStats.Collector();
9292
RegularAndStaticColumns.Builder columns = RegularAndStaticColumns.builder();
9393
// We need to order the SSTables by descending generation to be sure that we use latest column metadata.
94-
for (SSTableReader sstable : orderByDescendingGeneration(sstables))
94+
95+
Collection<SSTableReader> ssTableReaders = orderByDescendingGeneration(sstables);
96+
97+
for (SSTableReader sstable : ssTableReaders)
9598
{
9699
stats.updateTimestamp(sstable.getMinTimestamp());
97100
stats.updateLocalDeletionTime(sstable.getMinLocalDeletionTime());
98101
stats.updateTTL(sstable.getMinTTL());
99-
columns.addAll(sstable.header.columns());
102+
}
103+
104+
if (latestColumnsOnly)
105+
{
106+
columns.addAll(metadata.regularAndStaticColumns());
107+
}
108+
else
109+
{
110+
for (SSTableReader sstable : ssTableReaders)
111+
columns.addAll(sstable.header.columns());
100112
}
101113
return new SerializationHeader(true, metadata, columns.build(), stats.get());
102114
}
103115

116+
public static SerializationHeader make(TableMetadata metadata, Collection<SSTableReader> sstables)
117+
{
118+
return make(metadata, sstables, false);
119+
}
120+
104121
private static Collection<SSTableReader> orderByDescendingGeneration(Collection<SSTableReader> sstables)
105122
{
106123
if (sstables.size() < 2)

src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,12 @@ public void shutdown()
206206

207207
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, final long gcBefore, long maxSSTableBytes)
208208
{
209-
return new CompactionTask(cfs, txn, gcBefore);
209+
return getCompactionTask(txn, gcBefore, maxSSTableBytes, false);
210+
}
211+
212+
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, final long gcBefore, long maxSSTableBytes, boolean latestColumnsOnly)
213+
{
214+
return new CompactionTask(cfs, txn, gcBefore, latestColumnsOnly);
210215
}
211216

212217
/**

src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,29 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
3535
protected ILifecycleTransaction transaction;
3636
protected boolean isUserDefined;
3737
protected OperationType compactionType;
38+
protected final boolean latestColumnsOnly;
3839

3940
/**
40-
* @param cfs
41+
* @param cfs column family of this compaction task
4142
* @param transaction the modifying managing the status of the sstables we're replacing
4243
*/
4344
public AbstractCompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction transaction)
45+
{
46+
this(cfs, transaction, false);
47+
}
48+
49+
/**
50+
* @param cfs column family of this compaction task
51+
* @param transaction the modifying managing the status of the sstables we're replacing
52+
* @param latestColumnsOnly true if compaction should produce SSTables without e.g. dropped columns in serialisation header
53+
*/
54+
public AbstractCompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction transaction, boolean latestColumnsOnly)
4455
{
4556
this.cfs = cfs;
4657
this.transaction = transaction;
4758
this.isUserDefined = false;
4859
this.compactionType = OperationType.COMPACTION;
60+
this.latestColumnsOnly = latestColumnsOnly;
4961

5062
try
5163
{

src/java/org/apache/cassandra/db/compaction/CompactionManager.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
import org.apache.cassandra.metrics.TableMetrics;
110110
import org.apache.cassandra.repair.NoSuchRepairSessionException;
111111
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
112+
import org.apache.cassandra.schema.DroppedColumn;
112113
import org.apache.cassandra.schema.Schema;
113114
import org.apache.cassandra.schema.TableMetadata;
114115
import org.apache.cassandra.service.ActiveRepairService;
@@ -689,9 +690,25 @@ public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs,
689690
final boolean skipIfCurrentVersion,
690691
final long skipIfOlderThanTimestamp,
691692
final boolean skipIfCompressionMatches,
692-
int jobs) throws InterruptedException, ExecutionException
693+
int jobs,
694+
boolean latestColumnsOnly) throws InterruptedException, ExecutionException
693695
{
694696
return performSSTableRewrite(cfs, (sstable) -> {
697+
// Should we remove dropped columns from header?
698+
if (latestColumnsOnly)
699+
{
700+
logger.info("ONLY LATEST");
701+
for (DroppedColumn droppedColumn : cfs.metadata().droppedColumns.values())
702+
{
703+
logger.info("FOUND DROPPED " + droppedColumn.toString());
704+
if (sstable.header.columns().contains(droppedColumn.column))
705+
{
706+
logger.info("RETURNING TRUE!");
707+
return true;
708+
}
709+
}
710+
}
711+
695712
// Skip if descriptor version matches current version
696713
if (skipIfCurrentVersion && sstable.descriptor.version.equals(sstable.descriptor.getFormat().getLatestVersion()))
697714
return false;
@@ -708,15 +725,15 @@ public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs,
708725
return false;
709726

710727
return true;
711-
}, jobs);
728+
}, jobs, latestColumnsOnly);
712729
}
713730

714731
/**
715732
* Perform SSTable rewrite
716733
717734
* @param sstableFilter sstables for which predicate returns {@link false} will be excluded
718735
*/
719-
public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, Predicate<SSTableReader> sstableFilter, int jobs) throws InterruptedException, ExecutionException
736+
public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, Predicate<SSTableReader> sstableFilter, int jobs, boolean latestColumnsOnly) throws InterruptedException, ExecutionException
720737
{
721738
return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
722739
{
@@ -741,7 +758,7 @@ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
741758
@Override
742759
public void execute(LifecycleTransaction txn)
743760
{
744-
AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
761+
AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE, latestColumnsOnly);
745762
task.setUserDefined(true);
746763
task.setCompactionType(OperationType.UPGRADE_SSTABLES);
747764
task.execute(active);

src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,14 +1036,14 @@ public long getMaxSSTableBytes()
10361036
return maxSSTableSizeBytes;
10371037
}
10381038

1039-
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, long gcBefore, long maxSSTableBytes)
1039+
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, long gcBefore, long maxSSTableBytes, boolean latestColumnsOnly)
10401040
{
10411041
maybeReloadDiskBoundaries();
10421042
readLock.lock();
10431043
try
10441044
{
10451045
validateForCompaction(txn.originals());
1046-
return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
1046+
return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes, latestColumnsOnly);
10471047
}
10481048
finally
10491049
{
@@ -1052,6 +1052,11 @@ public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, long g
10521052

10531053
}
10541054

1055+
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, long gcBefore, long maxSSTableBytes)
1056+
{
1057+
return getCompactionTask(txn, gcBefore, maxSSTableBytes, false);
1058+
}
1059+
10551060
private void validateForCompaction(Iterable<SSTableReader> input)
10561061
{
10571062
readLock.lock();

src/java/org/apache/cassandra/db/compaction/CompactionTask.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,15 @@ public CompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction txn, long gcB
8181
this(cfs, txn, gcBefore, false);
8282
}
8383

84-
public CompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction txn, long gcBefore, boolean keepOriginals)
84+
public CompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction txn, long gcBefore, boolean latestColumnsOnly)
8585
{
86-
super(cfs, txn);
86+
this(cfs, txn, gcBefore, false, latestColumnsOnly);
87+
}
88+
89+
90+
public CompactionTask(ColumnFamilyStore cfs, ILifecycleTransaction txn, long gcBefore, boolean keepOriginals, boolean latestColumnsOnly)
91+
{
92+
super(cfs, txn, latestColumnsOnly);
8793
this.gcBefore = gcBefore;
8894
this.keepOriginals = keepOriginals;
8995
}
@@ -350,7 +356,7 @@ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
350356
ILifecycleTransaction transaction,
351357
Set<SSTableReader> nonExpiredSSTables)
352358
{
353-
return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, keepOriginals, getLevel());
359+
return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, keepOriginals, getLevel(), latestColumnsOnly);
354360
}
355361

356362
public static String updateCompactionHistory(TimeUUID taskId, String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize, Map<String, String> compactionProperties)

src/java/org/apache/cassandra/db/compaction/Upgrader.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public Upgrader(ColumnFamilyStore cfs, LifecycleTransaction txn, OutputHandler o
6969
this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
7070
}
7171

72-
private SSTableWriter createCompactionWriter(StatsMetadata metadata)
72+
private SSTableWriter createCompactionWriter(StatsMetadata metadata, boolean latestColumnsOnly)
7373
{
7474
MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
7575
sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
@@ -82,22 +82,22 @@ private SSTableWriter createCompactionWriter(StatsMetadata metadata)
8282
.setTransientSSTable(metadata.isTransient)
8383
.setTableMetadataRef(cfs.metadata)
8484
.setMetadataCollector(sstableMetadataCollector)
85-
.setSerializationHeader(SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable)))
85+
.setSerializationHeader(SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable), latestColumnsOnly))
8686
.addDefaultComponents(cfs.indexManager.listIndexGroups())
8787
.setSecondaryIndexGroups(cfs.indexManager.listIndexGroups())
8888
.setCompressionDictionaryManager(cfs.compressionDictionaryManager())
8989
.build(transaction, cfs);
9090
}
9191

92-
public void upgrade(boolean keepOriginals)
92+
public void upgrade(boolean keepOriginals, boolean latestColumnsOnly)
9393
{
9494
outputHandler.output("Upgrading " + sstable);
9595
long nowInSec = FBUtilities.nowInSeconds();
9696
try (SSTableRewriter writer = SSTableRewriter.construct(cfs, transaction, keepOriginals, CompactionTask.getMaxDataAge(transaction.originals()));
9797
AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals());
9898
CompactionIterator iter = new CompactionIterator(transaction.opType(), scanners.scanners, controller, nowInSec, nextTimeUUID()))
9999
{
100-
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata()));
100+
writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata(), latestColumnsOnly));
101101
iter.setTargetDirectory(writer.currentWriter().getFilename());
102102
while (iter.hasNext())
103103
writer.append(iter.next());

src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
6262
protected final long minRepairedAt;
6363
protected final TimeUUID pendingRepair;
6464
protected final boolean isTransient;
65+
protected final boolean latestColumnsOnly;
6566

6667
protected final SSTableRewriter sstableWriter;
6768
protected final ILifecycleTransaction txn;
@@ -76,19 +77,31 @@ public CompactionAwareWriter(ColumnFamilyStore cfs,
7677
Set<SSTableReader> nonExpiredSSTables,
7778
boolean keepOriginals)
7879
{
79-
this(cfs, directories, txn, nonExpiredSSTables, keepOriginals, true);
80+
this(cfs, directories, txn, nonExpiredSSTables, keepOriginals, true, false);
8081
}
8182

8283
public CompactionAwareWriter(ColumnFamilyStore cfs,
8384
Directories directories,
8485
ILifecycleTransaction txn,
8586
Set<SSTableReader> nonExpiredSSTables,
8687
boolean keepOriginals,
87-
boolean earlyOpenAllowed)
88+
boolean latestColumnsOnly)
89+
{
90+
this(cfs, directories, txn, nonExpiredSSTables, keepOriginals, true, latestColumnsOnly);
91+
}
92+
93+
public CompactionAwareWriter(ColumnFamilyStore cfs,
94+
Directories directories,
95+
ILifecycleTransaction txn,
96+
Set<SSTableReader> nonExpiredSSTables,
97+
boolean keepOriginals,
98+
boolean earlyOpenAllowed,
99+
boolean latestColumnsOnly)
88100
{
89101
this.cfs = cfs;
90102
this.directories = directories;
91103
this.nonExpiredSSTables = nonExpiredSSTables;
104+
this.latestColumnsOnly = latestColumnsOnly;
92105
this.txn = txn;
93106

94107
estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
@@ -230,22 +243,27 @@ protected boolean maybeSwitchLocation(DecoratedKey key)
230243
protected void switchCompactionWriter(Directories.DataDirectory directory, DecoratedKey nextKey)
231244
{
232245
currentDirectory = directory;
233-
sstableWriter.switchWriter(sstableWriter(directory, nextKey));
246+
sstableWriter.switchWriter(sstableWriter(directory, nextKey, latestColumnsOnly));
234247
}
235248

236-
protected SSTableWriter sstableWriter(Directories.DataDirectory directory, DecoratedKey nextKey)
249+
protected SSTableWriter sstableWriter(Directories.DataDirectory directory, DecoratedKey nextKey, boolean latestColumnsOnly)
237250
{
238251
Descriptor descriptor = cfs.newSSTableDescriptor(getDirectories().getLocationForDisk(directory));
239252
MetadataCollector collector = new MetadataCollector(txn.originals(), cfs.metadata().comparator)
240253
.sstableLevel(sstableLevel());
241-
SerializationHeader header = SerializationHeader.make(cfs.metadata(), nonExpiredSSTables);
254+
SerializationHeader header = SerializationHeader.make(cfs.metadata(), nonExpiredSSTables, latestColumnsOnly);
242255

243256
return newWriterBuilder(descriptor).setMetadataCollector(collector)
244257
.setSerializationHeader(header)
245258
.setKeyCount(sstableKeyCount())
246259
.build(txn, cfs);
247260
}
248261

262+
protected SSTableWriter sstableWriter(Directories.DataDirectory directory, DecoratedKey nextKey)
263+
{
264+
return sstableWriter(directory, nextKey, false);
265+
}
266+
249267
/**
250268
* Returns the level that should be used when creating sstables.
251269
*/

src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,12 @@ public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, I
4444

4545
public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, ILifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean keepOriginals, int sstableLevel)
4646
{
47-
super(cfs, directories, txn, nonExpiredSSTables, keepOriginals);
47+
this(cfs, directories, txn, nonExpiredSSTables, keepOriginals, sstableLevel, false);
48+
}
49+
50+
public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, ILifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean keepOriginals, int sstableLevel, boolean latestColumnsOnly)
51+
{
52+
super(cfs, directories, txn, nonExpiredSSTables, keepOriginals, latestColumnsOnly);
4853
this.sstableLevel = sstableLevel;
4954
}
5055

0 commit comments

Comments
 (0)