From 6678687a690c596a1de6ee34726d2e048afabff6 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 30 Mar 2016 17:49:59 +0900 Subject: [PATCH 1/5] Implement ClientConfigurationConfigurable --- .../input/s3/AbstractS3FileInputPlugin.java | 19 +- .../s3/ClientConfigurationConfigurable.java | 421 ++++++++++++++++++ .../TestClientConfigurationConfigurable.java | 145 ++++++ 3 files changed, 571 insertions(+), 14 deletions(-) create mode 100644 embulk-input-s3/src/main/java/org/embulk/input/s3/ClientConfigurationConfigurable.java create mode 100644 embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java index 3f50eae..ddc9cb0 100644 --- a/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java @@ -1,19 +1,15 @@ package org.embulk.input.s3; import java.util.List; -import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.io.IOException; import java.io.InterruptedIOException; import java.io.InputStream; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import com.google.common.base.Optional; import com.google.common.base.Throwables; import org.slf4j.Logger; -import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.ListObjectsRequest; @@ -23,7 +19,6 @@ import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.ClientConfiguration; import com.amazonaws.AmazonServiceException; -import com.amazonaws.Protocol; import org.embulk.config.Config; import org.embulk.config.ConfigInject; import org.embulk.config.ConfigDefault; @@ -65,7 +60,9 @@ public interface PluginTask @ConfigDefault("null") public Optional getAccessKeyId(); - // TODO timeout, ssl, etc + @Config("client_config") + @ConfigDefault("{}") + public ClientConfigurationConfigurable.Task getClientConfigurationConfigurableTask(); public FileList getFiles(); public void setFiles(FileList files); @@ -129,14 +126,8 @@ protected AWSCredentialsProvider getCredentialsProvider(PluginTask task) protected ClientConfiguration getClientConfiguration(PluginTask task) { - ClientConfiguration clientConfig = new ClientConfiguration(); - - //clientConfig.setProtocol(Protocol.HTTP); - clientConfig.setMaxConnections(50); // SDK default: 50 - clientConfig.setMaxErrorRetry(3); // SDK default: 3 - clientConfig.setSocketTimeout(8*60*1000); // SDK default: 50*1000 - - return clientConfig; + ClientConfigurationConfigurable.Task configurableTask = task.getClientConfigurationConfigurableTask(); + return ClientConfigurationConfigurable.getClientConfiguration(configurableTask); } private FileList listFiles(PluginTask task) diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/ClientConfigurationConfigurable.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/ClientConfigurationConfigurable.java new file mode 100644 index 0000000..6e41c04 --- /dev/null +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/ClientConfigurationConfigurable.java @@ -0,0 +1,421 @@ +package org.embulk.input.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.google.common.base.Optional; +import org.embulk.config.Config; +import org.embulk.config.ConfigDefault; +import org.embulk.config.ConfigException; +import org.embulk.config.Task; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.NoSuchAlgorithmException; +import java.security.NoSuchProviderException; +import java.security.SecureRandom; + +/** + * Created by takahiro.nakayama on 3/25/16. + * This class is utility class for ClientConfiguration + */ +public abstract class ClientConfigurationConfigurable +{ + public interface Task + extends org.embulk.config.Task + { + @Config("protocol") + @ConfigDefault("null") + Optional getProtocol(); + + @Config("max_connections") + @ConfigDefault("null") // default: 50 + Optional getMaxConnections(); + + @Config("user_agent") + @ConfigDefault("null") + Optional getUserAgent(); + + @Config("local_address") + @ConfigDefault("null") + Optional getLocalAddress(); + + @Config("proxy_host") + @ConfigDefault("null") + Optional getProxyHost(); + + @Config("proxy_port") + @ConfigDefault("null") + Optional getProxyPort(); + + @Config("proxy_username") + @ConfigDefault("null") + Optional getProxyUsername(); + + @Config("proxy_password") + @ConfigDefault("null") + Optional getProxyPassword(); + + @Config("proxy_domain") + @ConfigDefault("null") + Optional getProxyDomain(); + + @Config("proxy_workstation") + @ConfigDefault("null") + Optional getProxyWorkstation(); + + // NOTE: RetryPolicy is a interface + // @Config("retry_policy") + // @ConfigDefault("null") + // Optional getRetryPolicy(); + + @Config("max_error_retry") + @ConfigDefault("null") // default: 3 + Optional getMaxErrorRetry(); + + @Config("socket_timeout") + @ConfigDefault("null") // default: 8*60*1000 + Optional getSocketTimeout(); + + @Config("connection_timeout") + @ConfigDefault("null") + Optional getConnectionTimeout(); + + @Config("request_timeout") + @ConfigDefault("null") + Optional getRequestTimeout(); + + // NOTE: Can use `client_execution_timeout` from v1.10.65 + // @Config("client_execution_timeout") + // @ConfigDefault("null") + // Optional getClientExecutionTimeout(); + + @Config("use_reaper") + @ConfigDefault("null") + Optional getUseReaper(); + + // NOTE: Can use `use_throttle_retries` from v1.10.65 + // @Config("use_throttle_retries") + // @ConfigDefault("null") + // Optional getUseThrottleRetries(); + + @Config("use_gzip") + @ConfigDefault("null") + Optional getUseGzip(); + + @Config("socket_send_buffer_size_hints") // used by SocketBufferSizeHints + @ConfigDefault("null") + Optional getSocketSendBufferSizeHint(); + + @Config("socket_receive_buffer_size_hints") // used by SocketBufferSizeHints + @ConfigDefault("null") + Optional getSocketReceiveBufferSizeHints(); + + @Config("signer_override") + @ConfigDefault("null") + Optional getSignerOverride(); + + @Config("preemptive_basic_proxy_auth") + @ConfigDefault("null") + Optional getPreemptiveBasicProxyAuth(); + + @Config("connection_ttl") + @ConfigDefault("null") + Optional getConnectionTTL(); + + @Config("connection_max_idle_millis") + @ConfigDefault("null") + Optional getConnectionMaxIdleMillis(); + + @Config("use_tcp_keep_alive") + @ConfigDefault("null") + Optional getUseTcpKeepAlive(); + + // NOTE: DnsResolver is a interface + // @Config("dns_resolver") + // @ConfigDefault("null") + // Optional getDnsResolver(); + + @Config("response_metadata_cache_size") + @ConfigDefault("null") + Optional getResponseMetadataCacheSize(); + + @Config("secure_random") + @ConfigDefault("null") + Optional getSecureRandom(); + + @Config("use_expect_continue") + @ConfigDefault("null") + Optional getUseExpectContinue(); + } + + public interface SecureRandomTask + extends org.embulk.config.Task + { + @Config("algorithm") + String getAlgorithm(); + + @Config("provider") + @ConfigDefault("null") + Optional getProvider(); + } + + protected ClientConfigurationConfigurable() + { + } + + // For backward compatibility + public static final int DEFAULT_MAX_CONNECTIONS = 50; // SDK default: 50 + public static final int DEFAULT_MAX_ERROR_RETRY = 3; // SDK default: 3 + public static final int DEFAULT_SOCKET_TIMEOUT = 8*60*1000; // SDK default: 50*1000 + + public static ClientConfiguration getClientConfiguration(Task task) + { + ClientConfiguration clientConfiguration = new ClientConfiguration(); + + setProtocolOrDoNothing(clientConfiguration, task.getProtocol()); + setMaxConnectionsOrDoNothing(clientConfiguration, task.getMaxConnections()); + setUserAgentOrDoNothing(clientConfiguration, task.getUserAgent()); + + setLocalAddressOrDoNothing(clientConfiguration, task.getLocalAddress()); + setProxyHostOrDoNothing(clientConfiguration, task.getProxyHost()); + setProxyPortOrDoNothing(clientConfiguration, task.getProxyPort()); + + setProxyUsernameOrDoNothing(clientConfiguration, task.getProxyUsername()); + setProxyPasswordOrDoNothing(clientConfiguration, task.getProxyPassword()); + setProxyDomainOrDoNothing(clientConfiguration, task.getProxyDomain()); + + setProxyWorkstationOrDoNothing(clientConfiguration, task.getProxyWorkstation()); + setMaxErrorRetryOrDoNothing(clientConfiguration, task.getMaxErrorRetry()); + setSocketTimeoutOrDoNothing(clientConfiguration, task.getSocketTimeout()); + + setConnectionTimeoutOrDoNothing(clientConfiguration, task.getConnectionTimeout()); + setRequestTimeoutOrDoNothing(clientConfiguration, task.getRequestTimeout()); + setUseReaperOrDoNothing(clientConfiguration, task.getUseReaper()); + + setUseGzipOrDoNothing(clientConfiguration, task.getUseGzip()); + setSocketBufferSizeHintsOrDoNothing(clientConfiguration, task.getSocketSendBufferSizeHint(), + task.getSocketReceiveBufferSizeHints()); + + setSignerOverrideOrDoNothing(clientConfiguration, task.getSignerOverride()); + setPreemptiveBasicProxyAuthOrDoNothing(clientConfiguration, task.getPreemptiveBasicProxyAuth()); + setConnectionTTLOrDoNothing(clientConfiguration, task.getConnectionTTL()); + + setConnectionMaxIdleMillisOrDoNothing(clientConfiguration, task.getConnectionMaxIdleMillis()); + setUseTcpKeepAliveOrDoNothing(clientConfiguration, task.getUseTcpKeepAlive()); + setResponseMetadataCacheSizeOrDoNothing(clientConfiguration, task.getResponseMetadataCacheSize()); + + setSecureRandomOrDoNothing(clientConfiguration, task.getSecureRandom()); + setUseExpectContinueOrDoNothing(clientConfiguration, task.getUseExpectContinue()); + + setRetryPolicy(clientConfiguration); + setDnsResolver(clientConfiguration); + + return clientConfiguration; + } + + protected static void setProtocolOrDoNothing(ClientConfiguration clientConfiguration, Optional protocol) + { + if (protocol.isPresent()) { + clientConfiguration.setProtocol(protocol.get()); + } + } + + protected static void setMaxConnectionsOrDoNothing(ClientConfiguration clientConfiguration, Optional maxConnections) + { + clientConfiguration.setMaxConnections(maxConnections.or(DEFAULT_MAX_CONNECTIONS)); + } + + protected static void setUserAgentOrDoNothing(ClientConfiguration clientConfiguration, Optional userAgent) + { + if (userAgent.isPresent()) { + clientConfiguration.setUserAgent(userAgent.get()); + } + } + + protected static void setLocalAddressOrDoNothing(ClientConfiguration clientConfiguration, Optional localAddress) + { + if (localAddress.isPresent()) { + InetAddress inetAddress = getInetAddress(localAddress.get()); + clientConfiguration.setLocalAddress(inetAddress); + } + } + + protected static void setProxyHostOrDoNothing(ClientConfiguration clientConfiguration, Optional proxyHost) + { + if (proxyHost.isPresent()) { + clientConfiguration.setProxyHost(proxyHost.get()); + } + } + + protected static void setProxyPortOrDoNothing(ClientConfiguration clientConfiguration, Optional proxyPort) + { + if (proxyPort.isPresent()) { + clientConfiguration.setProxyPort(proxyPort.get()); + } + } + + protected static void setProxyUsernameOrDoNothing(ClientConfiguration clientConfiguration, Optional proxyUsername) + { + if (proxyUsername.isPresent()) { + clientConfiguration.setProxyUsername(proxyUsername.get()); + } + } + + protected static void setProxyPasswordOrDoNothing(ClientConfiguration clientConfiguration, Optional proxyPassword) + { + if (proxyPassword.isPresent()) { + clientConfiguration.setProxyPassword(proxyPassword.get()); + } + } + + protected static void setProxyDomainOrDoNothing(ClientConfiguration clientConfiguration, Optional proxyDomain) + { + if (proxyDomain.isPresent()) { + clientConfiguration.setProxyDomain(proxyDomain.get()); + } + } + + protected static void setProxyWorkstationOrDoNothing(ClientConfiguration clientConfiguration, Optional proxyWorkstation) + { + if (proxyWorkstation.isPresent()) { + clientConfiguration.setProxyWorkstation(proxyWorkstation.get()); + } + } + + protected static void setMaxErrorRetryOrDoNothing(ClientConfiguration clientConfiguration, Optional maxErrorRetry) + { + clientConfiguration.setMaxErrorRetry(maxErrorRetry.or(DEFAULT_MAX_ERROR_RETRY)); + } + + protected static void setSocketTimeoutOrDoNothing(ClientConfiguration clientConfiguration, Optional socketTimeout) + { + clientConfiguration.setSocketTimeout(socketTimeout.or(DEFAULT_SOCKET_TIMEOUT)); + } + + protected static void setConnectionTimeoutOrDoNothing(ClientConfiguration clientConfiguration, Optional connectionTimeout) + { + if (connectionTimeout.isPresent()) { + clientConfiguration.setConnectionTimeout(connectionTimeout.get()); + } + } + + protected static void setRequestTimeoutOrDoNothing(ClientConfiguration clientConfiguration, Optional requestTimeout) + { + if (requestTimeout.isPresent()) { + clientConfiguration.setRequestTimeout(requestTimeout.get()); + } + } + + protected static void setUseReaperOrDoNothing(ClientConfiguration clientConfiguration, Optional useReaper) + { + if (useReaper.isPresent()) { + clientConfiguration.setUseReaper(useReaper.get()); + } + } + + protected static void setUseGzipOrDoNothing(ClientConfiguration clientConfiguration, Optional useGzip) + { + if (useGzip.isPresent()) { + clientConfiguration.setUseGzip(useGzip.get()); + } + } + + protected static void setSocketBufferSizeHintsOrDoNothing(ClientConfiguration clientConfiguration, + Optional socketSendBufferSizeHint, Optional socketReceiveBufferSizeHint) + { + if (socketSendBufferSizeHint.isPresent() && socketReceiveBufferSizeHint.isPresent()) { + clientConfiguration.setSocketBufferSizeHints(socketSendBufferSizeHint.get(), socketReceiveBufferSizeHint.get()); + } + } + + protected static void setSignerOverrideOrDoNothing(ClientConfiguration clientConfiguration, Optional value) + { + if (value.isPresent()) { + clientConfiguration.setSignerOverride(value.get()); + } + } + + protected static void setPreemptiveBasicProxyAuthOrDoNothing(ClientConfiguration clientConfiguration, Optional preemptiveBasicProxyAuth) + { + if (preemptiveBasicProxyAuth.isPresent()) { + clientConfiguration.setPreemptiveBasicProxyAuth(preemptiveBasicProxyAuth.get()); + } + } + + protected static void setConnectionTTLOrDoNothing(ClientConfiguration clientConfiguration, Optional connectionTTL) + { + if (connectionTTL.isPresent()) { + clientConfiguration.setConnectionTTL(connectionTTL.get()); + } + } + + protected static void setConnectionMaxIdleMillisOrDoNothing(ClientConfiguration clientConfiguration, Optional connectionMaxIdleMillis) + { + if (connectionMaxIdleMillis.isPresent()) { + clientConfiguration.setConnectionMaxIdleMillis(connectionMaxIdleMillis.get()); + } + } + + protected static void setUseTcpKeepAliveOrDoNothing(ClientConfiguration clientConfiguration, Optional useTcpKeepAlive) + { + if (useTcpKeepAlive.isPresent()) { + clientConfiguration.setUseTcpKeepAlive(useTcpKeepAlive.get()); + } + } + + protected static void setResponseMetadataCacheSizeOrDoNothing(ClientConfiguration clientConfiguration, Optional responseMetadataCacheSize) + { + if (responseMetadataCacheSize.isPresent()) { + clientConfiguration.setResponseMetadataCacheSize(responseMetadataCacheSize.get()); + } + } + + protected static void setSecureRandomOrDoNothing(ClientConfiguration clientConfiguration, Optional secureRandomTask) + { + if (secureRandomTask.isPresent()) { + SecureRandom secureRandom = getSecureRandom(secureRandomTask.get()); + clientConfiguration.setSecureRandom(secureRandom); + } + } + + protected static void setUseExpectContinueOrDoNothing(ClientConfiguration clientConfiguration, Optional useExpectContinue) + { + if (useExpectContinue.isPresent()) { + clientConfiguration.setUseExpectContinue(useExpectContinue.get()); + } + } + + protected static void setRetryPolicy(ClientConfiguration clientConfiguration) + { + } + + protected static void setDnsResolver(ClientConfiguration clientConfiguration) + { + } + + + private static InetAddress getInetAddress(String host) + { + try { + return InetAddress.getByName(host); + } + catch (UnknownHostException e) { + throw new ConfigException(e); + } + } + + private static SecureRandom getSecureRandom(SecureRandomTask secureRandomTask) + { + try { + if (secureRandomTask.getProvider().isPresent()) { + return SecureRandom.getInstance(secureRandomTask.getAlgorithm(), secureRandomTask.getProvider().get()); + } + else { + return SecureRandom.getInstance(secureRandomTask.getAlgorithm()); + } + } + catch (NoSuchAlgorithmException | NoSuchProviderException e) { + throw new ConfigException(e); + } + } +} diff --git a/embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java new file mode 100644 index 0000000..f4fac0f --- /dev/null +++ b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java @@ -0,0 +1,145 @@ +package org.embulk.input.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import org.embulk.EmbulkTestRuntime; +import org.embulk.config.ConfigException; +import org.embulk.config.ConfigSource; +import org.embulk.config.TaskReport; +import org.embulk.config.TaskSource; +import org.embulk.spi.Exec; +import org.embulk.spi.FileInputRunner; +import org.embulk.spi.InputPlugin; +import org.embulk.spi.Schema; +import org.embulk.spi.TestPageBuilderReader; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.util.List; + +import static org.embulk.input.s3.TestS3FileInputPlugin.parserConfig; +import static org.embulk.input.s3.TestS3FileInputPlugin.schemaConfig; +import static org.junit.Assert.*; +import static org.junit.Assume.assumeNotNull; + +/** + * Created by takahiro.nakayama on 3/30/16. + */ +public class TestClientConfigurationConfigurable +{ + private static String EMBULK_S3_TEST_BUCKET; + private static String EMBULK_S3_TEST_ACCESS_KEY_ID; + private static String EMBULK_S3_TEST_SECRET_ACCESS_KEY; + private static final String EMBULK_S3_TEST_PATH_PREFIX = "embulk_input_s3_test"; + + /* + * This test case requires environment variables: + * EMBULK_S3_TEST_BUCKET + * EMBULK_S3_TEST_ACCESS_KEY_ID + * EMBULK_S3_TEST_SECRET_ACCESS_KEY + * If the variables not set, the test case is skipped. + */ + @BeforeClass + public static void initializeConstantVariables() + { + EMBULK_S3_TEST_BUCKET = System.getenv("EMBULK_S3_TEST_BUCKET"); + EMBULK_S3_TEST_ACCESS_KEY_ID = System.getenv("EMBULK_S3_TEST_ACCESS_KEY_ID"); + EMBULK_S3_TEST_SECRET_ACCESS_KEY = System.getenv("EMBULK_S3_TEST_SECRET_ACCESS_KEY"); + assumeNotNull(EMBULK_S3_TEST_BUCKET, EMBULK_S3_TEST_ACCESS_KEY_ID, EMBULK_S3_TEST_SECRET_ACCESS_KEY); + } + + @Rule + public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); + + private S3FileInputPlugin plugin; + private ConfigSource config; + private FileInputRunner runner; + private TestPageBuilderReader.MockPageOutput output; + + @Before + public void createResources() + { + plugin = new S3FileInputPlugin(); + config = runtime.getExec().newConfigSource() + .set("type", "s3") + .set("bucket", EMBULK_S3_TEST_BUCKET) + .set("path_prefix", EMBULK_S3_TEST_PATH_PREFIX) + .set("parser", parserConfig(schemaConfig())); + runner = new FileInputRunner(runtime.getInstance(S3FileInputPlugin.class)); + output = new TestPageBuilderReader.MockPageOutput(); + } + + @Test + public void setOneParam() + { + config.setNested("client_config", Exec.newConfigSource() + .set("protocol", "HTTP") + .set("user_agent", "test_embulk_input_s3")); + + ClientConfiguration clientConfiguration = getClientConfiguration(); + + assertEquals(Protocol.HTTP, clientConfiguration.getProtocol()); + assertEquals("test_embulk_input_s3", clientConfiguration.getUserAgent()); + } + + @Test + public void setTwoParam() + { + config.setNested("client_config", Exec.newConfigSource() + .set("socket_send_buffer_size_hints", 4) + .set("socket_receive_buffer_size_hints", 8)); + + ClientConfiguration clientConfiguration = getClientConfiguration(); + + int[] socketBufferSizeHints = clientConfiguration.getSocketBufferSizeHints(); + assertEquals(4, socketBufferSizeHints[0]); + assertEquals(8, socketBufferSizeHints[1]); + } + + @Test + public void defaultValue() + { + ClientConfiguration clientConfiguration = getClientConfiguration(); + + assertEquals(3, clientConfiguration.getMaxErrorRetry()); + assertEquals(50, clientConfiguration.getMaxConnections()); + assertEquals(8*60*1000, clientConfiguration.getSocketTimeout()); + } + + @Test + public void secureRandom() + throws NoSuchAlgorithmException + { + config.setNested("client_config", Exec.newConfigSource() + .setNested("secure_random", Exec.newConfigSource() + .set("algorithm", "SHA1PRNG") + ) + ); + + ClientConfiguration clientConfiguration = getClientConfiguration(); + + assertEquals(SecureRandom.getInstance("SHA1PRNG").getAlgorithm(), clientConfiguration.getSecureRandom().getAlgorithm()); + } + + @Test(expected = ConfigException.class) + public void secureRandomNoSuchAlgorithmException() + { + config.setNested("client_config", Exec.newConfigSource() + .setNested("secure_random", Exec.newConfigSource() + .set("algorithm", "FOOOOOOO") + ) + ); + + ClientConfiguration clientConfiguration = getClientConfiguration(); + } + + private ClientConfiguration getClientConfiguration() + { + S3FileInputPlugin.S3PluginTask task = config.loadConfig(S3FileInputPlugin.S3PluginTask.class); + return plugin.getClientConfiguration(task); + } +} \ No newline at end of file From 1ec66184205a55a189e06ac17e2499ec6ec0fcb2 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 30 Mar 2016 18:03:15 +0900 Subject: [PATCH 2/5] Write README about client_config --- README.md | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/README.md b/README.md index 356003f..a1e5be1 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,60 @@ - **session_token**: session token (string, required) +- **client_config**: configure S3 client config (optional) + + - **protocol**: (enum, `HTTP` or `HTTPS`, optional) + + - **max_connections**: (int, optional) + + - **user_agent** (string, optional) + + - **local_address**: name of a hostname (string, optional) + + - **proxy_host**: name of a hostname (string, optional) + + - **proxy_port**: (int, optional) + + - **proxy_username**: (string, optional) + + - **proxy_password**: (string, optional) + + - **proxy_domain**: (string, optional) + + - **proxy_workstation**: (string, optional) + + - **max_error_retry**: (int, optional) + + - **socket_timeout**: (int, optional) + + - **connection_timeout**: (int, optional) + + - **request_timeout**: (int, optional) + + - **use_reaper**: (boolean, optional) + + - **use_gzip**: (boolean, optional) + + - **signer_override**: (string, optional) + + - **preemptive_basic_proxy_auth**: (boolean, optional) + + - **connection_ttl**: (long, optional) + + - **connection_max_idle_millis**: (long, optional) + + - **use_tcp_keep_alive**: (boolean, optional) + + - **response_metadata_cache_size**: (int, optional) + + - **use_expect_continue**: (boolean, optional) + + - **secure_random**: (optional) + + - **algorithm**: (string, required) + + - **provider**: (string, optional) + * **path_match_pattern**: regexp to match file paths. If a file path doesn't match with this pattern, the file will be skipped (regexp string, optional) * **total_file_count_limit**: maximum number of files to read (integer, optional) From 76140c0421808d13420232c632817bf08fdb708d Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 30 Mar 2016 18:12:17 +0900 Subject: [PATCH 3/5] Remove unuse code from TestClientConfigurationConfigurable.java --- .../TestClientConfigurationConfigurable.java | 40 ++----------------- 1 file changed, 3 insertions(+), 37 deletions(-) diff --git a/embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java index f4fac0f..7279ec2 100644 --- a/embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java +++ b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java @@ -5,60 +5,28 @@ import org.embulk.EmbulkTestRuntime; import org.embulk.config.ConfigException; import org.embulk.config.ConfigSource; -import org.embulk.config.TaskReport; -import org.embulk.config.TaskSource; import org.embulk.spi.Exec; -import org.embulk.spi.FileInputRunner; -import org.embulk.spi.InputPlugin; -import org.embulk.spi.Schema; -import org.embulk.spi.TestPageBuilderReader; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; -import java.util.List; import static org.embulk.input.s3.TestS3FileInputPlugin.parserConfig; import static org.embulk.input.s3.TestS3FileInputPlugin.schemaConfig; -import static org.junit.Assert.*; -import static org.junit.Assume.assumeNotNull; +import static org.junit.Assert.assertEquals; /** * Created by takahiro.nakayama on 3/30/16. */ public class TestClientConfigurationConfigurable { - private static String EMBULK_S3_TEST_BUCKET; - private static String EMBULK_S3_TEST_ACCESS_KEY_ID; - private static String EMBULK_S3_TEST_SECRET_ACCESS_KEY; - private static final String EMBULK_S3_TEST_PATH_PREFIX = "embulk_input_s3_test"; - - /* - * This test case requires environment variables: - * EMBULK_S3_TEST_BUCKET - * EMBULK_S3_TEST_ACCESS_KEY_ID - * EMBULK_S3_TEST_SECRET_ACCESS_KEY - * If the variables not set, the test case is skipped. - */ - @BeforeClass - public static void initializeConstantVariables() - { - EMBULK_S3_TEST_BUCKET = System.getenv("EMBULK_S3_TEST_BUCKET"); - EMBULK_S3_TEST_ACCESS_KEY_ID = System.getenv("EMBULK_S3_TEST_ACCESS_KEY_ID"); - EMBULK_S3_TEST_SECRET_ACCESS_KEY = System.getenv("EMBULK_S3_TEST_SECRET_ACCESS_KEY"); - assumeNotNull(EMBULK_S3_TEST_BUCKET, EMBULK_S3_TEST_ACCESS_KEY_ID, EMBULK_S3_TEST_SECRET_ACCESS_KEY); - } - @Rule public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); private S3FileInputPlugin plugin; private ConfigSource config; - private FileInputRunner runner; - private TestPageBuilderReader.MockPageOutput output; @Before public void createResources() @@ -66,11 +34,9 @@ public void createResources() plugin = new S3FileInputPlugin(); config = runtime.getExec().newConfigSource() .set("type", "s3") - .set("bucket", EMBULK_S3_TEST_BUCKET) - .set("path_prefix", EMBULK_S3_TEST_PATH_PREFIX) + .set("bucket", "dummy") + .set("path_prefix", "dummy") .set("parser", parserConfig(schemaConfig())); - runner = new FileInputRunner(runtime.getInstance(S3FileInputPlugin.class)); - output = new TestPageBuilderReader.MockPageOutput(); } @Test From 9ad7d37194a82cf4b1db582d0b0d9344299c7202 Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Mon, 11 Apr 2016 23:12:37 -0700 Subject: [PATCH 4/5] Added Duration unit to use sec, msec, usec, and min suffix in params --- README.md | 29 +- .../input/s3/AbstractS3FileInputPlugin.java | 5 +- .../input/s3/AwsClientConfigurations.java | 156 +++++++ .../input/s3/AwsClientConfigurationsTask.java | 147 ++++++ .../s3/ClientConfigurationConfigurable.java | 421 ------------------ .../java/org/embulk/input/s3/Duration.java | 159 +++++++ .../java/org/embulk/input/s3/FileList.java | 9 +- .../TestClientConfigurationConfigurable.java | 13 +- .../org/embulk/input/s3/TestDuration.java | 55 +++ 9 files changed, 546 insertions(+), 448 deletions(-) create mode 100644 embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurations.java create mode 100644 embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurationsTask.java delete mode 100644 embulk-input-s3/src/main/java/org/embulk/input/s3/ClientConfigurationConfigurable.java create mode 100644 embulk-input-s3/src/main/java/org/embulk/input/s3/Duration.java create mode 100644 embulk-input-s3/src/test/java/org/embulk/input/s3/TestDuration.java diff --git a/README.md b/README.md index e23f1d2..1738380 100644 --- a/README.md +++ b/README.md @@ -53,11 +53,17 @@ - **session_token**: session token (string, required) +* **path_match_pattern**: regexp to match file paths. If a file path doesn't match with this pattern, the file will be skipped (regexp string, optional) + +* **total_file_count_limit**: maximum number of files to read (integer, optional) + +* **min_task_size** (experimental): minimum bytesize of a task. If this is larger than 0, one task includes multiple input files up until it becomes the bytesize in total. This is useful if too many number of tasks impacts performance of output or executor plugins badly. (integer, optional) + - **client_config**: configure S3 client config (optional) - - **protocol**: (enum, `HTTP` or `HTTPS`, optional) + - **protocol**: (enum, `HTTP` or `HTTPS`. default: `"HTTPS"`) - - **max_connections**: (int, optional) + - **max_connections**: (int, default: `50`) - **user_agent** (string, optional) @@ -75,13 +81,13 @@ - **proxy_workstation**: (string, optional) - - **max_error_retry**: (int, optional) + - **max_error_retry**: (int, default: `3`) - - **socket_timeout**: (int, optional) + - **socket_timeout**: (duration, default: `8min`) - - **connection_timeout**: (int, optional) + - **connection_timeout**: (duration, default: `50sec`) - - **request_timeout**: (int, optional) + - **request_timeout**: (duration, default: no timeout) - **use_reaper**: (boolean, optional) @@ -91,13 +97,13 @@ - **preemptive_basic_proxy_auth**: (boolean, optional) - - **connection_ttl**: (long, optional) + - **connection_ttl**: (duration, optional) - - **connection_max_idle_millis**: (long, optional) + - **connection_max_idle**: (duration, default: `60sec`) - **use_tcp_keep_alive**: (boolean, optional) - - **response_metadata_cache_size**: (int, optional) + - **response_metadata_cache_size**: (bytesize, optional) - **use_expect_continue**: (boolean, optional) @@ -107,11 +113,10 @@ - **provider**: (string, optional) -* **path_match_pattern**: regexp to match file paths. If a file path doesn't match with this pattern, the file will be skipped (regexp string, optional) + - **socket_send_buffer_size_hint**: (bytesize, optional) -* **total_file_count_limit**: maximum number of files to read (integer, optional) + - **socket_receive_buffer_size_hint**: (bytesize, optional) -* **min_task_size** (experimental): minimum bytesize of a task. If this is larger than 0, one task includes multiple input files up until it becomes the bytesize in total. This is useful if too many number of tasks impacts performance of output or executor plugins badly. (integer, optional) ## Example diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java index ddc9cb0..508290d 100644 --- a/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java @@ -62,7 +62,7 @@ public interface PluginTask @Config("client_config") @ConfigDefault("{}") - public ClientConfigurationConfigurable.Task getClientConfigurationConfigurableTask(); + public AwsClientConfigurationsTask getClientConfig(); public FileList getFiles(); public void setFiles(FileList files); @@ -126,8 +126,7 @@ protected AWSCredentialsProvider getCredentialsProvider(PluginTask task) protected ClientConfiguration getClientConfiguration(PluginTask task) { - ClientConfigurationConfigurable.Task configurableTask = task.getClientConfigurationConfigurableTask(); - return ClientConfigurationConfigurable.getClientConfiguration(configurableTask); + return AwsClientConfigurations.getClientConfiguration(task.getClientConfig()); } private FileList listFiles(PluginTask task) diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurations.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurations.java new file mode 100644 index 0000000..2c6e4d2 --- /dev/null +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurations.java @@ -0,0 +1,156 @@ +package org.embulk.input.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import org.embulk.config.ConfigException; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.NoSuchAlgorithmException; +import java.security.NoSuchProviderException; +import java.security.SecureRandom; + +public class AwsClientConfigurations +{ + public static ClientConfiguration getClientConfiguration(AwsClientConfigurationsTask task) + { + ClientConfiguration c = new ClientConfiguration(); + + //if (task.getClientExecutionTimeout().isPresent()) { + // c.setClientExecutionTimeout(task.getClientExecutionTimeout().get()); + //} + + if (task.getConnectionMaxIdle().isPresent()) { + c.setConnectionMaxIdleMillis(task.getConnectionMaxIdle().get().getMillis()); + } + + if (task.getConnectionTimeout().isPresent()) { + c.setConnectionTimeout((int) task.getConnectionTimeout().get().getMillis()); + } + + if (task.getConnectionTTL().isPresent()) { + c.setConnectionTTL(task.getConnectionTTL().get().getMillis()); + } + + //if (task.getDnsResolver().isPresent()) { + // c.setDnsResolver(task.getDnsResolver().get()); + //} + + if (task.getLocalAddress().isPresent()) { + try { + InetAddress addr = InetAddress.getByName(task.getLocalAddress().get()); + c.setLocalAddress(addr); + } + catch (UnknownHostException e) { + throw new ConfigException("Invalid local_address", e); + } + } + + if (task.getMaxConnections().isPresent()) { + c.setMaxConnections(task.getMaxConnections().get()); + } + + if (task.getMaxErrorRetry().isPresent()) { + c.setMaxErrorRetry(task.getMaxErrorRetry().get()); + } + + if (task.getPreemptiveBasicProxyAuth().isPresent()) { + c.setPreemptiveBasicProxyAuth(task.getPreemptiveBasicProxyAuth().get()); + } + + if (task.getProtocol().isPresent()) { + c.setProtocol(task.getProtocol().get()); + } + + if (task.getProxyDomain().isPresent()) { + c.setProxyDomain(task.getProxyDomain().get()); + } + + if (task.getProxyHost().isPresent()) { + c.setProxyHost(task.getProxyHost().get()); + } + + if (task.getProxyPassword().isPresent()) { + c.setProxyPassword(task.getProxyPassword().get()); + } + + if (task.getProxyPort().isPresent()) { + c.setProxyPort(task.getProxyPort().get()); + } + + if (task.getProxyUsername().isPresent()) { + c.setProxyUsername(task.getProxyUsername().get()); + } + + if (task.getProxyWorkstation().isPresent()) { + c.setProxyWorkstation(task.getProxyWorkstation().get()); + } + + if (task.getRequestTimeout().isPresent()) { + c.setRequestTimeout((int) task.getRequestTimeout().get().getMillis()); + } + + if (task.getResponseMetadataCacheSize().isPresent()) { + c.setResponseMetadataCacheSize(task.getResponseMetadataCacheSize().get().getBytesInt()); + } + + //if (task.getRetryPolicy().isPresent()) { + // c.setRetryPolicy(task.getRetryPolicy().get()); + //} + + if (task.getSecureRandom().isPresent()) { + try { + AwsClientConfigurationsTask.SecureRandomTask secureRandomTask = task.getSecureRandom().get(); + SecureRandom rand = + secureRandomTask.getProvider().isPresent() + ? SecureRandom.getInstance(secureRandomTask.getAlgorithm(), secureRandomTask.getProvider().get()) + : SecureRandom.getInstance(secureRandomTask.getAlgorithm()); + c.setSecureRandom(rand); + } + catch (NoSuchAlgorithmException | NoSuchProviderException e) { + throw new ConfigException("Invalid secure_random", e); + } + } + + if (task.getSignerOverride().isPresent()) { + c.setSignerOverride(task.getSignerOverride().get()); + } + + if (task.getSocketTimeout().isPresent()) { + c.setSocketTimeout(task.getSocketTimeout().get().getMillisInt()); + } + + if (task.getUseExpectContinue().isPresent()) { + c.setUseExpectContinue(task.getUseExpectContinue().get()); + } + + if (task.getUseGzip().isPresent()) { + c.setUseGzip(task.getUseGzip().get()); + } + + if (task.getUserAgent().isPresent()) { + c.setUserAgent(task.getUserAgent().get()); + } + + if (task.getUseReaper().isPresent()) { + c.setUseReaper(task.getUseReaper().get()); + } + + if (task.getUseTcpKeepAlive().isPresent()) { + c.setUseTcpKeepAlive(task.getUseTcpKeepAlive().get()); + } + + //if (task.getUseThrottleRetries().isPresent()) { + // c.setUseThrottleRetries(task.getUseThrottleRetries().get()); + //} + + if (task.getSocketSendBufferSizeHint().isPresent() && task.getSocketReceiveBufferSizeHint().isPresent()) { + c.setSocketBufferSizeHints(task.getSocketSendBufferSizeHint().get().getBytesInt(), task.getSocketReceiveBufferSizeHint().get().getBytesInt()); + } + else if (task.getSocketSendBufferSizeHint().isPresent() || task.getSocketReceiveBufferSizeHint().isPresent()) { + throw new ConfigException("socket_send_buffer_size_hint and socket_receive_buffer_size_hint must set together"); + } + + return c; + } +} diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurationsTask.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurationsTask.java new file mode 100644 index 0000000..8e3a4f6 --- /dev/null +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurationsTask.java @@ -0,0 +1,147 @@ +package org.embulk.input.s3; + +import com.google.common.base.Optional; +import com.amazonaws.Protocol; +import org.embulk.config.Config; +import org.embulk.config.ConfigDefault; +import org.embulk.config.Task; +import org.embulk.spi.unit.ByteSize; + +public interface AwsClientConfigurationsTask + extends Task +{ + // NOTE: Can use `client_execution_timeout` from v1.10.65 + // @Config("client_execution_timeout") + // @ConfigDefault("null") + // Optional getClientExecutionTimeout(); + + @Config("connection_max_idle") + @ConfigDefault("null") + Optional getConnectionMaxIdle(); + + @Config("connection_timeout") + @ConfigDefault("null") // SDK default: 50sec + Optional getConnectionTimeout(); + + @Config("connection_ttl") + @ConfigDefault("null") + Optional getConnectionTTL(); + + // NOTE: DnsResolver is a interface + // @Config("dns_resolver") + // @ConfigDefault("null") + // Optional getDnsResolver(); + + @Config("local_address") + @ConfigDefault("null") + Optional getLocalAddress(); + + @Config("max_connections") + @ConfigDefault("50") + Optional getMaxConnections(); + + @Config("max_error_retry") + @ConfigDefault("3") + Optional getMaxErrorRetry(); + + @Config("preemptive_basic_proxy_auth") + @ConfigDefault("null") + Optional getPreemptiveBasicProxyAuth(); + + @Config("protocol") + @ConfigDefault("null") // SDK default: HTTPS + Optional getProtocol(); + + @Config("proxy_domain") + @ConfigDefault("null") + Optional getProxyDomain(); + + @Config("proxy_host") + @ConfigDefault("null") + Optional getProxyHost(); + + @Config("proxy_password") + @ConfigDefault("null") + Optional getProxyPassword(); + + @Config("proxy_port") + @ConfigDefault("null") + Optional getProxyPort(); + + @Config("proxy_username") + @ConfigDefault("null") + Optional getProxyUsername(); + + @Config("proxy_workstation") + @ConfigDefault("null") + Optional getProxyWorkstation(); + + @Config("request_timeout") + @ConfigDefault("null") + Optional getRequestTimeout(); + + @Config("response_metadata_cache_size") + @ConfigDefault("null") + Optional getResponseMetadataCacheSize(); + + // NOTE: RetryPolicy is a interface + // @Config("retry_policy") + // @ConfigDefault("null") + // Optional getRetryPolicy(); + + @Config("secure_random") + @ConfigDefault("null") + Optional getSecureRandom(); + + public interface SecureRandomTask + extends org.embulk.config.Task + { + @Config("algorithm") + String getAlgorithm(); + + @Config("provider") + @ConfigDefault("null") + Optional getProvider(); + } + + @Config("signer_override") + @ConfigDefault("null") + Optional getSignerOverride(); + + @Config("socket_timeout") + @ConfigDefault("\"8min\"") + Optional getSocketTimeout(); + + @Config("use_expect_continue") + @ConfigDefault("null") + Optional getUseExpectContinue(); + + @Config("use_gzip") + @ConfigDefault("null") + Optional getUseGzip(); + + @Config("user_agent") + @ConfigDefault("null") + Optional getUserAgent(); + + @Config("use_reaper") + @ConfigDefault("null") + Optional getUseReaper(); + + @Config("use_tcp_keep_alive") + @ConfigDefault("null") + Optional getUseTcpKeepAlive(); + + // NOTE: Can use `use_throttle_retries` from v1.10.65 + // @Config("use_throttle_retries") + // @ConfigDefault("null") + // Optional getUseThrottleRetries(); + + @Config("socket_send_buffer_size_hint") // used by setSocketBufferSizeHints + @ConfigDefault("null") + Optional getSocketSendBufferSizeHint(); + + @Config("socket_receive_buffer_size_hint") // used by setSocketBufferSizeHints + @ConfigDefault("null") + Optional getSocketReceiveBufferSizeHint(); +} diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/ClientConfigurationConfigurable.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/ClientConfigurationConfigurable.java deleted file mode 100644 index 6e41c04..0000000 --- a/embulk-input-s3/src/main/java/org/embulk/input/s3/ClientConfigurationConfigurable.java +++ /dev/null @@ -1,421 +0,0 @@ -package org.embulk.input.s3; - -import com.amazonaws.ClientConfiguration; -import com.amazonaws.Protocol; -import com.google.common.base.Optional; -import org.embulk.config.Config; -import org.embulk.config.ConfigDefault; -import org.embulk.config.ConfigException; -import org.embulk.config.Task; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.security.NoSuchAlgorithmException; -import java.security.NoSuchProviderException; -import java.security.SecureRandom; - -/** - * Created by takahiro.nakayama on 3/25/16. - * This class is utility class for ClientConfiguration - */ -public abstract class ClientConfigurationConfigurable -{ - public interface Task - extends org.embulk.config.Task - { - @Config("protocol") - @ConfigDefault("null") - Optional getProtocol(); - - @Config("max_connections") - @ConfigDefault("null") // default: 50 - Optional getMaxConnections(); - - @Config("user_agent") - @ConfigDefault("null") - Optional getUserAgent(); - - @Config("local_address") - @ConfigDefault("null") - Optional getLocalAddress(); - - @Config("proxy_host") - @ConfigDefault("null") - Optional getProxyHost(); - - @Config("proxy_port") - @ConfigDefault("null") - Optional getProxyPort(); - - @Config("proxy_username") - @ConfigDefault("null") - Optional getProxyUsername(); - - @Config("proxy_password") - @ConfigDefault("null") - Optional getProxyPassword(); - - @Config("proxy_domain") - @ConfigDefault("null") - Optional getProxyDomain(); - - @Config("proxy_workstation") - @ConfigDefault("null") - Optional getProxyWorkstation(); - - // NOTE: RetryPolicy is a interface - // @Config("retry_policy") - // @ConfigDefault("null") - // Optional getRetryPolicy(); - - @Config("max_error_retry") - @ConfigDefault("null") // default: 3 - Optional getMaxErrorRetry(); - - @Config("socket_timeout") - @ConfigDefault("null") // default: 8*60*1000 - Optional getSocketTimeout(); - - @Config("connection_timeout") - @ConfigDefault("null") - Optional getConnectionTimeout(); - - @Config("request_timeout") - @ConfigDefault("null") - Optional getRequestTimeout(); - - // NOTE: Can use `client_execution_timeout` from v1.10.65 - // @Config("client_execution_timeout") - // @ConfigDefault("null") - // Optional getClientExecutionTimeout(); - - @Config("use_reaper") - @ConfigDefault("null") - Optional getUseReaper(); - - // NOTE: Can use `use_throttle_retries` from v1.10.65 - // @Config("use_throttle_retries") - // @ConfigDefault("null") - // Optional getUseThrottleRetries(); - - @Config("use_gzip") - @ConfigDefault("null") - Optional getUseGzip(); - - @Config("socket_send_buffer_size_hints") // used by SocketBufferSizeHints - @ConfigDefault("null") - Optional getSocketSendBufferSizeHint(); - - @Config("socket_receive_buffer_size_hints") // used by SocketBufferSizeHints - @ConfigDefault("null") - Optional getSocketReceiveBufferSizeHints(); - - @Config("signer_override") - @ConfigDefault("null") - Optional getSignerOverride(); - - @Config("preemptive_basic_proxy_auth") - @ConfigDefault("null") - Optional getPreemptiveBasicProxyAuth(); - - @Config("connection_ttl") - @ConfigDefault("null") - Optional getConnectionTTL(); - - @Config("connection_max_idle_millis") - @ConfigDefault("null") - Optional getConnectionMaxIdleMillis(); - - @Config("use_tcp_keep_alive") - @ConfigDefault("null") - Optional getUseTcpKeepAlive(); - - // NOTE: DnsResolver is a interface - // @Config("dns_resolver") - // @ConfigDefault("null") - // Optional getDnsResolver(); - - @Config("response_metadata_cache_size") - @ConfigDefault("null") - Optional getResponseMetadataCacheSize(); - - @Config("secure_random") - @ConfigDefault("null") - Optional getSecureRandom(); - - @Config("use_expect_continue") - @ConfigDefault("null") - Optional getUseExpectContinue(); - } - - public interface SecureRandomTask - extends org.embulk.config.Task - { - @Config("algorithm") - String getAlgorithm(); - - @Config("provider") - @ConfigDefault("null") - Optional getProvider(); - } - - protected ClientConfigurationConfigurable() - { - } - - // For backward compatibility - public static final int DEFAULT_MAX_CONNECTIONS = 50; // SDK default: 50 - public static final int DEFAULT_MAX_ERROR_RETRY = 3; // SDK default: 3 - public static final int DEFAULT_SOCKET_TIMEOUT = 8*60*1000; // SDK default: 50*1000 - - public static ClientConfiguration getClientConfiguration(Task task) - { - ClientConfiguration clientConfiguration = new ClientConfiguration(); - - setProtocolOrDoNothing(clientConfiguration, task.getProtocol()); - setMaxConnectionsOrDoNothing(clientConfiguration, task.getMaxConnections()); - setUserAgentOrDoNothing(clientConfiguration, task.getUserAgent()); - - setLocalAddressOrDoNothing(clientConfiguration, task.getLocalAddress()); - setProxyHostOrDoNothing(clientConfiguration, task.getProxyHost()); - setProxyPortOrDoNothing(clientConfiguration, task.getProxyPort()); - - setProxyUsernameOrDoNothing(clientConfiguration, task.getProxyUsername()); - setProxyPasswordOrDoNothing(clientConfiguration, task.getProxyPassword()); - setProxyDomainOrDoNothing(clientConfiguration, task.getProxyDomain()); - - setProxyWorkstationOrDoNothing(clientConfiguration, task.getProxyWorkstation()); - setMaxErrorRetryOrDoNothing(clientConfiguration, task.getMaxErrorRetry()); - setSocketTimeoutOrDoNothing(clientConfiguration, task.getSocketTimeout()); - - setConnectionTimeoutOrDoNothing(clientConfiguration, task.getConnectionTimeout()); - setRequestTimeoutOrDoNothing(clientConfiguration, task.getRequestTimeout()); - setUseReaperOrDoNothing(clientConfiguration, task.getUseReaper()); - - setUseGzipOrDoNothing(clientConfiguration, task.getUseGzip()); - setSocketBufferSizeHintsOrDoNothing(clientConfiguration, task.getSocketSendBufferSizeHint(), - task.getSocketReceiveBufferSizeHints()); - - setSignerOverrideOrDoNothing(clientConfiguration, task.getSignerOverride()); - setPreemptiveBasicProxyAuthOrDoNothing(clientConfiguration, task.getPreemptiveBasicProxyAuth()); - setConnectionTTLOrDoNothing(clientConfiguration, task.getConnectionTTL()); - - setConnectionMaxIdleMillisOrDoNothing(clientConfiguration, task.getConnectionMaxIdleMillis()); - setUseTcpKeepAliveOrDoNothing(clientConfiguration, task.getUseTcpKeepAlive()); - setResponseMetadataCacheSizeOrDoNothing(clientConfiguration, task.getResponseMetadataCacheSize()); - - setSecureRandomOrDoNothing(clientConfiguration, task.getSecureRandom()); - setUseExpectContinueOrDoNothing(clientConfiguration, task.getUseExpectContinue()); - - setRetryPolicy(clientConfiguration); - setDnsResolver(clientConfiguration); - - return clientConfiguration; - } - - protected static void setProtocolOrDoNothing(ClientConfiguration clientConfiguration, Optional protocol) - { - if (protocol.isPresent()) { - clientConfiguration.setProtocol(protocol.get()); - } - } - - protected static void setMaxConnectionsOrDoNothing(ClientConfiguration clientConfiguration, Optional maxConnections) - { - clientConfiguration.setMaxConnections(maxConnections.or(DEFAULT_MAX_CONNECTIONS)); - } - - protected static void setUserAgentOrDoNothing(ClientConfiguration clientConfiguration, Optional userAgent) - { - if (userAgent.isPresent()) { - clientConfiguration.setUserAgent(userAgent.get()); - } - } - - protected static void setLocalAddressOrDoNothing(ClientConfiguration clientConfiguration, Optional localAddress) - { - if (localAddress.isPresent()) { - InetAddress inetAddress = getInetAddress(localAddress.get()); - clientConfiguration.setLocalAddress(inetAddress); - } - } - - protected static void setProxyHostOrDoNothing(ClientConfiguration clientConfiguration, Optional proxyHost) - { - if (proxyHost.isPresent()) { - clientConfiguration.setProxyHost(proxyHost.get()); - } - } - - protected static void setProxyPortOrDoNothing(ClientConfiguration clientConfiguration, Optional proxyPort) - { - if (proxyPort.isPresent()) { - clientConfiguration.setProxyPort(proxyPort.get()); - } - } - - protected static void setProxyUsernameOrDoNothing(ClientConfiguration clientConfiguration, Optional proxyUsername) - { - if (proxyUsername.isPresent()) { - clientConfiguration.setProxyUsername(proxyUsername.get()); - } - } - - protected static void setProxyPasswordOrDoNothing(ClientConfiguration clientConfiguration, Optional proxyPassword) - { - if (proxyPassword.isPresent()) { - clientConfiguration.setProxyPassword(proxyPassword.get()); - } - } - - protected static void setProxyDomainOrDoNothing(ClientConfiguration clientConfiguration, Optional proxyDomain) - { - if (proxyDomain.isPresent()) { - clientConfiguration.setProxyDomain(proxyDomain.get()); - } - } - - protected static void setProxyWorkstationOrDoNothing(ClientConfiguration clientConfiguration, Optional proxyWorkstation) - { - if (proxyWorkstation.isPresent()) { - clientConfiguration.setProxyWorkstation(proxyWorkstation.get()); - } - } - - protected static void setMaxErrorRetryOrDoNothing(ClientConfiguration clientConfiguration, Optional maxErrorRetry) - { - clientConfiguration.setMaxErrorRetry(maxErrorRetry.or(DEFAULT_MAX_ERROR_RETRY)); - } - - protected static void setSocketTimeoutOrDoNothing(ClientConfiguration clientConfiguration, Optional socketTimeout) - { - clientConfiguration.setSocketTimeout(socketTimeout.or(DEFAULT_SOCKET_TIMEOUT)); - } - - protected static void setConnectionTimeoutOrDoNothing(ClientConfiguration clientConfiguration, Optional connectionTimeout) - { - if (connectionTimeout.isPresent()) { - clientConfiguration.setConnectionTimeout(connectionTimeout.get()); - } - } - - protected static void setRequestTimeoutOrDoNothing(ClientConfiguration clientConfiguration, Optional requestTimeout) - { - if (requestTimeout.isPresent()) { - clientConfiguration.setRequestTimeout(requestTimeout.get()); - } - } - - protected static void setUseReaperOrDoNothing(ClientConfiguration clientConfiguration, Optional useReaper) - { - if (useReaper.isPresent()) { - clientConfiguration.setUseReaper(useReaper.get()); - } - } - - protected static void setUseGzipOrDoNothing(ClientConfiguration clientConfiguration, Optional useGzip) - { - if (useGzip.isPresent()) { - clientConfiguration.setUseGzip(useGzip.get()); - } - } - - protected static void setSocketBufferSizeHintsOrDoNothing(ClientConfiguration clientConfiguration, - Optional socketSendBufferSizeHint, Optional socketReceiveBufferSizeHint) - { - if (socketSendBufferSizeHint.isPresent() && socketReceiveBufferSizeHint.isPresent()) { - clientConfiguration.setSocketBufferSizeHints(socketSendBufferSizeHint.get(), socketReceiveBufferSizeHint.get()); - } - } - - protected static void setSignerOverrideOrDoNothing(ClientConfiguration clientConfiguration, Optional value) - { - if (value.isPresent()) { - clientConfiguration.setSignerOverride(value.get()); - } - } - - protected static void setPreemptiveBasicProxyAuthOrDoNothing(ClientConfiguration clientConfiguration, Optional preemptiveBasicProxyAuth) - { - if (preemptiveBasicProxyAuth.isPresent()) { - clientConfiguration.setPreemptiveBasicProxyAuth(preemptiveBasicProxyAuth.get()); - } - } - - protected static void setConnectionTTLOrDoNothing(ClientConfiguration clientConfiguration, Optional connectionTTL) - { - if (connectionTTL.isPresent()) { - clientConfiguration.setConnectionTTL(connectionTTL.get()); - } - } - - protected static void setConnectionMaxIdleMillisOrDoNothing(ClientConfiguration clientConfiguration, Optional connectionMaxIdleMillis) - { - if (connectionMaxIdleMillis.isPresent()) { - clientConfiguration.setConnectionMaxIdleMillis(connectionMaxIdleMillis.get()); - } - } - - protected static void setUseTcpKeepAliveOrDoNothing(ClientConfiguration clientConfiguration, Optional useTcpKeepAlive) - { - if (useTcpKeepAlive.isPresent()) { - clientConfiguration.setUseTcpKeepAlive(useTcpKeepAlive.get()); - } - } - - protected static void setResponseMetadataCacheSizeOrDoNothing(ClientConfiguration clientConfiguration, Optional responseMetadataCacheSize) - { - if (responseMetadataCacheSize.isPresent()) { - clientConfiguration.setResponseMetadataCacheSize(responseMetadataCacheSize.get()); - } - } - - protected static void setSecureRandomOrDoNothing(ClientConfiguration clientConfiguration, Optional secureRandomTask) - { - if (secureRandomTask.isPresent()) { - SecureRandom secureRandom = getSecureRandom(secureRandomTask.get()); - clientConfiguration.setSecureRandom(secureRandom); - } - } - - protected static void setUseExpectContinueOrDoNothing(ClientConfiguration clientConfiguration, Optional useExpectContinue) - { - if (useExpectContinue.isPresent()) { - clientConfiguration.setUseExpectContinue(useExpectContinue.get()); - } - } - - protected static void setRetryPolicy(ClientConfiguration clientConfiguration) - { - } - - protected static void setDnsResolver(ClientConfiguration clientConfiguration) - { - } - - - private static InetAddress getInetAddress(String host) - { - try { - return InetAddress.getByName(host); - } - catch (UnknownHostException e) { - throw new ConfigException(e); - } - } - - private static SecureRandom getSecureRandom(SecureRandomTask secureRandomTask) - { - try { - if (secureRandomTask.getProvider().isPresent()) { - return SecureRandom.getInstance(secureRandomTask.getAlgorithm(), secureRandomTask.getProvider().get()); - } - else { - return SecureRandom.getInstance(secureRandomTask.getAlgorithm()); - } - } - catch (NoSuchAlgorithmException | NoSuchProviderException e) { - throw new ConfigException(e); - } - } -} diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/Duration.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/Duration.java new file mode 100644 index 0000000..98a80c8 --- /dev/null +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/Duration.java @@ -0,0 +1,159 @@ +package org.embulk.input.s3; + +import java.util.Objects; +import java.util.Locale; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.concurrent.TimeUnit; +import com.google.common.base.Preconditions; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; + +// This class should be moved to org.embulk.spi.unit +public class Duration + implements Comparable +{ + private static final Pattern PATTERN = Pattern.compile("\\A(\\d+(?:\\.\\d+)?)\\s?([a-zA-Z]*)\\z"); + + private final long usec; + private final Unit displayUnit; + + public Duration(double duration, Unit unit) + { + Preconditions.checkArgument(!Double.isInfinite(duration), "duration is infinite"); + Preconditions.checkArgument(!Double.isNaN(duration), "duration is not a number"); + Preconditions.checkArgument(duration >= 0, "duration is negative"); + Preconditions.checkNotNull(unit, "unit is null"); + Preconditions.checkArgument(duration * unit.getFactorToUsec() <= Long.MAX_VALUE, "duration is large than (2^63)-1 in milliseconds"); + this.usec = (long) (duration * unit.getFactorToUsec()); + this.displayUnit = unit; + } + + @JsonCreator + @Deprecated + public Duration(long usec) + { + Preconditions.checkArgument(usec >= 0, "duration is negative"); + this.usec = usec; + this.displayUnit = Unit.MSEC; + } + + public long getMillis() + { + return usec / 1000; + } + + public int getMillisInt() + { + if (usec / 1000 > Integer.MAX_VALUE) { + throw new RuntimeException("Duration is too large (must be smaller than (2^31)-1 milliseconds, abount 24 days)"); + } + return (int) (usec / 1000); + } + + public long roundTo(Unit unit) + { + return (long) Math.floor(getValue(unit) + 0.5); + } + + public double getValue(Unit unit) + { + return usec / (double) unit.getFactorToUsec(); + } + + @JsonCreator + public static Duration parseDuration(String duration) + { + Preconditions.checkNotNull(duration, "duration is null"); + Preconditions.checkArgument(!duration.isEmpty(), "duration is empty"); + + Matcher matcher = PATTERN.matcher(duration); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid time duration string '" + duration + "'"); + } + + double value = Double.parseDouble(matcher.group(1)); // NumberFormatException extends IllegalArgumentException. + + String unitString = matcher.group(2); + if (unitString.isEmpty()) { + return new Duration(value, Unit.SECONDS); + } else { + String upperUnitString = unitString.toUpperCase(Locale.ENGLISH); + for (Unit unit : Unit.values()) { + if (unit.getUnitString().toUpperCase(Locale.ENGLISH).equals(upperUnitString)) { + return new Duration(value, unit); + } + } + } + + throw new IllegalArgumentException("Unknown unit '" + unitString + "'"); + } + + @JsonValue + @Override + public String toString() + { + double value = getValue(displayUnit); + String integer = String.format(Locale.ENGLISH, "%d", (long) value); + String decimal = String.format(Locale.ENGLISH, "%.2f", value); + if (decimal.equals(integer + ".00")) { + return integer + displayUnit.getUnitString(); + } else { + return decimal + displayUnit.getUnitString(); + } + } + + @Override + public int compareTo(Duration o) + { + return Long.compare(usec, o.usec); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (!(obj instanceof Duration)) { + return false; + } + Duration o = (Duration) obj; + return this.usec == o.usec; + } + + @Override + public int hashCode() + { + return Objects.hashCode(usec); + } + + public enum Unit + { + USEC(1L, "usec"), + MSEC(1000L, "msec"), + SECONDS(1000L*1000L, "sec"), + MINUTES(60*1000L*1000L, "min"), + HOURS(60*60*1000L*1000L, "hour"), + DAYS(24*60*60*1000L*1000L, "day"); + + private final long factorToUsec; + private final String unitString; + + Unit(long factorToUsec, String unitString) + { + this.factorToUsec = factorToUsec; + this.unitString = unitString; + } + + long getFactorToUsec() + { + return factorToUsec; + } + + String getUnitString() + { + return unitString; + } + } +} diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/FileList.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/FileList.java index 2460853..45d327a 100644 --- a/embulk-input-s3/src/main/java/org/embulk/input/s3/FileList.java +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/FileList.java @@ -19,6 +19,7 @@ import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigSource; +import org.embulk.spi.unit.ByteSize; import com.google.common.base.Throwables; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -42,7 +43,7 @@ public interface Task // TODO support more algorithms to combine tasks @Config("min_task_size") @ConfigDefault("0") - long getMinTaskSize(); + ByteSize getMinTaskSize(); } public static class Entry @@ -74,7 +75,7 @@ public static class Builder private String last = null; private int limitCount = Integer.MAX_VALUE; - private long minTaskSize = 1; + private long minTaskSize = 0; private Pattern pathMatchPattern; private final ByteBuffer castBuffer = ByteBuffer.allocate(4); @@ -84,7 +85,7 @@ public Builder(Task task) this(); this.pathMatchPattern = Pattern.compile(task.getPathMatchPattern()); this.limitCount = task.getTotalFileCountLimit(); - this.minTaskSize = task.getMinTaskSize(); + this.minTaskSize = task.getMinTaskSize().getBytes(); } public Builder(ConfigSource config) @@ -92,7 +93,7 @@ public Builder(ConfigSource config) this(); this.pathMatchPattern = Pattern.compile(config.get(String.class, "path_match_pattern", ".*")); this.limitCount = config.get(int.class, "total_file_count_limit", Integer.MAX_VALUE); - this.minTaskSize = config.get(long.class, "min_task_size", 0L); + this.minTaskSize = config.get(ByteSize.class, "min_task_size", new ByteSize(0)).getBytes(); } public Builder() diff --git a/embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java index 7279ec2..3b7958a 100644 --- a/embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java +++ b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java @@ -17,9 +17,6 @@ import static org.embulk.input.s3.TestS3FileInputPlugin.schemaConfig; import static org.junit.Assert.assertEquals; -/** - * Created by takahiro.nakayama on 3/30/16. - */ public class TestClientConfigurationConfigurable { @Rule @@ -56,14 +53,14 @@ public void setOneParam() public void setTwoParam() { config.setNested("client_config", Exec.newConfigSource() - .set("socket_send_buffer_size_hints", 4) - .set("socket_receive_buffer_size_hints", 8)); + .set("socket_send_buffer_size_hint", "1MB") + .set("socket_receive_buffer_size_hint", "1MB")); ClientConfiguration clientConfiguration = getClientConfiguration(); int[] socketBufferSizeHints = clientConfiguration.getSocketBufferSizeHints(); - assertEquals(4, socketBufferSizeHints[0]); - assertEquals(8, socketBufferSizeHints[1]); + assertEquals(1 << 20, socketBufferSizeHints[0]); + assertEquals(1 << 20, socketBufferSizeHints[1]); } @Test @@ -108,4 +105,4 @@ private ClientConfiguration getClientConfiguration() S3FileInputPlugin.S3PluginTask task = config.loadConfig(S3FileInputPlugin.S3PluginTask.class); return plugin.getClientConfiguration(task); } -} \ No newline at end of file +} diff --git a/embulk-input-s3/src/test/java/org/embulk/input/s3/TestDuration.java b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestDuration.java new file mode 100644 index 0000000..8d731f0 --- /dev/null +++ b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestDuration.java @@ -0,0 +1,55 @@ +package org.embulk.input.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import org.embulk.EmbulkTestRuntime; +import org.embulk.config.ConfigException; +import org.embulk.config.ConfigSource; +import org.embulk.spi.Exec; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; + +import static org.embulk.input.s3.TestS3FileInputPlugin.parserConfig; +import static org.embulk.input.s3.TestS3FileInputPlugin.schemaConfig; +import static org.junit.Assert.assertEquals; + +public class TestDuration +{ + @Test + public void testUnits() + { + assertDuration(1, "1usec"); + assertDuration(1000L, "1msec"); + assertDuration(1000*1000L, "1sec"); + assertDuration(60*1000*1000L, "1min"); + assertDuration(60*60*1000*1000L, "1hour"); + assertDuration(24*60*60*1000*1000L, "1day"); + + assertDuration(2, "2usec"); + assertDuration(2000L, "2msec"); + assertDuration(2000*1000L, "2sec"); + assertDuration(2*60*1000*1000L, "2min"); + assertDuration(2*60*60*1000*1000L, "2hour"); + assertDuration(2*24*60*60*1000*1000L, "2day"); + + assertDuration(0, "0.0usec"); + assertDuration(2400L, "2.4msec"); + assertDuration(2400*1000L, "2.4sec"); + assertDuration(144*1000*1000L, "2.4min"); + assertDuration(144*60*1000*1000L, "2.4hour"); + assertDuration(60*60*60*1000*1000L, "2.5day"); + } + + public void assertDuration(long usec, String str) + { + assertEquals(usec, Duration.parseDuration(str).roundTo(Duration.Unit.USEC)); + assertEquals(Duration.parseDuration(str), Duration.parseDuration(Duration.parseDuration(str).toString())); + if (usec % 1000 == 0) { + assertEquals(usec / 1000L, Duration.parseDuration(str).getMillis()); + } + } +} From 33dd804cfccf98eaa2d84b891c82aa64f8d8da05 Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Mon, 11 Apr 2016 23:29:37 -0700 Subject: [PATCH 5/5] renamed TestClientConfigurationConfigurable -> TestAwsClientConfiguration --- .../java/org/embulk/input/s3/AbstractS3FileInputPlugin.java | 2 +- ...figurationsTask.java => AwsClientConfigurationTask.java} | 2 +- .../java/org/embulk/input/s3/AwsClientConfigurations.java | 4 ++-- ...ionConfigurable.java => TestAwsClientConfiguration.java} | 6 +++--- 4 files changed, 7 insertions(+), 7 deletions(-) rename embulk-input-s3/src/main/java/org/embulk/input/s3/{AwsClientConfigurationsTask.java => AwsClientConfigurationTask.java} (98%) rename embulk-input-s3/src/test/java/org/embulk/input/s3/{TestClientConfigurationConfigurable.java => TestAwsClientConfiguration.java} (95%) diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java index 508290d..9df4387 100644 --- a/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/AbstractS3FileInputPlugin.java @@ -62,7 +62,7 @@ public interface PluginTask @Config("client_config") @ConfigDefault("{}") - public AwsClientConfigurationsTask getClientConfig(); + public AwsClientConfigurationTask getClientConfig(); public FileList getFiles(); public void setFiles(FileList files); diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurationsTask.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurationTask.java similarity index 98% rename from embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurationsTask.java rename to embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurationTask.java index 8e3a4f6..db36af8 100644 --- a/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurationsTask.java +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurationTask.java @@ -7,7 +7,7 @@ import org.embulk.config.Task; import org.embulk.spi.unit.ByteSize; -public interface AwsClientConfigurationsTask +public interface AwsClientConfigurationTask extends Task { // NOTE: Can use `client_execution_timeout` from v1.10.65 diff --git a/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurations.java b/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurations.java index 2c6e4d2..9a81238 100644 --- a/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurations.java +++ b/embulk-input-s3/src/main/java/org/embulk/input/s3/AwsClientConfigurations.java @@ -12,7 +12,7 @@ public class AwsClientConfigurations { - public static ClientConfiguration getClientConfiguration(AwsClientConfigurationsTask task) + public static ClientConfiguration getClientConfiguration(AwsClientConfigurationTask task) { ClientConfiguration c = new ClientConfiguration(); @@ -100,7 +100,7 @@ public static ClientConfiguration getClientConfiguration(AwsClientConfigurations if (task.getSecureRandom().isPresent()) { try { - AwsClientConfigurationsTask.SecureRandomTask secureRandomTask = task.getSecureRandom().get(); + AwsClientConfigurationTask.SecureRandomTask secureRandomTask = task.getSecureRandom().get(); SecureRandom rand = secureRandomTask.getProvider().isPresent() ? SecureRandom.getInstance(secureRandomTask.getAlgorithm(), secureRandomTask.getProvider().get()) diff --git a/embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestAwsClientConfiguration.java similarity index 95% rename from embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java rename to embulk-input-s3/src/test/java/org/embulk/input/s3/TestAwsClientConfiguration.java index 3b7958a..9cfe541 100644 --- a/embulk-input-s3/src/test/java/org/embulk/input/s3/TestClientConfigurationConfigurable.java +++ b/embulk-input-s3/src/test/java/org/embulk/input/s3/TestAwsClientConfiguration.java @@ -17,7 +17,7 @@ import static org.embulk.input.s3.TestS3FileInputPlugin.schemaConfig; import static org.junit.Assert.assertEquals; -public class TestClientConfigurationConfigurable +public class TestAwsClientConfiguration { @Rule public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); @@ -54,13 +54,13 @@ public void setTwoParam() { config.setNested("client_config", Exec.newConfigSource() .set("socket_send_buffer_size_hint", "1MB") - .set("socket_receive_buffer_size_hint", "1MB")); + .set("socket_receive_buffer_size_hint", "2MB")); ClientConfiguration clientConfiguration = getClientConfiguration(); int[] socketBufferSizeHints = clientConfiguration.getSocketBufferSizeHints(); assertEquals(1 << 20, socketBufferSizeHints[0]); - assertEquals(1 << 20, socketBufferSizeHints[1]); + assertEquals(2 << 20, socketBufferSizeHints[1]); } @Test