Skip to content

Shared metadata. Based on actual main #17463

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
14 changes: 14 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
#include <ydb/core/tx/columnshard/blob_cache.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/columnshard/data_accessor/node_actor.h>
#include <ydb/core/tx/mediator/mediator.h>
#include <ydb/core/tx/replication/controller/controller.h>
#include <ydb/core/tx/replication/service/service.h>
Expand Down Expand Up @@ -1114,6 +1115,19 @@ void TSharedCacheInitializer::InitializeServices(
TActorSetupCmd(actor, TMailboxType::ReadAsFilled, appData->UserPoolId));
}

// TSharedMetadaCacheInitializer
TSharedMetadaCacheInitializer::TSharedMetadaCacheInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig)
{}

void TSharedMetadaCacheInitializer::InitializeServices( NActors::TActorSystemSetup *setup, const NKikimr::TAppData *appData) {
if (appData->FeatureFlags.GetEnableSharedMetadataCache()) {
auto* actor = NKikimr::NOlap::NDataAccessorControl::TNodeActor::CreateActor();
setup->LocalServices.emplace_back(NKikimr::NOlap::NDataAccessorControl::TNodeActor::MakeActorId(NodeId),
TActorSetupCmd(actor, TMailboxType::HTSwap, appData->UserPoolId));
}
}

// TBlobCacheInitializer

TBlobCacheInitializer::TBlobCacheInitializer(const TKikimrRunConfig& runConfig)
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ class TSharedCacheInitializer : public IKikimrServicesInitializer {
void InitializeServices(NActors::TActorSystemSetup *setup, const NKikimr::TAppData *appData) override;
};

class TSharedMetadaCacheInitializer : public IKikimrServicesInitializer {
public:
TSharedMetadaCacheInitializer(const TKikimrRunConfig& runConfig);

void InitializeServices(NActors::TActorSystemSetup *setup, const NKikimr::TAppData *appData) override;
};

class TBlobCacheInitializer : public IKikimrServicesInitializer {
public:
TBlobCacheInitializer(const TKikimrRunConfig& runConfig);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers

sil->AddServiceInitializer(new TMemProfMonitorInitializer(runConfig, ProcessMemoryInfoProvider));

if (serviceMask.EnableSharedMetadaCache) {
sil->AddServiceInitializer(new TSharedMetadaCacheInitializer(runConfig));
}

#if defined(ENABLE_MEMORY_TRACKING)
if (serviceMask.EnableMemoryTracker) {
sil->AddServiceInitializer(new TMemoryTrackerInitializer(runConfig));
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/service_mask.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ union TBasicKikimrServicesMask {
bool EnableGroupedMemoryLimiter:1;
bool EnableAwsService:1;
bool EnableCompPriorities : 1;
bool EnableSharedMetadaCache : 1;
};

struct {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,5 @@ message TFeatureFlags {
optional bool SwitchToConfigV1 = 180 [default = false];
optional bool EnableEncryptedExport = 181 [default = false];
optional bool EnableAlterDatabase = 182 [default = false];
optional bool EnableSharedMetadataCache = 183 [default = true];
}
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableDatabaseAdmin)
FEATURE_FLAG_SETTER(EnablePermissionsExport)
FEATURE_FLAG_SETTER(EnableShowCreate)
FEATURE_FLAG_SETTER(EnableSharedMetadataCache)

#undef FEATURE_FLAG_SETTER
};
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
#include <ydb/library/services/services.pb.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/columnshard/data_accessor/node_actor.h>
#include <ydb/core/tx/coordinator/coordinator.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/long_tx_service/public/events.h>
Expand Down Expand Up @@ -1180,6 +1181,16 @@ namespace Tests {
const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NConveyor::TInsertServiceOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
{
if (Settings->FeatureFlags.GetEnableSharedMetadataCache()) {
auto* actor = NKikimr::NOlap::NDataAccessorControl::TNodeActor::CreateActor();

const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::HTSwap, 0);
const auto serviceId = NKikimr::NOlap::NDataAccessorControl::TNodeActor::MakeActorId(Runtime->GetNodeId(nodeIdx));
Runtime->RegisterService(serviceId, aid, nodeIdx);
}
}

Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);

auto sysViewService = NSysView::CreateSysViewServiceForTests();
Expand Down
9 changes: 8 additions & 1 deletion ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "blobs_reader/actor.h"
#include "counters/aggregation/table_stats.h"
#include "data_accessor/actor.h"
#include "data_accessor/node_actor.h"
#include "data_accessor/manager.h"
#include "engines/column_engine_logs.h"
#include "engines/writer/buffer/actor.h"
Expand Down Expand Up @@ -124,7 +125,13 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
ResourceSubscribeActor = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletID(), SelfId()));
BufferizationInsertionWriteActorId = ctx.Register(new NColumnShard::NWriting::TActor(TabletID(), SelfId()));
BufferizationPortionsWriteActorId = ctx.Register(new NOlap::NWritingPortions::TActor(TabletID(), SelfId()));
DataAccessorsControlActorId = ctx.Register(new NOlap::NDataAccessorControl::TActor(TabletID(), SelfId()));
// Change actor here
if (AppData(ctx)->FeatureFlags.GetEnableSharedMetadataCache()){
DataAccessorsControlActorId = NOlap::NDataAccessorControl::TNodeActor::MakeActorId(ctx.SelfID.NodeId());
} else {
DataAccessorsControlActorId = ctx.Register(new NOlap::NDataAccessorControl::TActor(TabletID(), SelfId()));
}

DataAccessorsManager = std::make_shared<NOlap::NDataAccessorControl::TActorAccessorsManager>(DataAccessorsControlActorId, SelfId()),

PrioritizationClientId = NPrioritiesQueue::TCompServiceOperator::RegisterClient();
Expand Down
12 changes: 8 additions & 4 deletions ydb/core/tx/columnshard/columnshard__statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,22 @@ class TColumnPortionsAccumulator {
std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessors;
std::shared_ptr<TResultAccumulator> Result;
const std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
const NOlap::TTabletId TabletId;

public:
TColumnPortionsAccumulator(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager,
const std::shared_ptr<TResultAccumulator>& result, const ui32 portionsCountLimit, const std::set<ui32>& originalColumnTags,
const std::shared_ptr<NOlap::TVersionedIndex>& vIndex,
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const NOlap::TTabletId tabletId)
: StoragesManager(storagesManager)
, ColumnTagsRequested(originalColumnTags)
, PortionsCountLimit(portionsCountLimit)
, DataAccessors(dataAccessorsManager)
, Result(result)
, VersionedIndex(vIndex) {
, VersionedIndex(vIndex)
, TabletId(tabletId)
{
}

class TIndexReadTask: public NOlap::NBlobOperations::NRead::ITask {
Expand Down Expand Up @@ -259,7 +263,7 @@ class TColumnPortionsAccumulator {
}
request->RegisterSubscriber(std::make_shared<TMetadataSubscriber>(StoragesManager, Result, VersionedIndex, ColumnTagsRequested));
Portions.clear();
DataAccessors->AskData(request);
DataAccessors->AskData(TabletId, request);
}

void AddTask(const NOlap::TPortionInfo::TConstPtr& portion) {
Expand Down Expand Up @@ -306,7 +310,7 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
std::make_shared<TResultAccumulator>(columnTagsRequested, ev->Sender, ev->Cookie, std::move(response));
auto versionedIndex = std::make_shared<NOlap::TVersionedIndex>(index.GetVersionedIndex());
TColumnPortionsAccumulator portionsPack(
StoragesManager, resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified());
StoragesManager, resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified(), (NOlap::TTabletId)TabletID());

for (const auto& [_, portionInfo] : spg->GetPortions()) {
if (!portionInfo->IsVisible(GetMaxReadVersion())) {
Expand Down
25 changes: 15 additions & 10 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -852,21 +852,24 @@ class TAccessorsMemorySubscriber: public NOlap::NResourceBroker::NSubscribe::ITa
std::shared_ptr<NOlap::TDataAccessorsRequest> Request;
std::shared_ptr<TDataAccessorsSubscriberBase> Subscriber;
std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessorsManager;
const NOlap::TTabletId TabletId;

virtual void DoOnAllocationSuccess(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& guard) override {
Subscriber->SetResourcesGuard(guard);
Request->RegisterSubscriber(Subscriber);
DataAccessorsManager->AskData(Request);
DataAccessorsManager->AskData(TabletId, Request);
}

public:
TAccessorsMemorySubscriber(const ui64 memory, const TString& externalTaskId, const NOlap::NResourceBroker::NSubscribe::TTaskContext& context,
std::shared_ptr<NOlap::TDataAccessorsRequest>&& request, const std::shared_ptr<TDataAccessorsSubscriberBase>& subscriber,
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
NOlap::TTabletId tabletId)
: TBase(0, memory, externalTaskId, context)
, Request(std::move(request))
, Subscriber(subscriber)
, DataAccessorsManager(dataAccessorsManager) {
, DataAccessorsManager(dataAccessorsManager)
, TabletId(tabletId){
}
};

Expand Down Expand Up @@ -912,7 +915,7 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo
CompactTaskSubscription);
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, indexChanges->GetTaskIdentifier(),
CompactTaskSubscription, std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
CompactTaskSubscription, std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified(), (NOlap::TTabletId)TabletID()));
}

class TWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRead {
Expand Down Expand Up @@ -981,7 +984,7 @@ void TColumnShard::SetupMetadata() {
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(ResourceSubscribeActor,
std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, i.GetRequest()->GetTaskId(), TTLTaskSubscription,
std::shared_ptr<NOlap::TDataAccessorsRequest>(i.GetRequest()),
std::make_shared<TCSMetadataSubscriber>(SelfId(), i.GetProcessor(), Generation()), DataAccessorsManager.GetObjectPtrVerified()));
std::make_shared<TCSMetadataSubscriber>(SelfId(), i.GetProcessor(), Generation()), DataAccessorsManager.GetObjectPtrVerified(), (NOlap::TTabletId)TabletID()));
}
}

Expand Down Expand Up @@ -1020,7 +1023,7 @@ bool TColumnShard::SetupTtl() {
request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()) + memoryUsage;
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, i->GetTaskIdentifier(), TTLTaskSubscription,
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified(), (NOlap::TTabletId)TabletID()));
}
return true;
}
Expand Down Expand Up @@ -1069,7 +1072,7 @@ void TColumnShard::SetupCleanupPortions() {

NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, changes->GetTaskIdentifier(), TTLTaskSubscription,
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified(), (NOlap::TTabletId)TabletID()));
}

void TColumnShard::SetupCleanupTables() {
Expand Down Expand Up @@ -1392,14 +1395,15 @@ class TAccessorsParsingTask: public NConveyor::ITask {
private:
std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback> FetchCallback;
std::vector<TPortionConstructorV2> Portions;
const NOlap::TTabletId TabletId;

virtual void DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) override {
std::vector<NOlap::TPortionDataAccessor> accessors;
accessors.reserve(Portions.size());
for (auto&& i : Portions) {
accessors.emplace_back(i.BuildAccessor());
}
FetchCallback->OnAccessorsFetched(std::move(accessors));
FetchCallback->OnAccessorsFetched(TabletId, std::move(accessors));
}
virtual void DoOnCannotExecute(const TString& reason) override {
AFL_VERIFY(false)("cannot parse metadata", reason);
Expand All @@ -1411,9 +1415,10 @@ class TAccessorsParsingTask: public NConveyor::ITask {
}

TAccessorsParsingTask(
const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& callback, std::vector<TPortionConstructorV2>&& portions)
const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& callback, std::vector<TPortionConstructorV2>&& portions, const NOlap::TTabletId tabletId)
: FetchCallback(callback)
, Portions(std::move(portions))
, TabletId(tabletId)
{

}
Expand Down Expand Up @@ -1499,7 +1504,7 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
}

AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished");
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(std::make_shared<TAccessorsParsingTask>(FetchCallback, std::move(FetchedAccessors)));
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(std::make_shared<TAccessorsParsingTask>(FetchCallback, std::move(FetchedAccessors), (NOlap::TTabletId)txc.Tablet));
return true;
}
void Complete(const TActorContext& /*ctx*/) override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ TDataCategorized IGranuleDataAccessor::AnalyzeData(
return DoAnalyzeData(portions, consumer);
}

void TActorAccessorsCallback::OnAccessorsFetched(std::vector<TPortionDataAccessor>&& accessors) {
NActors::TActivationContext::Send(ActorId, std::make_unique<TEvAddPortion>(std::move(accessors)));
void TActorAccessorsCallback::OnAccessorsFetched(TTabletId tabletId, std::vector<TPortionDataAccessor>&& accessors) {
NActors::TActivationContext::Send(ActorId, std::make_unique<TEvAddPortion>(tabletId, std::move(accessors)));
}

} // namespace NKikimr::NOlap::NDataAccessorControl
13 changes: 9 additions & 4 deletions ydb/core/tx/columnshard/data_accessor/abstract/collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace NKikimr::NOlap::NDataAccessorControl {
class IAccessorCallback {
public:
virtual void OnAccessorsFetched(std::vector<TPortionDataAccessor>&& accessors) = 0;
virtual void OnAccessorsFetched(TTabletId TabletId, std::vector<TPortionDataAccessor>&& accessors) = 0;
virtual ~IAccessorCallback() = default;
};

Expand All @@ -14,7 +14,7 @@ class TActorAccessorsCallback: public IAccessorCallback {
const NActors::TActorId ActorId;

public:
virtual void OnAccessorsFetched(std::vector<TPortionDataAccessor>&& accessors) override;
virtual void OnAccessorsFetched(TTabletId tabletId, std::vector<TPortionDataAccessor>&& accessors) override;
TActorAccessorsCallback(const NActors::TActorId& actorId)
: ActorId(actorId) {
}
Expand All @@ -37,6 +37,7 @@ class TDataCategorized {
class IGranuleDataAccessor {
private:
const TInternalPathId PathId;
const TTabletId TabletId;

virtual void DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) = 0;
Expand All @@ -49,9 +50,13 @@ class IGranuleDataAccessor {
TInternalPathId GetPathId() const {
return PathId;
}
TTabletId GetTabletId() const {
return TabletId;
}

IGranuleDataAccessor(const TInternalPathId pathId)
: PathId(pathId) {
IGranuleDataAccessor(const TTabletId tabletId, const TInternalPathId pathId)
: PathId(pathId)
, TabletId(tabletId) {
}

void AskData(
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/data_accessor/abstract/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class TGranuleMeta;
namespace NKikimr::NOlap::NDataAccessorControl {
class IMetadataMemoryManager {
private:
virtual std::unique_ptr<IGranuleDataAccessor> DoBuildCollector(const TInternalPathId pathId) = 0;
virtual std::unique_ptr<IGranuleDataAccessor> DoBuildCollector(const TTabletId tabletId, const TInternalPathId pathId) = 0;
virtual std::shared_ptr<ITxReader> DoBuildLoader(
const TVersionedIndex& versionedIndex, TGranuleMeta* granule, const std::shared_ptr<IBlobGroupSelector>& dsGroupSelector) = 0;

Expand All @@ -22,8 +22,8 @@ class IMetadataMemoryManager {
return false;
}

std::unique_ptr<IGranuleDataAccessor> BuildCollector(const TInternalPathId pathId) {
return DoBuildCollector(pathId);
std::unique_ptr<IGranuleDataAccessor> BuildCollector(const TTabletId tabletId, const TInternalPathId pathId) {
return DoBuildCollector(tabletId, pathId);
}

std::shared_ptr<ITxReader> BuildLoader(
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/data_accessor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace NKikimr::NOlap::NDataAccessorControl {

void TActor::Handle(TEvAskServiceDataAccessors::TPtr& ev) {
Manager->AskData(ev->Get()->GetRequest());
Manager->AskData(ev->Get()->GetTabletId(), ev->Get()->GetRequest());
}

void TActor::Bootstrap() {
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/data_accessor/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ class TActor: public TActorBootstrapped<TActor> {
}

void Handle(TEvRegisterController::TPtr& ev) {
Manager->RegisterController(ev->Get()->ExtractController(), ev->Get()->IsUpdate());
Manager->RegisterController(ev->Get()->ExtractController(), ev->Get()->GetTabletId(), ev->Get()->IsUpdate());
}
void Handle(TEvUnregisterController::TPtr& ev) {
Manager->UnregisterController(ev->Get()->GetPathId());
Manager->UnregisterController(ev->Get()->GetTabletId(), ev->Get()->GetPathId());
}
void Handle(TEvAddPortion::TPtr& ev) {
for (auto&& a : ev->Get()->ExtractAccessors()) {
Manager->AddPortion(std::move(a));
Manager->AddPortion(ev->Get()->GetTabletId(), std::move(a));
}
}
void Handle(TEvRemovePortion::TPtr& ev) {
Manager->RemovePortion(ev->Get()->GetPortion());
Manager->RemovePortion(ev->Get()->GetTabletId(), ev->Get()->GetPortion());
}
void Handle(TEvAskServiceDataAccessors::TPtr& ev);

Expand Down
Loading
Loading