Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,6 @@ void TRowDispatcher::UpdateMetrics() {
for (const auto& key : toDelete) {
AggrStats.LastQueryStats.erase(key);
}
PrintStateToLog();
}

TString TRowDispatcher::GetInternalState() {
Expand Down
14 changes: 13 additions & 1 deletion ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ struct TEvGetScriptExecutionOperation : public TEventWithDatabaseId<TEvGetScript
{}

NOperationId::TOperationId OperationId;
bool CheckLeaseState = true;
};

struct TEvGetScriptExecutionOperationQueryResponse : public TEventLocal<TEvGetScriptExecutionOperationQueryResponse, TKqpScriptExecutionEvents::EvGetScriptExecutionOperationQueryResponse> {
Expand All @@ -81,22 +82,27 @@ struct TEvGetScriptExecutionOperationQueryResponse : public TEventLocal<TEvGetSc

bool Ready = false;
bool LeaseExpired = false;
TInstant LeaseDeadline;
std::optional<EFinalizationStatus> FinalizationStatus;
TActorId RunScriptActorId;
TString ExecutionId;
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
Ydb::Query::ExecuteScriptMetadata Metadata;
bool RetryRequired = false;
bool WaitRetry = false;
i64 LeaseGeneration = 0;
bool StateSaved = false;
NKikimrKqp::TScriptExecutionRetryState RetryState;
};

struct TEvGetScriptExecutionOperationResponse : public TEventLocal<TEvGetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvGetScriptExecutionOperationResponse> {
struct TInfo {
TMaybe<google::protobuf::Any> Metadata;
bool Ready = false;
bool StateSaved = false;
ui64 RetryCount = 0;
TInstant LastFailAt;
TInstant SuspendedUntil;
};

TEvGetScriptExecutionOperationResponse(Ydb::StatusIds::StatusCode status, TInfo&& info, NYql::TIssues issues)
Expand All @@ -105,6 +111,9 @@ struct TEvGetScriptExecutionOperationResponse : public TEventLocal<TEvGetScriptE
, Issues(std::move(issues))
, Metadata(std::move(info.Metadata))
, StateSaved(info.StateSaved)
, RetryCount(info.RetryCount)
, LastFailAt(info.LastFailAt)
, SuspendedUntil(info.SuspendedUntil)
{}

TEvGetScriptExecutionOperationResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
Expand All @@ -117,6 +126,9 @@ struct TEvGetScriptExecutionOperationResponse : public TEventLocal<TEvGetScriptE
NYql::TIssues Issues;
TMaybe<google::protobuf::Any> Metadata;
bool StateSaved = false;
ui64 RetryCount = 0;
TInstant LastFailAt;
TInstant SuspendedUntil;
};

struct TEvListScriptExecutionOperations : public TEventWithDatabaseId<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
Expand Down
56 changes: 54 additions & 2 deletions ydb/core/kqp/common/kqp_script_executions.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
#include "kqp_script_executions.h"

#include <util/string/builder.h>

#include <ydb/public/api/protos/ydb_issue_message.pb.h>
#include <ydb/public/sdk/cpp/src/library/operation_id/protos/operation_id.pb.h>

#include <yql/essentials/public/issue/yql_issue_message.h>

#include <library/cpp/protobuf/json/proto2json.h>

#include <util/string/builder.h>

namespace NKikimr::NKqp {

TString ScriptExecutionOperationFromExecutionId(const std::string& executionId) {
Expand All @@ -13,6 +18,10 @@ TString ScriptExecutionOperationFromExecutionId(const std::string& executionId)
return NOperationId::ProtoToString(operationId);
}

NOperationId::TOperationId OperationIdFromExecutionId(const TString& executionId) {
return NOperationId::TOperationId(ScriptExecutionOperationFromExecutionId(executionId));
}

TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId, TString& error) try {
NOperationId::TOperationId operation(operationId);
return ScriptExecutionIdFromOperation(operation, error);
Expand All @@ -38,4 +47,47 @@ TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId&
return Nothing();
}

NYql::TIssues AddRootIssue(const TString& message, const NYql::TIssues& issues, bool addEmptyRoot) {
if (!issues && !addEmptyRoot) {
return {};
}

NYql::TIssue rootIssue(message);
for (const auto& issue : issues) {
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
}

return {rootIssue};
}

TString SerializeIssues(const NYql::TIssues& issues) {
NYql::TIssue root;
for (const auto& issue : issues) {
root.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
}

Ydb::Issue::IssueMessage rootMessage;
if (issues) {
NYql::IssueToMessage(root, &rootMessage);
}

return NProtobufJson::Proto2Json(rootMessage, NProtobufJson::TProto2JsonConfig());
}

TString SequenceToJsonString(ui64 size, std::function<void(ui64 i, NJson::TJsonValue& value)> valueFiller) {
NJson::TJsonValue value;
value.SetType(NJson::EJsonValueType::JSON_ARRAY);

NJson::TJsonValue::TArray& jsonArray = value.GetArraySafe();
jsonArray.resize(size);
for (ui64 i = 0; i < size; ++i) {
valueFiller(i, jsonArray[i]);
}

NJsonWriter::TBuf serializedJson;
serializedJson.WriteJsonValue(&value, false, PREC_NDIGITS, 17);

return serializedJson.Str();
}

} // namespace NKikimr::NKqp
18 changes: 18 additions & 0 deletions ydb/core/kqp/common/kqp_script_executions.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
#pragma once

#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/library/operation_id/operation_id.h>

#include <yql/essentials/public/issue/yql_issue.h>

#include <library/cpp/json/writer/json_value.h>

#include <util/generic/string.h>
#include <util/generic/maybe.h>


namespace NKikimr::NKqp {

TString ScriptExecutionOperationFromExecutionId(const std::string& executionId);
NOperationId::TOperationId OperationIdFromExecutionId(const TString& executionId);
TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId, TString& error);
TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId& operationId, TString& error);

NYql::TIssues AddRootIssue(const TString& message, const NYql::TIssues& issues, bool addEmptyRoot = true);
TString SerializeIssues(const NYql::TIssues& issues);

TString SequenceToJsonString(ui64 size, std::function<void(ui64 i, NJson::TJsonValue& value)> valueFiller);

template <typename TContainer>
TString SequenceToJsonString(const TContainer& container) {
return SequenceToJsonString(container.size(), [&](ui64 i, NJson::TJsonValue& value) {
value = NJson::TJsonValue(container[i]);
});
}

} // namespace NKikimr::NKqp
1 change: 1 addition & 0 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
case NKqpProto::TKqpPhyConnection::kResult:
case NKqpProto::TKqpPhyConnection::kValue:
case NKqpProto::TKqpPhyConnection::kMerge:
case NKqpProto::TKqpPhyConnection::kDqSourceStreamLookup:
case NKqpProto::TKqpPhyConnection::TYPE_NOT_SET:
break;
}
Expand Down
27 changes: 15 additions & 12 deletions ydb/core/kqp/common/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,34 @@ SRCS(
)

PEERDIR(
library/cpp/json/writer
library/cpp/lwtrace
library/cpp/protobuf/json
ydb/core/base
ydb/core/engine
ydb/core/protos
ydb/core/scheme
ydb/core/kqp/expr_nodes
ydb/core/kqp/common/simple
ydb/core/grpc_services/cancelation
ydb/core/kqp/common/compilation
ydb/core/kqp/common/events
ydb/core/kqp/common/shutdown
ydb/core/kqp/common/simple
ydb/core/kqp/expr_nodes
ydb/core/kqp/provider
ydb/core/protos
ydb/core/scheme
ydb/core/tx/long_tx_service/public
ydb/core/tx/sharding
ydb/library/yql/dq/expr_nodes
ydb/library/aclib
yql/essentials/core/issue
yql/essentials/core/services
ydb/library/yql/dq/actors
ydb/library/yql/dq/common
yql/essentials/core/dq_integration
yql/essentials/parser/pg_wrapper/interface
ydb/library/yql/dq/expr_nodes
ydb/public/api/protos
ydb/public/sdk/cpp/src/library/operation_id
ydb/public/sdk/cpp/src/library/operation_id/protos
ydb/core/grpc_services/cancelation
library/cpp/lwtrace
#library/cpp/lwtrace/protos
yql/essentials/core/dq_integration
yql/essentials/core/issue
yql/essentials/core/services
yql/essentials/parser/pg_wrapper/interface
yql/essentials/public/issue
)

YQL_LAST_ABI_VERSION()
Expand Down
14 changes: 9 additions & 5 deletions ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/kqp/runtime/kqp_compute.h>
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
#include <ydb/core/kqp/runtime/kqp_write_actor.h>
#include <ydb/core/kqp/runtime/kqp_read_table.h>
#include <ydb/core/kqp/runtime/kqp_sequencer_factory.h>
#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
#include <ydb/core/kqp/runtime/kqp_vector_actor.h>
#include <ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.h>
#include <ydb/core/kqp/runtime/kqp_write_actor.h>
#include <ydb/library/formats/arrow/protos/ssa.pb.h>
#include <ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup_factory.h>
#include <ydb/library/yql/dq/comp_nodes/dq_block_hash_join.h>
#include <ydb/library/yql/dq/comp_nodes/dq_hash_combine.h>
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.h>
#include <ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.h>
#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
#include <ydb/library/yql/dq/comp_nodes/dq_block_hash_join.h>
#include <ydb/library/yql/dq/comp_nodes/dq_hash_combine.h>
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.h>
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_write_actor.h>

namespace NKikimr {
namespace NMiniKQL {
Expand Down Expand Up @@ -90,6 +92,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
RegisterKqpWriteActor(*factory, counters);
RegisterSequencerActorFactory(*factory, counters);
RegisterKqpVectorResolveActor(*factory, counters);
NYql::NDq::RegisterDqInputTransformLookupActorFactory(*factory);

if (federatedQuerySetup) {
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
Expand All @@ -101,6 +104,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
}

NYql::NDq::RegisterDQSolomonReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory);
NYql::NDq::RegisterDQSolomonWriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory);
NYql::NDq::RegisterDqPqReadActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr);
NYql::NDq::RegisterDqPqWriteActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr);
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/compute_actor/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ PEERDIR(
ydb/library/formats/arrow/protos
ydb/library/formats/arrow/common
ydb/library/yql/dq/actors/compute
ydb/library/yql/dq/actors/input_transforms
ydb/library/yql/dq/comp_nodes
ydb/library/yql/providers/generic/actors
ydb/library/yql/providers/pq/async_io
ydb/library/yql/providers/s3/actors_factory
ydb/library/yql/providers/solomon/actors
yql/essentials/public/issue
ydb/library/yql/dq/comp_nodes
)

GENERATE_ENUM_SERIALIZATION(kqp_compute_state.h)
Expand Down
52 changes: 52 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,47 @@ void TKqpTasksGraph::BuildVectorResolveChannels(const TStageInfo& stageInfo, ui3
inputStageInfo, outputIndex, enableSpilling, logFunc);
}

void TKqpTasksGraph::BuildDqSourceStreamLookupChannels(const TStageInfo& stageInfo, ui32 inputIndex, const TStageInfo& inputStageInfo,
ui32 outputIndex, const NKqpProto::TKqpPhyCnDqSourceStreamLookup& dqSourceStreamLookup, const TChannelLogFunc& logFunc) {
YQL_ENSURE(stageInfo.Tasks.size() == 1);

auto* settings = GetMeta().Allocate<NDqProto::TDqInputTransformLookupSettings>();
settings->SetLeftLabel(dqSourceStreamLookup.GetLeftLabel());
settings->SetRightLabel(dqSourceStreamLookup.GetRightLabel());
settings->SetJoinType(dqSourceStreamLookup.GetJoinType());
settings->SetNarrowInputRowType(dqSourceStreamLookup.GetConnectionInputRowType());
settings->SetNarrowOutputRowType(dqSourceStreamLookup.GetConnectionOutputRowType());
settings->SetCacheLimit(dqSourceStreamLookup.GetCacheLimit());
settings->SetCacheTtlSeconds(dqSourceStreamLookup.GetCacheTtlSeconds());
settings->SetMaxDelayedRows(dqSourceStreamLookup.GetMaxDelayedRows());
settings->SetIsMultiget(dqSourceStreamLookup.GetIsMultiGet());

const auto& leftJointKeys = dqSourceStreamLookup.GetLeftJoinKeyNames();
settings->MutableLeftJoinKeyNames()->Assign(leftJointKeys.begin(), leftJointKeys.end());

const auto& rightJointKeys = dqSourceStreamLookup.GetRightJoinKeyNames();
settings->MutableRightJoinKeyNames()->Assign(rightJointKeys.begin(), rightJointKeys.end());

auto& streamLookupSource = *settings->MutableRightSource();
streamLookupSource.SetSerializedRowType(dqSourceStreamLookup.GetLookupRowType());
const auto& compiledSource = dqSourceStreamLookup.GetLookupSource();
streamLookupSource.SetProviderName(compiledSource.GetType());
*streamLookupSource.MutableLookupSource() = compiledSource.GetSettings();

TTransform dqSourceStreamLookupTransform = {
.Type = "StreamLookupInputTransform",
.InputType = dqSourceStreamLookup.GetInputStageRowType(),
.OutputType = dqSourceStreamLookup.GetOutputStageRowType(),
};
YQL_ENSURE(dqSourceStreamLookupTransform.Settings.PackFrom(*settings));

for (const auto taskId : stageInfo.Tasks) {
GetTask(taskId).Inputs[inputIndex].Transform = dqSourceStreamLookupTransform;
}

BuildUnionAllChannels(*this, stageInfo, inputIndex, inputStageInfo, outputIndex, /* enableSpilling */ false, logFunc);
}

void TKqpTasksGraph::BuildKqpStageChannels(TStageInfo& stageInfo, ui64 txId, bool enableSpilling, bool enableShuffleElimination) {
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);

Expand Down Expand Up @@ -709,6 +750,12 @@ void TKqpTasksGraph::BuildKqpStageChannels(TStageInfo& stageInfo, ui64 txId, boo
break;
}

case NKqpProto::TKqpPhyConnection::kDqSourceStreamLookup: {
BuildDqSourceStreamLookupChannels(stageInfo, inputIdx, inputStageInfo, outputIdx,
input.GetDqSourceStreamLookup(), log);
break;
}

default:
YQL_ENSURE(false, "Unexpected stage input type: " << (ui32)input.GetTypeCase());
}
Expand Down Expand Up @@ -968,6 +1015,8 @@ void TKqpTasksGraph::FillChannelDesc(NDqProto::TChannel& channelDesc, const TCha
channelDesc.SetSrcTaskId(channel.SrcTask);
channelDesc.SetDstTaskId(channel.DstTask);
channelDesc.SetEnableSpilling(enableSpilling);
channelDesc.SetCheckpointingMode(channel.CheckpointingMode);
channelDesc.SetWatermarksMode(channel.WatermarksMode);

const auto& resultChannelProxies = GetMeta().ResultChannelProxies;

Expand Down Expand Up @@ -1369,6 +1418,8 @@ void TKqpTasksGraph::FillInputDesc(NYql::NDqProto::TTaskInput& inputDesc, const
}

transformProto->MutableSettings()->PackFrom(*input.Meta.VectorResolveSettings);
} else {
*transformProto->MutableSettings() = input.Transform->Settings;
}
}
}
Expand Down Expand Up @@ -1724,6 +1775,7 @@ bool TKqpTasksGraph::BuildComputeTasks(TStageInfo& stageInfo, const ui32 nodesCo
case NKqpProto::TKqpPhyConnection::kMap:
case NKqpProto::TKqpPhyConnection::kParallelUnionAll:
case NKqpProto::TKqpPhyConnection::kVectorResolve:
case NKqpProto::TKqpPhyConnection::kDqSourceStreamLookup:
break;
default:
YQL_ENSURE(false, "Unexpected connection type: " << (ui32)input.GetTypeCase() << Endl);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,8 @@ class TKqpTasksGraph : public NYql::NDq::TDqTasksGraph<TGraphMeta, TStageInfoMet
void BuildVectorResolveChannels(const TStageInfo& stageInfo, ui32 inputIndex,
const TStageInfo& inputStageInfo, ui32 outputIndex,
const NKqpProto::TKqpPhyCnVectorResolve& vectorResolve, bool enableSpilling, const NYql::NDq::TChannelLogFunc& logFunc);
void BuildDqSourceStreamLookupChannels(const TStageInfo& stageInfo, ui32 inputIndex, const TStageInfo& inputStageInfo,
ui32 outputIndex, const NKqpProto::TKqpPhyCnDqSourceStreamLookup& dqSourceStreamLookup, const NYql::NDq::TChannelLogFunc& logFunc);

void FillOutputDesc(NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output, ui32 outputIdx,
bool enableSpilling, const TStageInfo& stageInfo) const;
Expand Down
Loading
Loading