Skip to content

Commit 74fd3bc

Browse files
committed
Implement ReadStateBytes + WriteStateBytes
1 parent 1a097f4 commit 74fd3bc

File tree

16 files changed

+1473
-509
lines changed

16 files changed

+1473
-509
lines changed

docs/plugin-protocol/tfplugin6.proto

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,11 @@ service Provider {
425425
// ConfigureStateStore configures the state store, such as S3 connection in the context of already configured provider
426426
rpc ConfigureStateStore(ConfigureStateStore.Request) returns (ConfigureStateStore.Response);
427427

428+
// ReadStateBytes streams byte chunks of a given state file from a state store
429+
rpc ReadStateBytes(ReadStateBytes.Request) returns (stream ReadStateBytes.ResponseChunk);
430+
// WriteStateBytes streams byte chunks of a given state file into a state store
431+
rpc WriteStateBytes(stream WriteStateBytes.RequestChunk) returns (WriteStateBytes.Response);
432+
428433
// GetStates returns a list of all states (i.e. CE workspaces) managed by a given state store
429434
rpc GetStates(GetStates.Request) returns (GetStates.Response);
430435
// DeleteState instructs a given state store to delete a specific state (i.e. a CE workspace)
@@ -939,6 +944,37 @@ message ConfigureStateStore {
939944
}
940945
}
941946

947+
message ReadStateBytes {
948+
message Request {
949+
string type_name = 1;
950+
string state_id = 2;
951+
}
952+
message ResponseChunk {
953+
bytes bytes = 1;
954+
int64 total_length = 2;
955+
StateRange range = 3;
956+
repeated Diagnostic diagnostics = 4;
957+
}
958+
}
959+
960+
message WriteStateBytes {
961+
message RequestChunk {
962+
string type_name = 1;
963+
bytes bytes = 2;
964+
string state_id = 3;
965+
int64 total_length = 4;
966+
StateRange range = 5;
967+
}
968+
message Response {
969+
repeated Diagnostic diagnostics = 1;
970+
}
971+
}
972+
973+
message StateRange {
974+
int64 start = 1;
975+
int64 end = 2;
976+
}
977+
942978
message GetStates {
943979
message Request {
944980
string type_name = 1;

internal/builtin/providers/terraform/provider.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,18 @@ func (p *Provider) ConfigureStateStore(req providers.ConfigureStateStoreRequest)
291291
return resp
292292
}
293293

294+
func (p *Provider) ReadStateBytes(req providers.ReadStateBytesRequest) providers.ReadStateBytesResponse {
295+
var resp providers.ReadStateBytesResponse
296+
resp.Diagnostics.Append(fmt.Errorf("unsupported state store type %q", req.TypeName))
297+
return resp
298+
}
299+
300+
func (p *Provider) WriteStateBytes(req providers.WriteStateBytesRequest) providers.WriteStateBytesResponse {
301+
var resp providers.WriteStateBytesResponse
302+
resp.Diagnostics.Append(fmt.Errorf("unsupported state store type %q", req.TypeName))
303+
return resp
304+
}
305+
294306
func (p *Provider) GetStates(req providers.GetStatesRequest) providers.GetStatesResponse {
295307
var resp providers.GetStatesResponse
296308
resp.Diagnostics.Append(fmt.Errorf("unsupported state store type %q", req.TypeName))

internal/grpcwrap/provider6.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -918,6 +918,14 @@ func (p *provider6) ConfigureStateStore(ctx context.Context, req *tfplugin6.Conf
918918
panic("not implemented")
919919
}
920920

921+
func (p *provider6) ReadStateBytes(req *tfplugin6.ReadStateBytes_Request, srv tfplugin6.Provider_ReadStateBytesServer) error {
922+
panic("not implemented")
923+
}
924+
925+
func (p *provider6) WriteStateBytes(srv tfplugin6.Provider_WriteStateBytesServer) error {
926+
panic("not implemented")
927+
}
928+
921929
func (p *provider6) GetStates(ctx context.Context, req *tfplugin6.GetStates_Request) (*tfplugin6.GetStates_Response, error) {
922930
panic("not implemented")
923931
}

internal/plugin/grpc_provider.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1418,6 +1418,14 @@ func (p *GRPCProvider) ConfigureStateStore(r providers.ConfigureStateStoreReques
14181418
panic("not implemented")
14191419
}
14201420

1421+
func (p *GRPCProvider) ReadStateBytes(r providers.ReadStateBytesRequest) providers.ReadStateBytesResponse {
1422+
panic("not implemented")
1423+
}
1424+
1425+
func (p *GRPCProvider) WriteStateBytes(r providers.WriteStateBytesRequest) providers.WriteStateBytesResponse {
1426+
panic("not implemented")
1427+
}
1428+
14211429
func (p *GRPCProvider) GetStates(r providers.GetStatesRequest) providers.GetStatesResponse {
14221430
panic("not implemented")
14231431
}

internal/plugin6/grpc_provider.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package plugin6
55

66
import (
7+
"bytes"
78
"context"
89
"errors"
910
"fmt"
@@ -1478,6 +1479,140 @@ func (p *GRPCProvider) ConfigureStateStore(r providers.ConfigureStateStoreReques
14781479
return resp
14791480
}
14801481

1482+
func (p *GRPCProvider) ReadStateBytes(r providers.ReadStateBytesRequest) (resp providers.ReadStateBytesResponse) {
1483+
logger.Trace("GRPCProvider.v6: ReadStateBytes")
1484+
1485+
schema := p.GetProviderSchema()
1486+
if schema.Diagnostics.HasErrors() {
1487+
resp.Diagnostics = schema.Diagnostics
1488+
return resp
1489+
}
1490+
1491+
if _, ok := schema.StateStores[r.TypeName]; !ok {
1492+
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown state store type %q", r.TypeName))
1493+
return resp
1494+
}
1495+
1496+
protoReq := &proto6.ReadStateBytes_Request{
1497+
TypeName: r.TypeName,
1498+
StateId: r.StateId,
1499+
}
1500+
1501+
// Start the streaming RPC with a context. The context will be cancelled
1502+
// when this function returns, which will stop the stream if it is still
1503+
// running.
1504+
ctx, cancel := context.WithCancel(p.ctx)
1505+
defer cancel()
1506+
1507+
client, err := p.client.ReadStateBytes(ctx, protoReq)
1508+
if err != nil {
1509+
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
1510+
return resp
1511+
}
1512+
1513+
var buf *bytes.Buffer
1514+
var expectedTotalLength int
1515+
for {
1516+
chunk, err := client.Recv()
1517+
if err == io.EOF {
1518+
// End of stream, we're done
1519+
break
1520+
}
1521+
if err != nil {
1522+
resp.Diagnostics = resp.Diagnostics.Append(err)
1523+
break
1524+
}
1525+
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(chunk.Diagnostics))
1526+
if resp.Diagnostics.HasErrors() {
1527+
// If we have errors, we stop processing and return early
1528+
break
1529+
}
1530+
1531+
if expectedTotalLength == 0 {
1532+
expectedTotalLength = int(chunk.TotalLength)
1533+
}
1534+
logger.Trace("GRPCProvider.v6: ReadStateBytes: received chunk for range", chunk.Range)
1535+
1536+
n, err := buf.Write(chunk.Bytes)
1537+
if err != nil {
1538+
resp.Diagnostics = resp.Diagnostics.Append(err)
1539+
break
1540+
}
1541+
logger.Trace("GRPCProvider.v6: ReadStateBytes: read bytes of a chunk", n)
1542+
}
1543+
1544+
logger.Trace("GRPCProvider.v6: ReadStateBytes: received all chunks", buf.Len())
1545+
if buf.Len() != expectedTotalLength {
1546+
err = fmt.Errorf("expected state file of total %d bytes, received %d bytes",
1547+
expectedTotalLength, buf.Len())
1548+
resp.Diagnostics = resp.Diagnostics.Append(err)
1549+
return resp
1550+
}
1551+
resp.Bytes = buf.Bytes()
1552+
1553+
return resp
1554+
}
1555+
1556+
func (p *GRPCProvider) WriteStateBytes(r providers.WriteStateBytesRequest) (resp providers.WriteStateBytesResponse) {
1557+
logger.Trace("GRPCProvider.v6: WriteStateBytes")
1558+
1559+
schema := p.GetProviderSchema()
1560+
if schema.Diagnostics.HasErrors() {
1561+
resp.Diagnostics = schema.Diagnostics
1562+
return resp
1563+
}
1564+
1565+
if _, ok := schema.StateStores[r.TypeName]; !ok {
1566+
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown state store type %q", r.TypeName))
1567+
return resp
1568+
}
1569+
1570+
// Start the streaming RPC with a context. The context will be cancelled
1571+
// when this function returns, which will stop the stream if it is still
1572+
// running.
1573+
ctx, cancel := context.WithCancel(p.ctx)
1574+
defer cancel()
1575+
1576+
// TODO: Configurable chunk size
1577+
chunkSize := 4 * 1_000_000 // 4MB
1578+
1579+
if len(r.Bytes) < chunkSize {
1580+
protoReq := &proto6.WriteStateBytes_RequestChunk{
1581+
TypeName: r.TypeName,
1582+
StateId: r.StateId,
1583+
Bytes: r.Bytes,
1584+
TotalLength: int64(len(r.Bytes)),
1585+
Range: &proto6.StateRange{
1586+
Start: 0,
1587+
End: int64(len(r.Bytes)),
1588+
},
1589+
}
1590+
client, err := p.client.WriteStateBytes(ctx)
1591+
if err != nil {
1592+
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
1593+
return resp
1594+
}
1595+
err = client.Send(protoReq)
1596+
if err != nil {
1597+
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
1598+
return resp
1599+
}
1600+
protoResp, err := client.CloseAndRecv()
1601+
if err != nil {
1602+
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
1603+
return resp
1604+
}
1605+
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
1606+
if resp.Diagnostics.HasErrors() {
1607+
return resp
1608+
}
1609+
}
1610+
1611+
// TODO: implement chunking for state files larger than chunkSize
1612+
1613+
return resp
1614+
}
1615+
14811616
func (p *GRPCProvider) GetStates(r providers.GetStatesRequest) (resp providers.GetStatesResponse) {
14821617
logger.Trace("GRPCProvider.v6: GetStates")
14831618

internal/plugin6/mock_proto/mock.go

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/provider-simple-v6/provider.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,14 @@ func (s simple) ConfigureStateStore(req providers.ConfigureStateStoreRequest) pr
311311
panic("not implemented")
312312
}
313313

314+
func (s simple) ReadStateBytes(req providers.ReadStateBytesRequest) providers.ReadStateBytesResponse {
315+
panic("not implemented")
316+
}
317+
318+
func (s simple) WriteStateBytes(req providers.WriteStateBytesRequest) providers.WriteStateBytesResponse {
319+
panic("not implemented")
320+
}
321+
314322
func (s simple) GetStates(req providers.GetStatesRequest) providers.GetStatesResponse {
315323
panic("not implemented")
316324
}

internal/provider-simple/provider.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,14 @@ func (s simple) ConfigureStateStore(req providers.ConfigureStateStoreRequest) pr
271271
panic("not implemented")
272272
}
273273

274+
func (s simple) ReadStateBytes(req providers.ReadStateBytesRequest) providers.ReadStateBytesResponse {
275+
panic("not implemented")
276+
}
277+
278+
func (s simple) WriteStateBytes(req providers.WriteStateBytesRequest) providers.WriteStateBytesResponse {
279+
panic("not implemented")
280+
}
281+
274282
func (s simple) GetStates(req providers.GetStatesRequest) providers.GetStatesResponse {
275283
// provider-simple uses protocol version 5, which does not include the RPC that maps to this method
276284
panic("not implemented")

internal/providers/mock.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,14 @@ func (m *Mock) ConfigureStateStore(req ConfigureStateStoreRequest) ConfigureStat
432432
return m.Provider.ConfigureStateStore(req)
433433
}
434434

435+
func (m *Mock) ReadStateBytes(req ReadStateBytesRequest) ReadStateBytesResponse {
436+
return m.Provider.ReadStateBytes(req)
437+
}
438+
439+
func (m *Mock) WriteStateBytes(req WriteStateBytesRequest) WriteStateBytesResponse {
440+
return m.Provider.WriteStateBytes(req)
441+
}
442+
435443
func (m *Mock) GetStates(req GetStatesRequest) GetStatesResponse {
436444
return m.Provider.GetStates(req)
437445
}

internal/providers/provider.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ type Interface interface {
118118
// ConfigureStateStore configures the state store, such as S3 connection in the context of already configured provider
119119
ConfigureStateStore(ConfigureStateStoreRequest) ConfigureStateStoreResponse
120120

121+
// ReadStateBytes streams byte chunks of a given state file from a state store
122+
ReadStateBytes(ReadStateBytesRequest) ReadStateBytesResponse
123+
// WriteStateBytes streams byte chunks of a given state file into a state store
124+
WriteStateBytes(WriteStateBytesRequest) WriteStateBytesResponse
125+
121126
// GetStates returns a list of all states (i.e. CE workspaces) managed by a given state store
122127
GetStates(GetStatesRequest) GetStatesResponse
123128
// DeleteState instructs a given state store to delete a specific state (i.e. a CE workspace)
@@ -850,6 +855,34 @@ type ConfigureStateStoreResponse struct {
850855
Diagnostics tfdiags.Diagnostics
851856
}
852857

858+
type ReadStateBytesRequest struct {
859+
// TypeName is the name of the state store to read state from
860+
TypeName string
861+
// StateId is the ID of a state file to read
862+
StateId string
863+
}
864+
865+
type ReadStateBytesResponse struct {
866+
// Bytes represents all received bytes of the given state file
867+
Bytes []byte
868+
// Diagnostics contains any warnings or errors from the method call.
869+
Diagnostics tfdiags.Diagnostics
870+
}
871+
872+
type WriteStateBytesRequest struct {
873+
// TypeName is the name of the state store to write state to
874+
TypeName string
875+
// Bytes represents all bytes of the given state file to write
876+
Bytes []byte
877+
// StateId is the ID of a state file to write
878+
StateId string
879+
}
880+
881+
type WriteStateBytesResponse struct {
882+
// Diagnostics contains any warnings or errors from the method call.
883+
Diagnostics tfdiags.Diagnostics
884+
}
885+
853886
type GetStatesRequest struct {
854887
// TypeName is the name of the state store to request the list of states from
855888
TypeName string

0 commit comments

Comments
 (0)