diff --git a/.Rbuildignore b/.Rbuildignore
index 67c3eef..d9f8952 100644
--- a/.Rbuildignore
+++ b/.Rbuildignore
@@ -16,3 +16,4 @@
^LICENSE$
^tests/testthat/test-dbplyr-ch.R$
.ipynb_checkpoints
+^src/vendor/clickhouse-cpp/build
\ No newline at end of file
diff --git a/DESCRIPTION b/DESCRIPTION
index eab441a..e124fe1 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -1,7 +1,7 @@
Package: RClickhouse
Type: Package
Title: 'Yandex Clickhouse' Interface for R with Basic 'dplyr' Support
-Version: 0.6.3
+Version: 0.6.4
Encoding: UTF-8
Authors@R: c(
person("Christian", "Hotz-Behofsits", email = "christian.hotz-behofsits@wu.ac.at", role = c("aut", "cre")),
@@ -22,9 +22,11 @@ Authors@R: c(
Description: 'Yandex Clickhouse' () is a high-performance relational column-store database to enable
big data exploration and 'analytics' scaling to petabytes of data. Methods are
provided that enable working with 'Yandex Clickhouse' databases via
- 'DBI' methods and using 'dplyr'/'dbplyr' idioms.
+ 'DBI' methods and using 'dplyr'/'dbplyr' idioms.
+ NOTE(mmitkevich): https://github.com/ClickHouse/clickhouse-cpp/commit/87fc1186063b82b8ddf37b1c70c485bdd7504f8d
+ https://github.com/mmitkevich/RClickhouse
License: GPL-2
-SystemRequirements: C++11
+SystemRequirements: C++17
Imports:
dplyr (>= 0.7.0),
dbplyr (>= 1.0.0),
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..0f5f5cb
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,33 @@
+# Makefile for generating R packages.
+# 2011 Andrew Redd
+#
+# Assumes Makefile is in a folder where package contents are in a subfolder pkg.
+# Roxygen uses the roxygen2 package, and will run automatically on check and all.
+
+PKG_VERSION=$(shell grep -i ^version DESCRIPTION | cut -d : -d \ -f 2)
+PKG_NAME=$(shell grep -i ^package DESCRIPTION | cut -d : -d \ -f 2)
+
+R_FILES := $(wildcard R/*.R)
+SRC_FILES := $(wildcard R/*.R)
+PKG_FILES := DESCRIPTION NAMESPACE $(R_FILES) $(SRC_FILES)
+
+.PHONY: tarball install check clean roxygen
+
+tarball: $(PKG_NAME)_$(PKG_VERSION).tar.gz
+$(PKG_NAME)_$(PKG_VERSION).tar.gz: $(PKG_FILES)
+ R CMD build .
+
+all: clean check install
+
+check: $(PKG_NAME)_$(PKG_VERSION).tar.gz roxygen
+ R CMD check $(PKG_NAME)_$(PKG_VERSION).tar.gz
+
+install: $(PKG_NAME)_$(PKG_VERSION).tar.gz
+ R CMD INSTALL $(PKG_NAME)_$(PKG_VERSION).tar.gz
+
+roxygen:
+ Rscript -e "library(roxygen2);roxygenize('.')"
+
+clean:
+ -rm -f $(PKG_NAME)_*.tar.gz
+ -rm -r -f $(PKG_NAME).Rcheck
\ No newline at end of file
diff --git a/NAMESPACE b/NAMESPACE
index 80265da..308482e 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -56,4 +56,5 @@ importFrom(dplyr,db_explain)
importFrom(dplyr,sql_escape_ident)
importFrom(dplyr,sql_escape_string)
importFrom(dplyr,sql_translate_env)
+import(openssl)
useDynLib(RClickhouse, .registration=TRUE)
diff --git a/src/Makevars b/src/Makevars
index 1a11836..0256239 100644
--- a/src/Makevars
+++ b/src/Makevars
@@ -1 +1 @@
-include Makevars.common
+include Makevars.common
\ No newline at end of file
diff --git a/src/Makevars.common b/src/Makevars.common
index b68d2c6..3d6045f 100644
--- a/src/Makevars.common
+++ b/src/Makevars.common
@@ -1,35 +1,38 @@
-PKG_CPPFLAGS = $(SYS_FLAGS) -I. -I../inst/include -I./vendor/clickhouse-cpp -I./vendor/clickhouse-cpp/contrib -I./vendor/clickhouse-cpp/contrib/bigerint
-
-CXX_STD = CXX11
+CXX_STD = CXX17
+PKG_CPPFLAGS = $(SYS_FLAGS) -I. -I../inst/include -I./vendor/clickhouse-cpp -I./vendor/clickhouse-cpp/contrib
+# -I./vendor/clickhouse-cpp/contrib/bigerint
OBJ_FILES = \
-vendor/clickhouse-cpp/clickhouse/columns/string.o \
-vendor/clickhouse-cpp/clickhouse/columns/date.o \
-vendor/clickhouse-cpp/clickhouse/columns/numeric.o \
-vendor/clickhouse-cpp/clickhouse/columns/decimal.o \
-vendor/clickhouse-cpp/clickhouse/columns/tuple.o \
-vendor/clickhouse-cpp/clickhouse/columns/array.o \
-vendor/clickhouse-cpp/clickhouse/columns/factory.o \
-vendor/clickhouse-cpp/clickhouse/columns/nullable.o \
-vendor/clickhouse-cpp/clickhouse/columns/enum.o \
-vendor/clickhouse-cpp/clickhouse/columns/uuid.o \
-vendor/clickhouse-cpp/clickhouse/columns/ip4.o \
-vendor/clickhouse-cpp/clickhouse/columns/ip6.o \
-vendor/clickhouse-cpp/clickhouse/query.o \
-vendor/clickhouse-cpp/clickhouse/base/platform.o \
vendor/clickhouse-cpp/clickhouse/base/socket.o \
+vendor/clickhouse-cpp/clickhouse/base/platform.o \
+vendor/clickhouse-cpp/clickhouse/base/sslsocket.o \
vendor/clickhouse-cpp/clickhouse/base/input.o \
vendor/clickhouse-cpp/clickhouse/base/output.o \
-vendor/clickhouse-cpp/clickhouse/base/coded.o \
vendor/clickhouse-cpp/clickhouse/base/compressed.o \
-vendor/clickhouse-cpp/clickhouse/client.o \
+vendor/clickhouse-cpp/clickhouse/base/wire_format.o \
+vendor/clickhouse-cpp/clickhouse/block.o \
+vendor/clickhouse-cpp/clickhouse/query.o \
vendor/clickhouse-cpp/clickhouse/types/types.o \
vendor/clickhouse-cpp/clickhouse/types/type_parser.o \
-vendor/clickhouse-cpp/clickhouse/block.o \
-vendor/clickhouse-cpp/contrib/cityhash/city.o \
+vendor/clickhouse-cpp/clickhouse/client.o \
+vendor/clickhouse-cpp/clickhouse/columns/ip6.o \
+vendor/clickhouse-cpp/clickhouse/columns/ip4.o \
+vendor/clickhouse-cpp/clickhouse/columns/lowcardinality.o \
+vendor/clickhouse-cpp/clickhouse/columns/numeric.o \
+vendor/clickhouse-cpp/clickhouse/columns/decimal.o \
+vendor/clickhouse-cpp/clickhouse/columns/array.o \
+vendor/clickhouse-cpp/clickhouse/columns/factory.o \
+vendor/clickhouse-cpp/clickhouse/columns/date.o \
+vendor/clickhouse-cpp/clickhouse/columns/enum.o \
+vendor/clickhouse-cpp/clickhouse/columns/tuple.o \
+vendor/clickhouse-cpp/clickhouse/columns/itemview.o \
+vendor/clickhouse-cpp/clickhouse/columns/string.o \
+vendor/clickhouse-cpp/clickhouse/columns/nullable.o \
+vendor/clickhouse-cpp/clickhouse/columns/uuid.o \
+vendor/clickhouse-cpp/contrib/lz4/lz4hc.o \
vendor/clickhouse-cpp/contrib/lz4/lz4.o \
-vendor/clickhouse-cpp/contrib/lz4/lz4hc.o
+vendor/clickhouse-cpp/contrib/cityhash/city.o
-PKG_LIBS = $(OBJ_FILES) -lpthread $(SYS_LIBS)
+PKG_LIBS = $(OBJ_FILES) -lpthread -lssl $(SYS_LIBS)
$(SHLIB): $(OBJ_FILES)
diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp
index 12582b5..0ed5d77 100644
--- a/src/RcppExports.cpp
+++ b/src/RcppExports.cpp
@@ -8,6 +8,11 @@
using namespace Rcpp;
+#ifdef RCPP_USE_GLOBAL_ROSTREAM
+Rcpp::Rostream& Rcpp::Rcout = Rcpp::Rcpp_cout_get();
+Rcpp::Rostream& Rcpp::Rcerr = Rcpp::Rcpp_cerr_get();
+#endif
+
// fetch
DataFrame fetch(XPtr res, ssize_t n);
static SEXP _RClickhouse_fetch_try(SEXP resSEXP, SEXP nSEXP) {
diff --git a/src/result.cpp b/src/result.cpp
index e5df40f..f488972 100644
--- a/src/result.cpp
+++ b/src/result.cpp
@@ -1,7 +1,8 @@
#include
+#include
+#include "clickhouse/columns/string.h"
#include "result.h"
-
-
+
// helper function which emits an R warning without causing a longjmp
// see https://stackoverflow.com/questions/24557711/how-to-generate-an-r-warning-safely-in-rcpp
void warn(std::string text) {
@@ -62,6 +63,37 @@ void convertEntries(std::shared_ptr in, NullCol nullCol, RT &out,
}
+// workaround lack of Rcpp::StringVector x; x[i] = std::string_view();
+template<>
+void convertEntries(std::shared_ptr in, NullCol nullCol, Rcpp::StringVector &out,
+ size_t offset, size_t start, size_t end)
+{
+ for(size_t j = start; j < end; j++) {
+ // can't use the ternary operator here, since that would require explicit
+ // conversion from the Clickhouse storage type (which is far messier)
+ if(nullCol && nullCol->IsNull(j)) {
+ out[offset+j-start] = Rcpp::StringVector::get_na();
+ } else {
+ out[offset+j-start] = std::string(in->At(j));
+ }
+ }
+}
+
+template<>
+void convertEntries(std::shared_ptr in, NullCol nullCol, Rcpp::StringVector &out,
+ size_t offset, size_t start, size_t end)
+{
+ for(size_t j = start; j < end; j++) {
+ // can't use the ternary operator here, since that would require explicit
+ // conversion from the Clickhouse storage type (which is far messier)
+ if(nullCol && nullCol->IsNull(j)) {
+ out[offset+j-start] = Rcpp::StringVector::get_na();
+ } else {
+ out[offset+j-start] = std::string(in->At(j));
+ }
+ }
+}
+
template<>
void convertEntries(std::shared_ptr in, NullCol nullCol, Rcpp::StringVector &out,
size_t offset, size_t start, size_t end) {
@@ -104,7 +136,18 @@ void convertEntries(std::shared_ptr
+void convertEntries(std::shared_ptr in,
+ NullCol nullCol, Rcpp::DatetimeVector &out, size_t offset, size_t start, size_t end) {
+ double factor = std::pow(10.,in->GetPrecision());
+ for(size_t j = start; j < end; j++) {
+ if(nullCol && nullCol->IsNull(j)) {
+ out[offset+j-start] = Rcpp::DateVector::get_na();
+ } else {
+ out[offset+j-start] = static_cast(in->At(j)/factor);
+ }
+ }
+}
std::string formatUUID(const ch::UInt128 &v) {
const size_t bufsize = 128/4 + 4 + 1; // 128 bit in hexadecimal + 4 dashes + null terminator
char buf[bufsize];
@@ -292,6 +335,9 @@ std::unique_ptr Result::buildConverter(std::string name, ch::TypeRef
return std::unique_ptr>(new ScalarConverter);
case TC::Date:
return std::unique_ptr>(new ScalarConverter);
+ case TC::DateTime64:
+ return std::unique_ptr>(new ScalarConverter);
+
case TC::Nullable:
{
// downcast to NullableType to access GetNestedType member
diff --git a/src/vendor/clickhouse-cpp/.gitignore b/src/vendor/clickhouse-cpp/.gitignore
index 4c1637d..8def154 100644
--- a/src/vendor/clickhouse-cpp/.gitignore
+++ b/src/vendor/clickhouse-cpp/.gitignore
@@ -274,3 +274,7 @@ BUCKAROO_DEPS
# Visual Studio Code
/.vscode/
+
+# Vim
+*.swp
+*.swo
diff --git a/src/vendor/clickhouse-cpp/.travis.yml b/src/vendor/clickhouse-cpp/.travis.yml
index 7d38ea0..7167c0c 100644
--- a/src/vendor/clickhouse-cpp/.travis.yml
+++ b/src/vendor/clickhouse-cpp/.travis.yml
@@ -35,20 +35,28 @@ matrix:
- MATRIX_EVAL="CC=clang-6.0 && CXX=clang++-6.0"
- os: osx
- osx_image: xcode8.2
+ osx_image: xcode9.4
compiler: clang
-before_install:
- - if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then echo 'deb http://repo.yandex.ru/clickhouse/deb/stable main/' | sudo tee /etc/apt/sources.list.d/clickhouse.list ; fi
- - if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4 ; fi
- - if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then sudo apt-get update -q && sudo apt-get install -q -y --allow-unauthenticated clickhouse-server-common ; fi
- - if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then sudo service clickhouse-server start ; fi
+before_install: |
+ if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then
+ export CH_SERVER_VERSION="21.3.17.2"
+ sudo apt-get install apt-transport-https ca-certificates dirmngr
+ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4
+ echo "deb https://repo.clickhouse.com/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list
+ sudo apt-get update
+ sudo apt-get install -y \
+ clickhouse-server=${CH_SERVER_VERSION} \
+ clickhouse-client=${CH_SERVER_VERSION} \
+ clickhouse-common-static=${CH_SERVER_VERSION}
+ sudo service clickhouse-server start
+ fi
# Build steps
-script:
- - eval "${MATRIX_EVAL}"
- - mkdir build
- - cd build
- - cmake .. -DBUILD_TESTS=ON && make
- - if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then ./ut/clickhouse-cpp-ut ; fi
- - if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then ./ut/clickhouse-cpp-ut --gtest_filter='-Client/*' ; fi
+script: |
+ eval "${MATRIX_EVAL}"
+ mkdir build
+ cd build
+ cmake .. -DBUILD_TESTS=ON && make
+ if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then ./ut/clickhouse-cpp-ut ; fi
+ if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then ./ut/clickhouse-cpp-ut --gtest_filter=-"Client/*:*Performance*" ; fi
diff --git a/src/vendor/clickhouse-cpp/CMakeLists.txt b/src/vendor/clickhouse-cpp/CMakeLists.txt
index be9c929..f3193fc 100644
--- a/src/vendor/clickhouse-cpp/CMakeLists.txt
+++ b/src/vendor/clickhouse-cpp/CMakeLists.txt
@@ -1,46 +1,50 @@
-CMAKE_MINIMUM_REQUIRED(VERSION 3.0.2)
+CMAKE_MINIMUM_REQUIRED (VERSION 3.0.2)
-INCLUDE (cmake/cpp11.cmake)
+INCLUDE (cmake/cpp17.cmake)
INCLUDE (cmake/subdirs.cmake)
+INCLUDE (cmake/openssl.cmake)
-OPTION(BUILD_BENCHMARK "Build benchmark" OFF)
-OPTION(BUILD_TESTS "Build tests" OFF)
+OPTION (BUILD_BENCHMARK "Build benchmark" OFF)
+OPTION (BUILD_TESTS "Build tests" OFF)
+OPTION (WITH_OPENSSL "Use OpenSSL for TLS connections" OFF)
PROJECT (CLICKHOUSE-CLIENT)
- USE_CXX11()
+ USE_CXX17 ()
+ USE_OPENSSL ()
- IF ("${CMAKE_BUILD_TYPE}" STREQUAL "")
- set(CMAKE_BUILD_TYPE "Debug")
- ENDIF()
+ IF (NOT CMAKE_BUILD_TYPE)
+ SET (CMAKE_BUILD_TYPE "RelWithDebInfo")
+ ENDIF ()
IF (UNIX)
- IF (APPLE)
- SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O2 -Wall -Wextra -Werror")
- ELSE ()
- SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O2 -pthread -Wall -Wextra -Werror")
+ IF (NOT APPLE)
+ SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
ENDIF ()
SET (CMAKE_EXE_LINKER_FLAGS, "${CMAKE_EXE_LINKER_FLAGS} -lpthread")
+ # -Wpedantic makes int128 support somewhat harder and less performant (by not allowing builtin __int128)
+ SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Werror -Wpedantic-")
ENDIF ()
- INCLUDE_DIRECTORIES(.)
- INCLUDE_DIRECTORIES(contrib)
+ INCLUDE_DIRECTORIES (.)
+ INCLUDE_DIRECTORIES (contrib)
SUBDIRS (
clickhouse
+ contrib/absl
contrib/cityhash
contrib/lz4
- contrib/bigint
)
IF (BUILD_BENCHMARK)
- SUBDIRS(bench)
+ SUBDIRS (bench)
ENDIF (BUILD_BENCHMARK)
IF (BUILD_TESTS)
- SUBDIRS(
+ INCLUDE_DIRECTORIES (contrib/gtest/include contrib/gtest)
+ SUBDIRS (
contrib/gtest
tests/simple
ut
)
- ENDIF (BUILD_TESTS)
+ ENDIF (BUILD_TESTS)
diff --git a/src/vendor/clickhouse-cpp/LICENSE b/src/vendor/clickhouse-cpp/LICENSE
index 5e59a61..f3d0343 100644
--- a/src/vendor/clickhouse-cpp/LICENSE
+++ b/src/vendor/clickhouse-cpp/LICENSE
@@ -1,4 +1,4 @@
-Copyright 2018-2019 Yandex LLC
+Copyright 2018-2020 Yandex LLC
Copyright 2017 Pavel Artemkin
Apache License
@@ -189,7 +189,7 @@ Copyright 2017 Pavel Artemkin
same "printed page" as the copyright notice for easier
identification within third-party archives.
- Copyright 2018-2019 Yandex LLC
+ Copyright 2018-2020 Yandex LLC
Copyright 2017 Pavel Artemkin
diff --git a/src/vendor/clickhouse-cpp/README.md b/src/vendor/clickhouse-cpp/README.md
index 337c968..c1d82aa 100644
--- a/src/vendor/clickhouse-cpp/README.md
+++ b/src/vendor/clickhouse-cpp/README.md
@@ -1,13 +1,14 @@
-ClickHouse C++ client [](https://travis-ci.org/ClickHouse/clickhouse-cpp)
+ClickHouse C++ client [](https://github.com/ClickHouse/clickhouse-cpp/actions/workflows/linux.yml) [](https://github.com/ClickHouse/clickhouse-cpp/actions/workflows/macos.yml) [](https://github.com/ClickHouse/clickhouse-cpp/actions/workflows/windows_msvc.yml) [](https://github.com/ClickHouse/clickhouse-cpp/actions/workflows/windows_mingw.yml)
=====
-C++ client for [Yandex ClickHouse](https://clickhouse.yandex/)
+C++ client for [ClickHouse](https://clickhouse.com/).
## Supported data types
* Array(T)
* Date
-* DateTime
+* DateTime, DateTime64
+* DateTime([timezone]), DateTime64(N, [timezone])
* Decimal32, Decimal64, Decimal128
* Enum8, Enum16
* FixedString(N)
@@ -15,8 +16,11 @@ C++ client for [Yandex ClickHouse](https://clickhouse.yandex/)
* IPv4, IPv6
* Nullable(T)
* String
+* LowCardinality(String) or LowCardinality(FixedString(N))
* Tuple
* UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64
+* Int128
+* UUID
## Building
@@ -71,3 +75,7 @@ client.Select("SELECT id, name FROM test.numbers", [] (const Block& block)
/// Delete table.
client.Execute("DROP TABLE test.numbers");
```
+Please note that `Client` instance is NOT thread-safe. I.e. you must create a separate `Client` for each thread or utilize some synchronization techniques.
+
+
+source code taken from: https://github.com/ClickHouse/clickhouse-cpp/commit/87fc1186063b82b8ddf37b1c70c485bdd7504f8d
diff --git a/src/vendor/clickhouse-cpp/bench/bench.cpp b/src/vendor/clickhouse-cpp/bench/bench.cpp
index 9ca8ff7..4078e3f 100644
--- a/src/vendor/clickhouse-cpp/bench/bench.cpp
+++ b/src/vendor/clickhouse-cpp/bench/bench.cpp
@@ -5,7 +5,11 @@
namespace clickhouse {
Client g_client(ClientOptions()
- .SetHost("localhost")
+ .SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost"))
+ .SetPort( std::stoi(getEnvOrDefault("CLICKHOUSE_PORT", "9000")))
+ .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default"))
+ .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", ""))
+ .SetDefaultDatabase(getEnvOrDefault("CLICKHOUSE_DB", "default"))
.SetPingBeforeQuery(false));
static void SelectNumber(benchmark::State& state) {
diff --git a/src/vendor/clickhouse-cpp/clickhouse/CMakeLists.txt b/src/vendor/clickhouse-cpp/clickhouse/CMakeLists.txt
index 618e7b6..e2afb3a 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/CMakeLists.txt
+++ b/src/vendor/clickhouse-cpp/clickhouse/CMakeLists.txt
@@ -1,10 +1,10 @@
SET ( clickhouse-cpp-lib-src
- base/coded.cpp
base/compressed.cpp
base/input.cpp
base/output.cpp
base/platform.cpp
base/socket.cpp
+ base/wire_format.cpp
columns/array.cpp
columns/date.cpp
@@ -13,12 +13,16 @@ SET ( clickhouse-cpp-lib-src
columns/factory.cpp
columns/ip4.cpp
columns/ip6.cpp
+ columns/lowcardinality.cpp
+ columns/lowcardinalityadaptor.h
columns/nullable.cpp
columns/numeric.cpp
columns/string.cpp
columns/tuple.cpp
columns/uuid.cpp
+ columns/itemview.cpp
+
types/type_parser.cpp
types/types.cpp
@@ -27,32 +31,53 @@ SET ( clickhouse-cpp-lib-src
query.cpp
)
+IF (WITH_OPENSSL)
+ LIST(APPEND clickhouse-cpp-lib-src base/sslsocket.cpp)
+ENDIF ()
+
ADD_LIBRARY (clickhouse-cpp-lib SHARED ${clickhouse-cpp-lib-src})
SET_TARGET_PROPERTIES(clickhouse-cpp-lib PROPERTIES LINKER_LANGUAGE CXX)
TARGET_LINK_LIBRARIES (clickhouse-cpp-lib
+ absl-lib
cityhash-lib
lz4-lib
- bigint-lib
)
ADD_LIBRARY (clickhouse-cpp-lib-static STATIC ${clickhouse-cpp-lib-src})
TARGET_LINK_LIBRARIES (clickhouse-cpp-lib-static
+ absl-lib
cityhash-lib
lz4-lib
- bigint-lib
)
IF (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
- set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} --rtlib=compiler-rt")
- TARGET_LINK_LIBRARIES (clickhouse-cpp-lib gcc_s)
- TARGET_LINK_LIBRARIES (clickhouse-cpp-lib-static gcc_s)
+ INCLUDE (CheckCXXSourceCompiles)
+
+ CHECK_CXX_SOURCE_COMPILES("#include \nint main() { return __GLIBCXX__ != 0; }"
+ BUILDING_WITH_LIB_STDCXX)
+
+ IF (BUILDING_WITH_LIB_STDCXX)
+ # there is a problem with __builtin_mul_overflow call at link time
+ # the error looks like: ... undefined reference to `__muloti4' ...
+ # caused by clang bug https://bugs.llvm.org/show_bug.cgi?id=16404
+ # explicit linking to compiler-rt allows to workaround the problem
+ SET (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} --rtlib=compiler-rt")
+
+ # some workaround for linking issues on linux:
+ # /usr/bin/ld: CMakeFiles/simple-test.dir/main.cpp.o: undefined reference to symbol '_Unwind_Resume@@GCC_3.0'
+ # /usr/bin/ld: /lib/x86_64-linux-gnu/libgcc_s.so.1: error adding symbols: DSO missing from command line
+ # FIXME: that workaround breaks clang build on mingw
+ TARGET_LINK_LIBRARIES (clickhouse-cpp-lib gcc_s)
+ TARGET_LINK_LIBRARIES (clickhouse-cpp-lib-static gcc_s)
+ ENDIF ()
ENDIF ()
-INSTALL(TARGETS clickhouse-cpp-lib clickhouse-cpp-lib-static
+INSTALL (TARGETS clickhouse-cpp-lib clickhouse-cpp-lib-static
ARCHIVE DESTINATION lib
LIBRARY DESTINATION lib
)
+
# general
INSTALL(FILES block.h DESTINATION include/clickhouse/)
INSTALL(FILES client.h DESTINATION include/clickhouse/)
@@ -63,7 +88,6 @@ INSTALL(FILES query.h DESTINATION include/clickhouse/)
# base
INSTALL(FILES base/buffer.h DESTINATION include/clickhouse/base/)
-INSTALL(FILES base/coded.h DESTINATION include/clickhouse/base/)
INSTALL(FILES base/compressed.h DESTINATION include/clickhouse/base/)
INSTALL(FILES base/input.h DESTINATION include/clickhouse/base/)
INSTALL(FILES base/output.h DESTINATION include/clickhouse/base/)
@@ -83,6 +107,8 @@ INSTALL(FILES columns/enum.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/factory.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/ip4.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/ip6.h DESTINATION include/clickhouse/columns/)
+INSTALL(FILES columns/itemview.h DESTINATION include/clickhouse/columns/)
+INSTALL(FILES columns/lowcardinality.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/nullable.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/numeric.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/string.h DESTINATION include/clickhouse/columns/)
@@ -92,4 +118,14 @@ INSTALL(FILES columns/uuid.h DESTINATION include/clickhouse/columns/)
# types
INSTALL(FILES types/type_parser.h DESTINATION include/clickhouse/types/)
-INSTALL(FILES types/types.h DESTINATION include/clickhouse/types/)
\ No newline at end of file
+INSTALL(FILES types/types.h DESTINATION include/clickhouse/types/)
+
+IF (WITH_OPENSSL)
+ TARGET_LINK_LIBRARIES (clickhouse-cpp-lib OpenSSL::SSL)
+ TARGET_LINK_LIBRARIES (clickhouse-cpp-lib-static OpenSSL::SSL)
+ENDIF ()
+
+IF (WIN32 OR MINGW)
+ TARGET_LINK_LIBRARIES (clickhouse-cpp-lib wsock32 ws2_32)
+ TARGET_LINK_LIBRARIES (clickhouse-cpp-lib-static wsock32 ws2_32)
+ENDIF ()
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/coded.cpp b/src/vendor/clickhouse-cpp/clickhouse/base/coded.cpp
deleted file mode 100644
index fbd29a2..0000000
--- a/src/vendor/clickhouse-cpp/clickhouse/base/coded.cpp
+++ /dev/null
@@ -1,100 +0,0 @@
-#include "coded.h"
-
-#include
-
-namespace clickhouse {
-
-static const int MAX_VARINT_BYTES = 10;
-
-CodedInputStream::CodedInputStream(ZeroCopyInput* input)
- : input_(input)
-{
-}
-
-bool CodedInputStream::ReadRaw(void* buffer, size_t size) {
- uint8_t* p = static_cast(buffer);
-
- while (size > 0) {
- const void* ptr;
- size_t len = input_->Next(&ptr, size);
-
- memcpy(p, ptr, len);
-
- p += len;
- size -= len;
- }
-
- return true;
-}
-
-bool CodedInputStream::Skip(size_t count) {
- while (count > 0) {
- const void* ptr;
- size_t len = input_->Next(&ptr, count);
-
- if (len == 0) {
- return false;
- }
-
- count -= len;
- }
-
- return true;
-}
-
-bool CodedInputStream::ReadVarint64(uint64_t* value) {
- *value = 0;
-
- for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) {
- uint8_t byte;
-
- if (!input_->ReadByte(&byte)) {
- return false;
- } else {
- *value |= uint64_t(byte & 0x7F) << (7 * i);
-
- if (!(byte & 0x80)) {
- return true;
- }
- }
- }
-
- // TODO skip invalid
- return false;
-}
-
-
-CodedOutputStream::CodedOutputStream(ZeroCopyOutput* output)
- : output_(output)
-{
-}
-
-void CodedOutputStream::Flush() {
- output_->Flush();
-}
-
-void CodedOutputStream::WriteRaw(const void* buffer, int size) {
- output_->Write(buffer, size);
-}
-
-void CodedOutputStream::WriteVarint64(uint64_t value) {
- uint8_t bytes[MAX_VARINT_BYTES];
- int size = 0;
-
- for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) {
- uint8_t byte = value & 0x7F;
- if (value > 0x7F)
- byte |= 0x80;
-
- bytes[size++] = byte;
-
- value >>= 7;
- if (!value) {
- break;
- }
- }
-
- WriteRaw(bytes, size);
-}
-
-}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/coded.h b/src/vendor/clickhouse-cpp/clickhouse/base/coded.h
deleted file mode 100644
index a171ac5..0000000
--- a/src/vendor/clickhouse-cpp/clickhouse/base/coded.h
+++ /dev/null
@@ -1,65 +0,0 @@
-#pragma once
-
-#include "input.h"
-#include "output.h"
-
-#include
-
-namespace clickhouse {
-
-/**
- * Class which reads and decodes binary data which is composed of varint-
- * encoded integers and fixed-width pieces.
- */
-class CodedInputStream {
-public:
- /// Create a CodedInputStream that reads from the given ZeroCopyInput.
- explicit CodedInputStream(ZeroCopyInput* input);
-
- // Read an unsigned integer with Varint encoding, truncating to 32 bits.
- // Reading a 32-bit value is equivalent to reading a 64-bit one and casting
- // it to uint32, but may be more efficient.
- bool ReadVarint32(uint32_t* value);
-
- // Read an unsigned integer with Varint encoding.
- bool ReadVarint64(uint64_t* value);
-
- // Read raw bytes, copying them into the given buffer.
- bool ReadRaw(void* buffer, size_t size);
-
- // Like ReadRaw, but reads into a string.
- //
- // Implementation Note: ReadString() grows the string gradually as it
- // reads in the data, rather than allocating the entire requested size
- // upfront. This prevents denial-of-service attacks in which a client
- // could claim that a string is going to be MAX_INT bytes long in order to
- // crash the server because it can't allocate this much space at once.
- bool ReadString(std::string* buffer, int size);
-
- // Skips a number of bytes. Returns false if an underlying read error
- // occurs.
- bool Skip(size_t count);
-
-private:
- ZeroCopyInput* input_;
-};
-
-
-class CodedOutputStream {
-public:
- /// Create a CodedInputStream that writes to the given ZeroCopyOutput.
- explicit CodedOutputStream(ZeroCopyOutput* output);
-
- void Flush();
-
- // Write raw bytes, copying them from the given buffer.
- void WriteRaw(const void* buffer, int size);
-
- /// Write an unsigned integer with Varint encoding.
- void WriteVarint64(const uint64_t value);
-
-private:
- ZeroCopyOutput* output_;
-};
-
-}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/compressed.cpp b/src/vendor/clickhouse-cpp/clickhouse/base/compressed.cpp
index 429e011..baf0ffe 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/base/compressed.cpp
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/compressed.cpp
@@ -1,16 +1,25 @@
#include "compressed.h"
#include "wire_format.h"
+#include "output.h"
+#include "../exceptions.h"
#include
#include
#include
#include
-#define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL // 1GB
+namespace {
+constexpr size_t HEADER_SIZE = 9;
+// see DB::CompressionMethodByte::LZ4 from src/Compression/CompressionInfo.h of ClickHouse project
+constexpr uint8_t COMPRESSION_METHOD = 0x82;
+// Documentation says that compression is faster when output buffer is larger than LZ4_compressBound estimation.
+constexpr size_t EXTRA_COMPRESS_BUFFER_SIZE = 4096;
+constexpr size_t DBMS_MAX_COMPRESSED_SIZE = 0x40000000ULL; // 1GB
+}
namespace clickhouse {
-CompressedInput::CompressedInput(CodedInputStream* input)
+CompressedInput::CompressedInput(InputStream* input)
: input_(input)
{
}
@@ -22,7 +31,7 @@ CompressedInput::~CompressedInput() {
#else
if (!std::uncaught_exceptions()) {
#endif
- throw std::runtime_error("some data was not readed");
+ throw LZ4Error("some data was not read");
}
}
}
@@ -43,26 +52,25 @@ bool CompressedInput::Decompress() {
uint32_t original = 0;
uint8_t method = 0;
- if (!WireFormat::ReadFixed(input_, &hash)) {
+ if (!WireFormat::ReadFixed(*input_, &hash)) {
return false;
}
- if (!WireFormat::ReadFixed(input_, &method)) {
+ if (!WireFormat::ReadFixed(*input_, &method)) {
return false;
}
- if (method != 0x82) {
- throw std::runtime_error("unsupported compression method " +
- std::to_string(int(method)));
+ if (method != COMPRESSION_METHOD) {
+ throw LZ4Error("unsupported compression method " + std::to_string(int(method)));
} else {
- if (!WireFormat::ReadFixed(input_, &compressed)) {
+ if (!WireFormat::ReadFixed(*input_, &compressed)) {
return false;
}
- if (!WireFormat::ReadFixed(input_, &original)) {
+ if (!WireFormat::ReadFixed(*input_, &original)) {
return false;
}
if (compressed > DBMS_MAX_COMPRESSED_SIZE) {
- throw std::runtime_error("compressed data too big");
+ throw LZ4Error("compressed data too big");
}
Buffer tmp(compressed);
@@ -73,20 +81,21 @@ bool CompressedInput::Decompress() {
out.Write(&method, sizeof(method));
out.Write(&compressed, sizeof(compressed));
out.Write(&original, sizeof(original));
+ out.Flush();
}
- if (!WireFormat::ReadBytes(input_, tmp.data() + 9, compressed - 9)) {
+ if (!WireFormat::ReadBytes(*input_, tmp.data() + HEADER_SIZE, compressed - HEADER_SIZE)) {
return false;
} else {
if (hash != CityHash128((const char*)tmp.data(), compressed)) {
- throw std::runtime_error("data was corrupted");
+ throw LZ4Error("data was corrupted");
}
}
data_ = Buffer(original);
- if (LZ4_decompress_fast((const char*)tmp.data() + 9, (char*)data_.data(), original) < 0) {
- throw std::runtime_error("can't decompress data");
+ if (LZ4_decompress_safe((const char*)tmp.data() + HEADER_SIZE, (char*)data_.data(), compressed - HEADER_SIZE, original) < 0) {
+ throw LZ4Error("can't decompress data");
} else {
mem_.Reset(data_.data(), original);
}
@@ -95,4 +104,71 @@ bool CompressedInput::Decompress() {
return true;
}
+
+CompressedOutput::CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size)
+ : destination_(destination)
+ , max_compressed_chunk_size_(max_compressed_chunk_size)
+{
+ PreallocateCompressBuffer(max_compressed_chunk_size);
+}
+
+CompressedOutput::~CompressedOutput() { }
+
+size_t CompressedOutput::DoWrite(const void* data, size_t len) {
+ const size_t original_len = len;
+ // what if len > max_compressed_chunk_size_ ?
+ const size_t max_chunk_size = max_compressed_chunk_size_ > 0 ? max_compressed_chunk_size_ : len;
+ if (max_chunk_size > max_compressed_chunk_size_) {
+ PreallocateCompressBuffer(len);
+ }
+
+ while (len > 0) {
+ auto to_compress = std::min(len, max_chunk_size);
+ Compress(data, to_compress);
+
+ len -= to_compress;
+ data = reinterpret_cast(data) + to_compress;
+ }
+
+ return original_len - len;
+}
+
+void CompressedOutput::DoFlush() {
+ destination_->Flush();
+}
+
+void CompressedOutput::Compress(const void * data, size_t len) {
+ const auto compressed_size = LZ4_compress_default(
+ (const char*)data,
+ (char*)compressed_buffer_.data() + HEADER_SIZE,
+ len,
+ static_cast(compressed_buffer_.size() - HEADER_SIZE));
+ if (compressed_size <= 0)
+ throw LZ4Error("Failed to compress chunk of " + std::to_string(len) + " bytes, "
+ "LZ4 error: " + std::to_string(compressed_size));
+
+ {
+ auto header = compressed_buffer_.data();
+ WriteUnaligned(header, COMPRESSION_METHOD);
+ // Compressed data size with header
+ WriteUnaligned(header + 1, static_cast(compressed_size + HEADER_SIZE));
+ // Original data size
+ WriteUnaligned(header + 5, static_cast(len));
+ }
+
+ WireFormat::WriteFixed(*destination_, CityHash128(
+ (const char*)compressed_buffer_.data(), compressed_size + HEADER_SIZE));
+ WireFormat::WriteBytes(*destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);
+
+ destination_->Flush();
+}
+
+void CompressedOutput::PreallocateCompressBuffer(size_t input_size) {
+ const auto estimated_compressed_buffer_size = LZ4_compressBound(static_cast(input_size));
+ if (estimated_compressed_buffer_size <= 0)
+ throw LZ4Error("Failed to estimate compressed buffer size, LZ4 error: " + std::to_string(estimated_compressed_buffer_size));
+
+ compressed_buffer_.resize(estimated_compressed_buffer_size + HEADER_SIZE + EXTRA_COMPRESS_BUFFER_SIZE);
+}
+
}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/compressed.h b/src/vendor/clickhouse-cpp/clickhouse/base/compressed.h
index 8c1b461..0b40d0b 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/base/compressed.h
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/compressed.h
@@ -1,12 +1,14 @@
#pragma once
-#include "coded.h"
+#include "input.h"
+#include "output.h"
+#include "buffer.h"
namespace clickhouse {
class CompressedInput : public ZeroCopyInput {
public:
- CompressedInput(CodedInputStream* input);
+ CompressedInput(InputStream* input);
~CompressedInput();
protected:
@@ -15,10 +17,29 @@ class CompressedInput : public ZeroCopyInput {
bool Decompress();
private:
- CodedInputStream* const input_;
+ InputStream* const input_;
Buffer data_;
ArrayInput mem_;
};
+class CompressedOutput : public OutputStream {
+public:
+ CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size = 0);
+ ~CompressedOutput();
+
+protected:
+ size_t DoWrite(const void* data, size_t len) override;
+ void DoFlush() override;
+
+private:
+ void Compress(const void * data, size_t len);
+ void PreallocateCompressBuffer(size_t input_size);
+
+private:
+ OutputStream * destination_;
+ const size_t max_compressed_chunk_size_;
+ Buffer compressed_buffer_;
+};
+
}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/input.cpp b/src/vendor/clickhouse-cpp/clickhouse/base/input.cpp
index f7b7ff6..e704fe5 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/base/input.cpp
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/input.cpp
@@ -5,6 +5,21 @@
namespace clickhouse {
+bool ZeroCopyInput::Skip(size_t bytes) {
+ while (bytes > 0) {
+ const void* ptr;
+ size_t len = Next(&ptr, bytes);
+
+ if (len == 0) {
+ return false;
+ }
+
+ bytes -= len;
+ }
+
+ return true;
+}
+
size_t ZeroCopyInput::DoRead(void* buf, size_t len) {
const void* ptr;
size_t result = DoNext(&ptr, len);
@@ -41,8 +56,8 @@ size_t ArrayInput::DoNext(const void** ptr, size_t len) {
}
-BufferedInput::BufferedInput(InputStream* slave, size_t buflen)
- : slave_(slave)
+BufferedInput::BufferedInput(std::unique_ptr source, size_t buflen)
+ : source_(std::move(source))
, array_input_(nullptr, 0)
, buffer_(buflen)
{
@@ -57,7 +72,7 @@ void BufferedInput::Reset() {
size_t BufferedInput::DoNext(const void** ptr, size_t len) {
if (array_input_.Exhausted()) {
array_input_.Reset(
- buffer_.data(), slave_->Read(buffer_.data(), buffer_.size())
+ buffer_.data(), source_->Read(buffer_.data(), buffer_.size())
);
}
@@ -67,11 +82,11 @@ size_t BufferedInput::DoNext(const void** ptr, size_t len) {
size_t BufferedInput::DoRead(void* buf, size_t len) {
if (array_input_.Exhausted()) {
if (len > buffer_.size() / 2) {
- return slave_->Read(buf, len);
+ return source_->Read(buf, len);
}
array_input_.Reset(
- buffer_.data(), slave_->Read(buffer_.data(), buffer_.size())
+ buffer_.data(), source_->Read(buffer_.data(), buffer_.size())
);
}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/input.h b/src/vendor/clickhouse-cpp/clickhouse/base/input.h
index 052fab8..a8885b3 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/base/input.h
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/input.h
@@ -3,6 +3,7 @@
#include
#include
#include
+#include
namespace clickhouse {
@@ -21,6 +22,9 @@ class InputStream {
return DoRead(buf, len);
}
+ // Skips a number of bytes. Returns false if an underlying read error occurs.
+ virtual bool Skip(size_t bytes) = 0;
+
protected:
virtual size_t DoRead(void* buf, size_t len) = 0;
};
@@ -32,6 +36,8 @@ class ZeroCopyInput : public InputStream {
return DoNext(buf, len);
}
+ bool Skip(size_t bytes) override;
+
protected:
virtual size_t DoNext(const void** ptr, size_t len) = 0;
@@ -79,7 +85,7 @@ class ArrayInput : public ZeroCopyInput {
class BufferedInput : public ZeroCopyInput {
public:
- BufferedInput(InputStream* slave, size_t buflen = 8192);
+ BufferedInput(std::unique_ptr source, size_t buflen = 8192);
~BufferedInput() override;
void Reset();
@@ -89,7 +95,7 @@ class BufferedInput : public ZeroCopyInput {
size_t DoNext(const void** ptr, size_t len) override;
private:
- InputStream* const slave_;
+ std::unique_ptr const source_;
ArrayInput array_input_;
std::vector buffer_;
};
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/output.cpp b/src/vendor/clickhouse-cpp/clickhouse/base/output.cpp
index dcd25ef..86b9fbd 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/base/output.cpp
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/output.cpp
@@ -6,7 +6,8 @@
namespace clickhouse {
-void ZeroCopyOutput::DoWrite(const void* data, size_t len) {
+size_t ZeroCopyOutput::DoWrite(const void* data, size_t len) {
+ const size_t original_len = len;
while (len > 0) {
void* ptr;
size_t result = DoNext(&ptr, len);
@@ -19,12 +20,15 @@ void ZeroCopyOutput::DoWrite(const void* data, size_t len) {
break;
}
}
+
+ return original_len - len;
}
ArrayOutput::ArrayOutput(void* buf, size_t len)
: buf_(static_cast(buf))
, end_(buf_ + len)
+ , buffer_size_(len)
{
}
@@ -62,26 +66,14 @@ size_t BufferOutput::DoNext(void** data, size_t len) {
}
-BufferedOutput::BufferedOutput(OutputStream* slave, size_t buflen)
- : slave_(slave)
+BufferedOutput::BufferedOutput(std::unique_ptr destination, size_t buflen)
+ : destination_(std::move(destination))
, buffer_(buflen)
, array_output_(buffer_.data(), buflen)
{
}
-BufferedOutput::~BufferedOutput() {
- try
- {
- Flush();
- }
- catch (...)
- {
- // That means we've failed to flush some data e.g. to the socket,
- // but there is nothing we can do at this point (can't bring the socket back),
- // and throwing in destructor is really a bad idea.
- // The best we can do is to log the error and ignore it, but currently there is no logging subsystem.
- }
-}
+BufferedOutput::~BufferedOutput() { }
void BufferedOutput::Reset() {
array_output_.Reset(buffer_.data(), buffer_.size());
@@ -89,8 +81,8 @@ void BufferedOutput::Reset() {
void BufferedOutput::DoFlush() {
if (array_output_.Data() != buffer_.data()) {
- slave_->Write(buffer_.data(), array_output_.Data() - buffer_.data());
- slave_->Flush();
+ destination_->Write(buffer_.data(), array_output_.Data() - buffer_.data());
+ destination_->Flush();
array_output_.Reset(buffer_.data(), buffer_.size());
}
@@ -105,17 +97,16 @@ size_t BufferedOutput::DoNext(void** data, size_t len) {
}
-void BufferedOutput::DoWrite(const void* data, size_t len) {
+size_t BufferedOutput::DoWrite(const void* data, size_t len) {
if (array_output_.Avail() < len) {
Flush();
if (len > buffer_.size() / 2) {
- slave_->Write(data, len);
- return;
+ return destination_->Write(data, len);
}
}
- array_output_.Write(data, len);
+ return array_output_.Write(data, len);
}
}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/output.h b/src/vendor/clickhouse-cpp/clickhouse/base/output.h
index e53aadf..bb804ce 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/base/output.h
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/output.h
@@ -6,6 +6,7 @@
#include
#include
#include
+#include
namespace clickhouse {
@@ -18,14 +19,14 @@ class OutputStream {
DoFlush();
}
- inline void Write(const void* data, size_t len) {
- DoWrite(data, len);
+ inline size_t Write(const void* data, size_t len) {
+ return DoWrite(data, len);
}
protected:
virtual void DoFlush() { }
- virtual void DoWrite(const void* data, size_t len) = 0;
+ virtual size_t DoWrite(const void* data, size_t len) = 0;
};
@@ -41,7 +42,7 @@ class ZeroCopyOutput : public OutputStream {
// be written to the output.
virtual size_t DoNext(void** data, size_t len) = 0;
- void DoWrite(const void* data, size_t len) override;
+ size_t DoWrite(const void* data, size_t len) override;
};
@@ -72,6 +73,12 @@ class ArrayOutput : public ZeroCopyOutput {
inline void Reset(void* buf, size_t len) noexcept {
buf_ = static_cast(buf);
end_ = buf_ + len;
+ buffer_size_ = len;
+ }
+
+ /// Number of bytes written to the buffer.
+ inline size_t Size() const noexcept {
+ return buffer_size_ - Avail();
}
protected:
@@ -80,16 +87,19 @@ class ArrayOutput : public ZeroCopyOutput {
private:
uint8_t* buf_;
uint8_t* end_;
+ size_t buffer_size_;
};
/**
- * A ZeroCopyOutput stream backed by an vector of bytes.
+ * A ZeroCopyOutput stream backed by a vector.
+ *
+ * Doesn't Flush() in destructor, client must ensure to do it manually at some point.
*/
class BufferOutput : public ZeroCopyOutput {
public:
BufferOutput(Buffer* buf);
- ~BufferOutput();
+ ~BufferOutput() override;
protected:
size_t DoNext(void** data, size_t len) override;
@@ -99,10 +109,16 @@ class BufferOutput : public ZeroCopyOutput {
size_t pos_;
};
-
+/** BufferedOutput writes data to internal buffer first.
+ *
+ * Any data goes to underlying stream only if internal buffer is full
+ * or when client invokes Flush() on this.
+ *
+ * Doesn't Flush() in destructor, client must ensure to do it manually at some point.
+ */
class BufferedOutput : public ZeroCopyOutput {
public:
- BufferedOutput(OutputStream* slave, size_t buflen = 8192);
+ explicit BufferedOutput(std::unique_ptr destination, size_t buflen = 8192);
~BufferedOutput() override;
void Reset();
@@ -110,10 +126,10 @@ class BufferedOutput : public ZeroCopyOutput {
protected:
void DoFlush() override;
size_t DoNext(void** data, size_t len) override;
- void DoWrite(const void* data, size_t len) override;
+ size_t DoWrite(const void* data, size_t len) override;
private:
- OutputStream* const slave_;
+ std::unique_ptr const destination_;
Buffer buffer_;
ArrayOutput array_output_;
};
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/platform.h b/src/vendor/clickhouse-cpp/clickhouse/base/platform.h
index 9f53da1..f6d896b 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/base/platform.h
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/platform.h
@@ -13,6 +13,11 @@
#if defined(_win32_) || defined(_win64_)
# define _win_
+# if !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x0600)
+# undef _WIN32_WINNT
+# define _WIN32_WINNT 0x0600 // The WSAPoll function is defined on Windows Vista and later.
+# endif
+# define WIN32_LEAN_AND_MEAN 1 // don't include too much header automatically
#endif
#if defined(_linux_) || defined (_darwin_)
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/socket.cpp b/src/vendor/clickhouse-cpp/clickhouse/base/socket.cpp
index 86df993..a11cd2f 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/base/socket.cpp
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/socket.cpp
@@ -1,26 +1,48 @@
-#include
-
#include "socket.h"
#include "singleton.h"
+#include "../client.h"
+
#include
#include
#include
#include
#include
-
+#include
#if !defined(_win_)
+# include
# include
# include
# include
# include
# include
-#else
-# include
-# include
#endif
namespace clickhouse {
+
+#if defined(_win_)
+char const* windowsErrorCategory::name() const noexcept {
+ return "WindowsSocketError";
+}
+
+std::string windowsErrorCategory::message(int c) const {
+ char error[UINT8_MAX];
+ auto len = FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, nullptr, static_cast(c), 0, error, sizeof(error), nullptr);
+ if (len == 0) {
+ return "unknown";
+ }
+ while (len && (error[len - 1] == '\r' || error[len - 1] == '\n')) {
+ --len;
+ }
+ return std::string(error, len);
+}
+
+windowsErrorCategory const& windowsErrorCategory::category() {
+ static windowsErrorCategory c;
+ return c;
+}
+#endif
+
namespace {
class LocalNames : public std::unordered_set {
@@ -39,8 +61,24 @@ class LocalNames : public std::unordered_set {
}
};
+inline int getSocketErrorCode() {
+#if defined(_win_)
+ return WSAGetLastError();
+#else
+ return errno;
+#endif
+}
+
+const std::error_category& getErrorCategory() noexcept {
+#if defined(_win_)
+ return windowsErrorCategory::category();
+#else
+ return std::system_category();
+#endif
+}
+
void SetNonBlock(SOCKET fd, bool value) {
-#if defined(_unix_)
+#if defined(_unix_) || defined(__CYGWIN__)
int flags;
int ret;
#if defined(O_NONBLOCK)
@@ -57,8 +95,7 @@ void SetNonBlock(SOCKET fd, bool value) {
return ioctl(fd, FIOBIO, &flags);
#endif
if (ret == -1) {
- throw std::system_error(
- errno, std::system_category(), "fail to set nonblocking mode");
+ throw std::system_error(getSocketErrorCode(), getErrorCategory(), "fail to set nonblocking mode");
}
#elif defined(_win_)
unsigned long inbuf = value;
@@ -70,16 +107,74 @@ void SetNonBlock(SOCKET fd, bool value) {
}
if (WSAIoctl(fd, FIONBIO, &inbuf, sizeof(inbuf), &outbuf, sizeof(outbuf), &written, 0, 0) == SOCKET_ERROR) {
- throw std::system_error(
- errno, std::system_category(), "fail to set nonblocking mode");
+ throw std::system_error(getSocketErrorCode(), getErrorCategory(), "fail to set nonblocking mode");
}
#endif
}
+ssize_t Poll(struct pollfd* fds, int nfds, int timeout) noexcept {
+#if defined(_win_)
+ return WSAPoll(fds, nfds, timeout);
+#else
+ return poll(fds, nfds, timeout);
+#endif
+}
+
+SOCKET SocketConnect(const NetworkAddress& addr) {
+ int last_err = 0;
+ for (auto res = addr.Info(); res != nullptr; res = res->ai_next) {
+ SOCKET s(socket(res->ai_family, res->ai_socktype, res->ai_protocol));
+
+ if (s == -1) {
+ continue;
+ }
+
+ SetNonBlock(s, true);
+
+ if (connect(s, res->ai_addr, (int)res->ai_addrlen) != 0) {
+ int err = getSocketErrorCode();
+ if (
+ err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK
+#if defined(_win_)
+ || err == WSAEWOULDBLOCK || err == WSAEINPROGRESS
+#endif
+ ) {
+ pollfd fd;
+ fd.fd = s;
+ fd.events = POLLOUT;
+ fd.revents = 0;
+ ssize_t rval = Poll(&fd, 1, 5000);
+
+ if (rval == -1) {
+ throw std::system_error(getSocketErrorCode(), getErrorCategory(), "fail to connect");
+ }
+ if (rval > 0) {
+ socklen_t len = sizeof(err);
+ getsockopt(s, SOL_SOCKET, SO_ERROR, (char*)&err, &len);
+
+ if (!err) {
+ SetNonBlock(s, false);
+ return s;
+ }
+ last_err = err;
+ }
+ }
+ } else {
+ SetNonBlock(s, false);
+ return s;
+ }
+ }
+ if (last_err > 0) {
+ throw std::system_error(last_err, getErrorCategory(), "fail to connect");
+ }
+ throw std::system_error(getSocketErrorCode(), getErrorCategory(), "fail to connect");
+}
+
} // namespace
NetworkAddress::NetworkAddress(const std::string& host, const std::string& port)
- : info_(nullptr)
+ : host_(host)
+ , info_(nullptr)
{
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
@@ -102,7 +197,7 @@ NetworkAddress::NetworkAddress(const std::string& host, const std::string& port)
const int error = getaddrinfo(host.c_str(), port.c_str(), &hints, &info_);
if (error) {
- throw std::system_error(errno, std::system_category());
+ throw std::system_error(getSocketErrorCode(), getErrorCategory());
}
}
@@ -115,29 +210,46 @@ NetworkAddress::~NetworkAddress() {
const struct addrinfo* NetworkAddress::Info() const {
return info_;
}
+const std::string & NetworkAddress::Host() const {
+ return host_;
+}
-SocketHolder::SocketHolder()
- : handle_(-1)
-{
-}
+SocketBase::~SocketBase() = default;
-SocketHolder::SocketHolder(SOCKET s)
- : handle_(s)
-{
+SocketFactory::~SocketFactory() = default;
+
+void SocketFactory::sleepFor(const std::chrono::milliseconds& duration) {
+ std::this_thread::sleep_for(duration);
}
-SocketHolder::SocketHolder(SocketHolder&& other) noexcept
+
+Socket::Socket(const NetworkAddress& addr)
+ : handle_(SocketConnect(addr))
+{}
+
+Socket::Socket(Socket&& other) noexcept
: handle_(other.handle_)
{
other.handle_ = -1;
}
-SocketHolder::~SocketHolder() {
+Socket& Socket::operator=(Socket&& other) noexcept {
+ if (this != &other) {
+ Close();
+
+ handle_ = other.handle_;
+ other.handle_ = -1;
+ }
+
+ return *this;
+}
+
+Socket::~Socket() {
Close();
}
-void SocketHolder::Close() noexcept {
+void Socket::Close() {
if (handle_ != -1) {
#if defined(_win_)
closesocket(handle_);
@@ -148,11 +260,7 @@ void SocketHolder::Close() noexcept {
}
}
-bool SocketHolder::Closed() const noexcept {
- return handle_ == -1;
-}
-
-void SocketHolder::SetTcpKeepAlive(int idle, int intvl, int cnt) noexcept {
+void Socket::SetTcpKeepAlive(int idle, int intvl, int cnt) noexcept {
int val = 1;
#if defined(_unix_)
@@ -172,21 +280,49 @@ void SocketHolder::SetTcpKeepAlive(int idle, int intvl, int cnt) noexcept {
#endif
}
-SocketHolder& SocketHolder::operator = (SocketHolder&& other) noexcept {
- if (this != &other) {
- Close();
+void Socket::SetTcpNoDelay(bool nodelay) noexcept {
+ int val = nodelay;
+#if defined(_unix_)
+ setsockopt(handle_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
+#else
+ setsockopt(handle_, IPPROTO_TCP, TCP_NODELAY, (const char*)&val, sizeof(val));
+#endif
+}
- handle_ = other.handle_;
- other.handle_ = -1;
- }
+std::unique_ptr Socket::makeInputStream() const {
+ return std::make_unique(handle_);
+}
- return *this;
+std::unique_ptr Socket::makeOutputStream() const {
+ return std::make_unique(handle_);
+}
+
+NonSecureSocketFactory::~NonSecureSocketFactory() {}
+
+std::unique_ptr NonSecureSocketFactory::connect(const ClientOptions &opts) {
+ const auto address = NetworkAddress(opts.host, std::to_string(opts.port));
+
+ auto socket = doConnect(address);
+ setSocketOptions(*socket, opts);
+
+ return socket;
}
-SocketHolder::operator SOCKET () const noexcept {
- return handle_;
+std::unique_ptr NonSecureSocketFactory::doConnect(const NetworkAddress& address) {
+ return std::make_unique(address);
}
+void NonSecureSocketFactory::setSocketOptions(Socket &socket, const ClientOptions &opts) {
+ if (opts.tcp_keepalive) {
+ socket.SetTcpKeepAlive(
+ static_cast(opts.tcp_keepalive_idle.count()),
+ static_cast(opts.tcp_keepalive_intvl.count()),
+ static_cast(opts.tcp_keepalive_cnt));
+ }
+ if (opts.tcp_nodelay) {
+ socket.SetTcpNoDelay(opts.tcp_nodelay);
+ }
+}
SocketInput::SocketInput(SOCKET s)
: s_(s)
@@ -203,14 +339,14 @@ size_t SocketInput::DoRead(void* buf, size_t len) {
}
if (ret == 0) {
- throw std::system_error(
- errno, std::system_category(), "closed"
- );
+ throw std::system_error(getSocketErrorCode(), getErrorCategory(), "closed");
}
- throw std::system_error(
- errno, std::system_category(), "can't receive string data"
- );
+ throw std::system_error(getSocketErrorCode(), getErrorCategory(), "can't receive string data");
+}
+
+bool SocketInput::Skip(size_t /*bytes*/) {
+ return false;
}
@@ -221,7 +357,7 @@ SocketOutput::SocketOutput(SOCKET s)
SocketOutput::~SocketOutput() = default;
-void SocketOutput::DoWrite(const void* data, size_t len) {
+size_t SocketOutput::DoWrite(const void* data, size_t len) {
#if defined (_linux_)
static const int flags = MSG_NOSIGNAL;
#else
@@ -229,10 +365,10 @@ void SocketOutput::DoWrite(const void* data, size_t len) {
#endif
if (::send(s_, (const char*)data, (int)len, flags) != (int)len) {
- throw std::system_error(
- errno, std::system_category(), "fail to send data"
- );
+ throw std::system_error(getSocketErrorCode(), getErrorCategory(), "fail to send " + std::to_string(len) + " bytes of data");
}
+
+ return len;
}
@@ -256,82 +392,4 @@ NetrworkInitializer::NetrworkInitializer() {
(void)Singleton();
}
-
-SOCKET SocketConnect(const NetworkAddress& addr) {
- int last_err = 0;
-
- for (auto res = addr.Info(); res != nullptr; res = res->ai_next) {
- int reuse = 1;
- int sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
- setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (const char*) &reuse, sizeof(int));
-
- SOCKET s(sockfd);
-
- if (s == -1) {
- continue;
- }
-
- SetNonBlock(s, true);
- int cret = connect(s, res->ai_addr, (int)res->ai_addrlen);
-
- #if defined(_win_)
- // poll to avoid WSAEWOULDBLOCK error
- for(size_t i = 0; i < 10; i++) {
- if(WSAGetLastError() == 0) {
- cret = 0;
- continue;
- }
-
- std::this_thread::sleep_for(std::chrono::seconds(1));
- }
- #endif
-
-
- if (cret != 0) {
- int err = errno;
-
- if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) {
- pollfd fd;
- fd.fd = s;
- fd.events = POLLOUT;
- fd.revents = 0;
- ssize_t rval = Poll(&fd, 1, 5000);
-
- if (rval == -1) {
- throw std::system_error(errno, std::system_category(), "fail to connect");
- }
- if (rval > 0) {
- socklen_t len = sizeof(err);
- getsockopt(s, SOL_SOCKET, SO_ERROR, (char*)&err, &len);
-
- if (!err) {
- SetNonBlock(s, false);
- return s;
- }
- last_err = err;
- }
- }
- } else {
- SetNonBlock(s, false);
- return s;
- }
- }
- if (last_err > 0) {
- throw std::system_error(last_err, std::system_category(), "fail to connect");
- }
- throw std::system_error(
- errno, std::system_category(), "fail to connect"
- );
-}
-
-
-ssize_t Poll(struct pollfd* fds, int nfds, int timeout) noexcept {
-#if defined(_win_)
- int rval = WSAPoll(fds, nfds, timeout);
-#else
- return poll(fds, nfds, timeout);
-#endif
- return -1;
-}
-
}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/socket.h b/src/vendor/clickhouse-cpp/clickhouse/base/socket.h
index a38a829..e7cacc1 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/base/socket.h
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/socket.h
@@ -1,15 +1,14 @@
#pragma once
+#include "platform.h"
#include "input.h"
#include "output.h"
-#include "platform.h"
#include
#include
+#include
#if defined(_win_)
-# pragma comment(lib, "Ws2_32.lib")
-
# include
# include
#else
@@ -23,11 +22,16 @@
# endif
#endif
+#include
+#include
+
struct addrinfo;
namespace clickhouse {
-/**
+struct ClientOptions;
+
+/** Address of a host to establish connection to.
*
*/
class NetworkAddress {
@@ -37,23 +41,54 @@ class NetworkAddress {
~NetworkAddress();
const struct addrinfo* Info() const;
+ const std::string & Host() const;
private:
+ const std::string host_;
struct addrinfo* info_;
};
+#if defined(_win_)
-class SocketHolder {
+class windowsErrorCategory : public std::error_category {
public:
- SocketHolder();
- SocketHolder(SOCKET s);
- SocketHolder(SocketHolder&& other) noexcept;
+ char const* name() const noexcept override final;
+ std::string message(int c) const override final;
- ~SocketHolder();
+ static windowsErrorCategory const& category();
+};
- void Close() noexcept;
+#endif
- bool Closed() const noexcept;
+
+class SocketBase {
+public:
+ virtual ~SocketBase();
+
+ virtual std::unique_ptr makeInputStream() const = 0;
+ virtual std::unique_ptr makeOutputStream() const = 0;
+};
+
+
+class SocketFactory {
+public:
+ virtual ~SocketFactory();
+
+ // TODO: move connection-related options to ConnectionOptions structure.
+
+ virtual std::unique_ptr connect(const ClientOptions& opts) = 0;
+
+ virtual void sleepFor(const std::chrono::milliseconds& duration);
+};
+
+
+class Socket : public SocketBase {
+public:
+ Socket(const NetworkAddress& addr);
+ Socket(Socket&& other) noexcept;
+ Socket& operator=(Socket&& other) noexcept;
+
+ ~Socket() override;
/// @params idle the time (in seconds) the connection needs to remain
/// idle before TCP starts sending keepalive probes.
@@ -62,27 +97,41 @@ class SocketHolder {
/// before dropping the connection.
void SetTcpKeepAlive(int idle, int intvl, int cnt) noexcept;
- SocketHolder& operator = (SocketHolder&& other) noexcept;
+ /// @params nodelay whether to enable TCP_NODELAY
+ void SetTcpNoDelay(bool nodelay) noexcept;
- operator SOCKET () const noexcept;
+ std::unique_ptr makeInputStream() const override;
+ std::unique_ptr makeOutputStream() const override;
-private:
- SocketHolder(const SocketHolder&) = delete;
- SocketHolder& operator = (const SocketHolder&) = delete;
+protected:
+ Socket(const Socket&) = delete;
+ Socket& operator = (const Socket&) = delete;
+ void Close();
SOCKET handle_;
};
-/**
- *
- */
+class NonSecureSocketFactory : public SocketFactory {
+public:
+ ~NonSecureSocketFactory() override;
+
+ std::unique_ptr connect(const ClientOptions& opts) override;
+
+protected:
+ virtual std::unique_ptr doConnect(const NetworkAddress& address);
+
+ void setSocketOptions(Socket& socket, const ClientOptions& opts);
+};
+
+
class SocketInput : public InputStream {
public:
explicit SocketInput(SOCKET s);
~SocketInput();
protected:
+ bool Skip(size_t bytes) override;
size_t DoRead(void* buf, size_t len) override;
private:
@@ -95,7 +144,7 @@ class SocketOutput : public OutputStream {
~SocketOutput();
protected:
- void DoWrite(const void* data, size_t len) override;
+ size_t DoWrite(const void* data, size_t len) override;
private:
SOCKET s_;
@@ -105,9 +154,4 @@ static struct NetrworkInitializer {
NetrworkInitializer();
} gNetrworkInitializer;
-///
-SOCKET SocketConnect(const NetworkAddress& addr);
-
-ssize_t Poll(struct pollfd* fds, int nfds, int timeout) noexcept;
-
}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/sslsocket.cpp b/src/vendor/clickhouse-cpp/clickhouse/base/sslsocket.cpp
new file mode 100644
index 0000000..392c22f
--- /dev/null
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/sslsocket.cpp
@@ -0,0 +1,302 @@
+#include "sslsocket.h"
+#include "../client.h"
+#include "../exceptions.h"
+
+#include
+
+#include
+#include
+#include
+#include
+
+
+namespace {
+
+std::string getCertificateInfo(X509* cert)
+{
+ if (!cert)
+ return "No certificate";
+
+ std::unique_ptr mem_bio(BIO_new(BIO_s_mem()), &BIO_free);
+ X509_print(mem_bio.get(), cert);
+
+ char * data = nullptr;
+ auto len = BIO_get_mem_data(mem_bio.get(), &data);
+ if (len < 0)
+ return "Can't get certificate info due to BIO error " + std::to_string(len);
+
+ return std::string(data, len);
+}
+
+void throwSSLError(SSL * ssl, int error, const char * /*location*/, const char * /*statement*/, const std::string prefix = "OpenSSL error: ") {
+ const auto detail_error = ERR_get_error();
+ auto reason = ERR_reason_error_string(detail_error);
+ reason = reason ? reason : "Unknown SSL error";
+
+ std::string reason_str = reason;
+ if (ssl) {
+ // Print certificate only if handshake isn't completed
+ if (auto ssl_session = SSL_get_session(ssl); ssl_session && SSL_get_state(ssl) != TLS_ST_OK)
+ reason_str += "\nServer certificate: " + getCertificateInfo(SSL_SESSION_get0_peer(ssl_session));
+ }
+
+// std::cerr << "!!! SSL error at " << location
+// << "\n\tcaused by " << statement
+// << "\n\t: "<< reason_str << "(" << error << ")"
+// << "\n\t last err: " << ERR_peek_last_error()
+// << std::endl;
+
+ throw clickhouse::OpenSSLError(prefix + std::to_string(error) + " : " + reason_str);
+}
+
+void configureSSL(const clickhouse::SSLParams::ConfigurationType & configuration, SSL * ssl, SSL_CTX * context = nullptr) {
+ std::unique_ptr conf_ctx_holder(SSL_CONF_CTX_new(), SSL_CONF_CTX_free);
+ auto conf_ctx = conf_ctx_holder.get();
+
+ // To make both cmdline and flag file commands start with no prefix.
+ SSL_CONF_CTX_set1_prefix(conf_ctx, "");
+ // Allow all set of client commands, also turn on proper error reporting to reuse throwSSLError().
+ SSL_CONF_CTX_set_flags(conf_ctx, SSL_CONF_FLAG_CMDLINE | SSL_CONF_FLAG_FILE | SSL_CONF_FLAG_CLIENT | SSL_CONF_FLAG_SHOW_ERRORS | SSL_CONF_FLAG_CERTIFICATE );
+ if (ssl)
+ SSL_CONF_CTX_set_ssl(conf_ctx, ssl);
+ else if (context)
+ SSL_CONF_CTX_set_ssl_ctx(conf_ctx, context);
+
+ for (const auto & kv : configuration) {
+ const int err = SSL_CONF_cmd(conf_ctx, kv.first.c_str(), (kv.second ? kv.second->c_str() : nullptr));
+ // From the documentation:
+ // 2 - both key and value used
+ // 1 - only key used
+ // 0 - error during processing
+ // -2 - key not recodnized
+ // -3 - missing value
+ const bool value_present = !!kv.second;
+ if (err == 2 || (err == 1 && !value_present))
+ continue;
+ else if (err == 0)
+ throwSSLError(ssl, SSL_ERROR_NONE, nullptr, nullptr, "Failed to configure OpenSSL with command '" + kv.first + "' ");
+ else if (err == 1 && value_present)
+ throw clickhouse::OpenSSLError("Failed to configure OpenSSL: command '" + kv.first + "' needs no value");
+ else if (err == -2)
+ throw clickhouse::OpenSSLError("Failed to cofigure OpenSSL: unknown command '" + kv.first + "'");
+ else if (err == -3)
+ throw clickhouse::OpenSSLError("Failed to cofigure OpenSSL: command '" + kv.first + "' requires a value");
+ else
+ throw clickhouse::OpenSSLError("Failed to cofigure OpenSSL: command '" + kv.first + "' unknown error: " + std::to_string(err));
+ }
+}
+
+#define STRINGIFY_HELPER(x) #x
+#define STRINGIFY(x) STRINGIFY_HELPER(x)
+#define LOCATION __FILE__ ":" STRINGIFY(__LINE__)
+
+struct SSLInitializer {
+ SSLInitializer() {
+ SSL_library_init();
+ SSLeay_add_ssl_algorithms();
+ SSL_load_error_strings();
+ }
+};
+
+SSL_CTX * prepareSSLContext(const clickhouse::SSLParams & context_params) {
+ static const SSLInitializer ssl_initializer;
+
+ const SSL_METHOD *method = TLS_client_method();
+ std::unique_ptr ctx(SSL_CTX_new(method), &SSL_CTX_free);
+
+ if (!ctx)
+ throw clickhouse::OpenSSLError("Failed to initialize SSL context");
+
+#define HANDLE_SSL_CTX_ERROR(statement) do { \
+ if (const auto ret_code = (statement); !ret_code) \
+ throwSSLError(nullptr, ERR_peek_error(), LOCATION, #statement); \
+} while(false);
+
+ if (context_params.use_default_ca_locations)
+ HANDLE_SSL_CTX_ERROR(SSL_CTX_set_default_verify_paths(ctx.get()));
+ if (!context_params.path_to_ca_directory.empty())
+ HANDLE_SSL_CTX_ERROR(
+ SSL_CTX_load_verify_locations(
+ ctx.get(),
+ nullptr,
+ context_params.path_to_ca_directory.c_str())
+ );
+
+ for (const auto & f : context_params.path_to_ca_files)
+ HANDLE_SSL_CTX_ERROR(SSL_CTX_load_verify_locations(ctx.get(), f.c_str(), nullptr));
+
+ if (context_params.context_options != -1)
+ SSL_CTX_set_options(ctx.get(), context_params.context_options);
+ if (context_params.min_protocol_version != -1)
+ HANDLE_SSL_CTX_ERROR(
+ SSL_CTX_set_min_proto_version(ctx.get(), context_params.min_protocol_version));
+ if (context_params.max_protocol_version != -1)
+ HANDLE_SSL_CTX_ERROR(
+ SSL_CTX_set_max_proto_version(ctx.get(), context_params.max_protocol_version));
+
+ return ctx.release();
+#undef HANDLE_SSL_CTX_ERROR
+}
+
+auto convertConfiguration(const decltype(clickhouse::ClientOptions::SSLOptions::configuration) & configuration)
+{
+ auto result = decltype(clickhouse::SSLParams::configuration){};
+ for (const auto & cv : configuration)
+ result.push_back({cv.command, cv.value});
+
+ return result;
+}
+
+clickhouse::SSLParams GetSSLParams(const clickhouse::ClientOptions& opts) {
+ const auto& ssl_options = *opts.ssl_options;
+ return clickhouse::SSLParams{
+ ssl_options.path_to_ca_files,
+ ssl_options.path_to_ca_directory,
+ ssl_options.use_default_ca_locations,
+ ssl_options.context_options,
+ ssl_options.min_protocol_version,
+ ssl_options.max_protocol_version,
+ ssl_options.use_sni,
+ ssl_options.skip_verification,
+ ssl_options.host_flags,
+ convertConfiguration(ssl_options.configuration)
+ };
+}
+
+}
+
+namespace clickhouse {
+
+SSLContext::SSLContext(SSL_CTX & context)
+ : context_(&context, &SSL_CTX_free)
+{
+ SSL_CTX_up_ref(context_.get());
+}
+
+SSLContext::SSLContext(const SSLParams & context_params)
+ : context_(prepareSSLContext(context_params), &SSL_CTX_free)
+{
+}
+
+SSL_CTX * SSLContext::getContext() {
+ return context_.get();
+}
+
+// Allows caller to use returned value of `statement` if there was no error, throws exception otherwise.
+#define HANDLE_SSL_ERROR(SSL_PTR, statement) [&] { \
+ if (const auto ret_code = (statement); ret_code <= 0) { \
+ throwSSLError(SSL_PTR, SSL_get_error(SSL_PTR, ret_code), LOCATION, #statement); \
+ return static_cast>(0); \
+ } \
+ else \
+ return ret_code; \
+} ()
+
+/* // debug macro for tracing SSL state
+#define LOG_SSL_STATE() std::cerr << "!!!!" << LOCATION << " @" << __FUNCTION__ \
+ << "\t" << SSL_get_version(ssl_) << " state: " << SSL_state_string_long(ssl_) \
+ << "\n\t handshake state: " << SSL_get_state(ssl_) \
+ << std::endl
+*/
+SSLSocket::SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params,
+ SSLContext& context)
+ : Socket(addr)
+ , ssl_(SSL_new(context.getContext()), &SSL_free)
+{
+ auto ssl = ssl_.get();
+ if (!ssl)
+ throw clickhouse::OpenSSLError("Failed to create SSL instance");
+
+ std::unique_ptr ip_addr(a2i_IPADDRESS(addr.Host().c_str()), &ASN1_OCTET_STRING_free);
+
+ HANDLE_SSL_ERROR(ssl, SSL_set_fd(ssl, handle_));
+ if (ssl_params.use_SNI)
+ HANDLE_SSL_ERROR(ssl, SSL_set_tlsext_host_name(ssl, addr.Host().c_str()));
+
+ if (ssl_params.host_flags != -1)
+ SSL_set_hostflags(ssl, ssl_params.host_flags);
+ HANDLE_SSL_ERROR(ssl, SSL_set1_host(ssl, addr.Host().c_str()));
+
+ // DO NOT use SSL_set_verify(ssl, SSL_VERIFY_PEER, nullptr), since
+ // we check verification result later, and that provides better error message.
+
+ if (ssl_params.configuration.size() > 0)
+ configureSSL(ssl_params.configuration, ssl);
+
+ SSL_set_connect_state(ssl);
+ HANDLE_SSL_ERROR(ssl, SSL_connect(ssl));
+ HANDLE_SSL_ERROR(ssl, SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY));
+
+ if (const auto verify_result = SSL_get_verify_result(ssl); !ssl_params.skip_verification && verify_result != X509_V_OK) {
+ auto error_message = X509_verify_cert_error_string(verify_result);
+ throw clickhouse::OpenSSLError("Failed to verify SSL connection, X509_v error: "
+ + std::to_string(verify_result)
+ + " " + error_message
+ + "\nServer certificate: " + getCertificateInfo(SSL_get_peer_certificate(ssl)));
+ }
+
+ // Host name verification is done by OpenSSL itself, however if we are connecting to an ip-address,
+ // no verification is made, so we have to do it manually.
+ // Just in case if this is ever required, leave it here commented out.
+// if (ip_addr) {
+// // if hostname is actually an IP address
+// HANDLE_SSL_ERROR(ssl, X509_check_ip(
+// SSL_get_peer_certificate(ssl),
+// ASN1_STRING_get0_data(ip_addr.get()),
+// ASN1_STRING_length(ip_addr.get()),
+// 0));
+// }
+}
+
+void SSLSocket::validateParams(const SSLParams & ssl_params) {
+ // We need either SSL or SSL_CTX to properly validate configuration, so create a temporary one.
+ std::unique_ptr ctx(SSL_CTX_new(TLS_client_method()), &SSL_CTX_free);
+ configureSSL(ssl_params.configuration, nullptr, ctx.get());
+}
+
+
+SSLSocketFactory::SSLSocketFactory(const ClientOptions& opts)
+ : NonSecureSocketFactory()
+ , ssl_params_(GetSSLParams(opts)) {
+ if (opts.ssl_options->ssl_context) {
+ ssl_context_ = std::make_unique(*opts.ssl_options->ssl_context);
+ } else {
+ ssl_context_ = std::make_unique(ssl_params_);
+ }
+}
+
+SSLSocketFactory::~SSLSocketFactory() = default;
+
+std::unique_ptr SSLSocketFactory::doConnect(const NetworkAddress& address) {
+ return std::make_unique(address, ssl_params_, *ssl_context_);
+}
+
+std::unique_ptr SSLSocket::makeInputStream() const {
+ return std::make_unique(ssl_.get());
+}
+
+std::unique_ptr SSLSocket::makeOutputStream() const {
+ return std::make_unique(ssl_.get());
+}
+
+SSLSocketInput::SSLSocketInput(SSL *ssl)
+ : ssl_(ssl)
+{}
+
+size_t SSLSocketInput::DoRead(void* buf, size_t len) {
+ size_t actually_read;
+ HANDLE_SSL_ERROR(ssl_, SSL_read_ex(ssl_, buf, len, &actually_read));
+ return actually_read;
+}
+
+SSLSocketOutput::SSLSocketOutput(SSL *ssl)
+ : ssl_(ssl)
+{}
+
+size_t SSLSocketOutput::DoWrite(const void* data, size_t len) {
+ return static_cast(HANDLE_SSL_ERROR(ssl_, SSL_write(ssl_, data, len)));
+}
+
+#undef HANDLE_SSL_ERROR
+
+}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/sslsocket.h b/src/vendor/clickhouse-cpp/clickhouse/base/sslsocket.h
new file mode 100644
index 0000000..f37e4a5
--- /dev/null
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/sslsocket.h
@@ -0,0 +1,109 @@
+#pragma once
+
+#include "socket.h"
+
+#include
+#include
+#include
+
+typedef struct ssl_ctx_st SSL_CTX;
+typedef struct ssl_st SSL;
+
+namespace clickhouse {
+
+struct SSLParams
+{
+ std::vector path_to_ca_files;
+ std::string path_to_ca_directory;
+ bool use_default_ca_locations;
+ int context_options;
+ int min_protocol_version;
+ int max_protocol_version;
+ bool use_SNI;
+ bool skip_verification;
+ int host_flags;
+ using ConfigurationType = std::vector>>;
+ ConfigurationType configuration;
+};
+
+class SSLContext
+{
+public:
+ explicit SSLContext(SSL_CTX & context);
+ explicit SSLContext(const SSLParams & context_params);
+ ~SSLContext() = default;
+
+ SSLContext(const SSLContext &) = delete;
+ SSLContext& operator=(const SSLContext &) = delete;
+ SSLContext(SSLContext &&) = delete;
+ SSLContext& operator=(SSLContext &) = delete;
+
+private:
+ friend class SSLSocket;
+ SSL_CTX * getContext();
+
+private:
+ std::unique_ptr context_;
+};
+
+class SSLSocket : public Socket {
+public:
+ explicit SSLSocket(const NetworkAddress& addr, const SSLParams & ssl_params, SSLContext& context);
+ SSLSocket(SSLSocket &&) = default;
+ ~SSLSocket() override = default;
+
+ SSLSocket(const SSLSocket & ) = delete;
+ SSLSocket& operator=(const SSLSocket & ) = delete;
+
+ std::unique_ptr makeInputStream() const override;
+ std::unique_ptr makeOutputStream() const override;
+
+ static void validateParams(const SSLParams & ssl_params);
+private:
+ std::unique_ptr ssl_;
+};
+
+class SSLSocketFactory : public NonSecureSocketFactory {
+public:
+ explicit SSLSocketFactory(const ClientOptions& opts);
+ ~SSLSocketFactory() override;
+
+protected:
+ std::unique_ptr doConnect(const NetworkAddress& address) override;
+
+private:
+ const SSLParams ssl_params_;
+ std::unique_ptr ssl_context_;
+};
+
+class SSLSocketInput : public InputStream {
+public:
+ explicit SSLSocketInput(SSL *ssl);
+ ~SSLSocketInput() = default;
+
+ bool Skip(size_t /*bytes*/) override {
+ return false;
+ }
+
+protected:
+ size_t DoRead(void* buf, size_t len) override;
+
+private:
+ // Not owning
+ SSL *ssl_;
+};
+
+class SSLSocketOutput : public OutputStream {
+public:
+ explicit SSLSocketOutput(SSL *ssl);
+ ~SSLSocketOutput() = default;
+
+protected:
+ size_t DoWrite(const void* data, size_t len) override;
+
+private:
+ // Not owning
+ SSL *ssl_;
+};
+
+}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/string_view.h b/src/vendor/clickhouse-cpp/clickhouse/base/string_view.h
index 7d35585..46619ed 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/base/string_view.h
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/string_view.h
@@ -75,6 +75,11 @@ class StringViewImpl {
return size_;
}
+ // to mimic std::string and std::string_view
+ inline size_type length() const noexcept {
+ return size();
+ }
+
public:
// Returns a substring [pos, pos + count).
// If the requested substring extends past the end of the string,
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/wire_format.cpp b/src/vendor/clickhouse-cpp/clickhouse/base/wire_format.cpp
new file mode 100644
index 0000000..dfe51b8
--- /dev/null
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/wire_format.cpp
@@ -0,0 +1,102 @@
+#include "wire_format.h"
+
+#include "input.h"
+#include "output.h"
+
+#include "../exceptions.h"
+
+#include
+
+namespace {
+constexpr int MAX_VARINT_BYTES = 10;
+}
+
+namespace clickhouse {
+
+bool WireFormat::ReadAll(InputStream& input, void* buf, size_t len) {
+ uint8_t* p = static_cast(buf);
+
+ size_t read_previously = 1; // 1 to execute loop at least once
+ while (len > 0 && read_previously) {
+ read_previously = input.Read(p, len);
+
+ p += read_previously;
+ len -= read_previously;
+ }
+
+ return !len;
+}
+
+void WireFormat::WriteAll(OutputStream& output, const void* buf, size_t len) {
+ const size_t original_len = len;
+ const uint8_t* p = static_cast(buf);
+
+ size_t written_previously = 1; // 1 to execute loop at least once
+ while (len > 0 && written_previously) {
+ written_previously = output.Write(p, len);
+
+ p += written_previously;
+ len -= written_previously;
+ }
+
+ if (len) {
+ throw Error("Failed to write " + std::to_string(original_len)
+ + " bytes, only written " + std::to_string(original_len - len));
+ }
+}
+
+bool WireFormat::ReadVarint64(InputStream& input, uint64_t* value) {
+ *value = 0;
+
+ for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) {
+ uint8_t byte = 0;
+
+ if (!input.ReadByte(&byte)) {
+ return false;
+ } else {
+ *value |= uint64_t(byte & 0x7F) << (7 * i);
+
+ if (!(byte & 0x80)) {
+ return true;
+ }
+ }
+ }
+
+ // TODO skip invalid
+ return false;
+}
+
+void WireFormat::WriteVarint64(OutputStream& output, uint64_t value) {
+ uint8_t bytes[MAX_VARINT_BYTES];
+ int size = 0;
+
+ for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) {
+ uint8_t byte = value & 0x7F;
+ if (value > 0x7F)
+ byte |= 0x80;
+
+ bytes[size++] = byte;
+
+ value >>= 7;
+ if (!value) {
+ break;
+ }
+ }
+
+ WriteAll(output, bytes, size);
+}
+
+bool WireFormat::SkipString(InputStream& input) {
+ uint64_t len = 0;
+
+ if (ReadVarint64(input, &len)) {
+ if (len > 0x00FFFFFFULL)
+ return false;
+
+ return input.Skip((size_t)len);
+ }
+
+ return false;
+}
+
+}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/base/wire_format.h b/src/vendor/clickhouse-cpp/clickhouse/base/wire_format.h
index f383fcb..9bbf795 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/base/wire_format.h
+++ b/src/vendor/clickhouse-cpp/clickhouse/base/wire_format.h
@@ -1,101 +1,76 @@
#pragma once
-#include "coded.h"
-
#include
namespace clickhouse {
+class InputStream;
+class OutputStream;
+
class WireFormat {
public:
template
- static bool ReadFixed(CodedInputStream* input, T* value);
-
- static bool ReadString(CodedInputStream* input, std::string* value);
-
- static bool ReadBytes(CodedInputStream* input, void* buf, size_t len);
-
- static bool ReadUInt64(CodedInputStream* input, uint64_t* value);
-
+ static bool ReadFixed(InputStream& input, T* value);
+ static bool ReadString(InputStream& input, std::string* value);
+ static bool SkipString(InputStream& input);
+ static bool ReadBytes(InputStream& input, void* buf, size_t len);
+ static bool ReadUInt64(InputStream& input, uint64_t* value);
+ static bool ReadVarint64(InputStream& output, uint64_t* value);
template
- static void WriteFixed(CodedOutputStream* output, const T& value);
-
- static void WriteBytes(CodedOutputStream* output, const void* buf, size_t len);
-
- static void WriteString(CodedOutputStream* output, const std::string& value);
-
- static void WriteUInt64(CodedOutputStream* output, const uint64_t value);
+ static void WriteFixed(OutputStream& output, const T& value);
+ static void WriteBytes(OutputStream& output, const void* buf, size_t len);
+ static void WriteString(OutputStream& output, std::string_view value);
+ static void WriteUInt64(OutputStream& output, const uint64_t value);
+ static void WriteVarint64(OutputStream& output, uint64_t value);
+
+private:
+ static bool ReadAll(InputStream& input, void* buf, size_t len);
+ static void WriteAll(OutputStream& output, const void* buf, size_t len);
};
template
-inline bool WireFormat::ReadFixed(
- CodedInputStream* input,
- T* value)
-{
- return input->ReadRaw(value, sizeof(T));
+inline bool WireFormat::ReadFixed(InputStream& input, T* value) {
+ return ReadAll(input, value, sizeof(T));
}
-inline bool WireFormat::ReadString(
- CodedInputStream* input,
- std::string* value)
-{
- uint64_t len;
-
- if (input->ReadVarint64(&len)) {
+inline bool WireFormat::ReadString(InputStream& input, std::string* value) {
+ uint64_t len = 0;
+ if (ReadVarint64(input, &len)) {
if (len > 0x00FFFFFFULL) {
return false;
}
value->resize((size_t)len);
- return input->ReadRaw(&(*value)[0], (size_t)len);
+ return ReadAll(input, value->data(), (size_t)len);
}
return false;
}
-inline bool WireFormat::ReadBytes(
- CodedInputStream* input, void* buf, size_t len)
-{
- return input->ReadRaw(buf, len);
+inline bool WireFormat::ReadBytes(InputStream& input, void* buf, size_t len) {
+ return ReadAll(input, buf, len);
}
-inline bool WireFormat::ReadUInt64(
- CodedInputStream* input,
- uint64_t* value)
-{
- return input->ReadVarint64(value);
+inline bool WireFormat::ReadUInt64(InputStream& input, uint64_t* value) {
+ return ReadVarint64(input, value);
}
-
template
-inline void WireFormat::WriteFixed(
- CodedOutputStream* output,
- const T& value)
-{
- output->WriteRaw(&value, sizeof(T));
+inline void WireFormat::WriteFixed(OutputStream& output, const T& value) {
+ WriteAll(output, &value, sizeof(T));
}
-inline void WireFormat::WriteBytes(
- CodedOutputStream* output,
- const void* buf,
- size_t len)
-{
- output->WriteRaw(buf, len);
+inline void WireFormat::WriteBytes(OutputStream& output, const void* buf, size_t len) {
+ WriteAll(output, buf, len);
}
-inline void WireFormat::WriteString(
- CodedOutputStream* output,
- const std::string& value)
-{
- output->WriteVarint64(value.size());
- output->WriteRaw(value.data(), value.size());
+inline void WireFormat::WriteString(OutputStream& output, std::string_view value) {
+ WriteVarint64(output, value.size());
+ WriteAll(output, value.data(), value.size());
}
-inline void WireFormat::WriteUInt64(
- CodedOutputStream* output,
- const uint64_t value)
-{
- output->WriteVarint64(value);
+inline void WireFormat::WriteUInt64(OutputStream& output, const uint64_t value) {
+ WriteVarint64(output, value);
}
}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/block.cpp b/src/vendor/clickhouse-cpp/clickhouse/block.cpp
index 685173c..aca77c0 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/block.cpp
+++ b/src/vendor/clickhouse-cpp/clickhouse/block.cpp
@@ -1,5 +1,7 @@
#include "block.h"
+#include "exceptions.h"
+
#include
namespace clickhouse {
@@ -10,6 +12,11 @@ Block::Iterator::Iterator(const Block& block)
{
}
+Block::Iterator::Iterator(const Block& block, Block::Iterator::ConstructAtEndTag /*at_end*/)
+ : block_(block)
+ , idx_(block.GetColumnCount())
+{}
+
const std::string& Block::Iterator::Name() const {
return block_.columns_[idx_].name;
}
@@ -22,8 +29,9 @@ ColumnRef Block::Iterator::Column() const {
return block_.columns_[idx_].column;
}
-void Block::Iterator::Next() {
+bool Block::Iterator::Next() {
++idx_;
+ return IsValid();
}
bool Block::Iterator::IsValid() const {
@@ -48,7 +56,7 @@ void Block::AppendColumn(const std::string& name, const ColumnRef& col) {
if (columns_.empty()) {
rows_ = col->Size();
} else if (col->Size() != rows_) {
- throw std::runtime_error("all columns in block must have same count of rows. Name: ["+name+"], rows: ["+std::to_string(rows_)+"], columns: [" + std::to_string(col->Size())+"]");
+ throw ValidationError("all columns in block must have same count of rows. Name: ["+name+"], rows: ["+std::to_string(rows_)+"], columns: [" + std::to_string(col->Size())+"]");
}
columns_.push_back(ColumnItem{name, col});
@@ -80,7 +88,7 @@ size_t Block::RefreshRowCount()
if (idx == 0UL)
rows = col->Size();
else if (rows != col->Size())
- throw std::runtime_error("all columns in block must have same count of rows. Name: ["+name+"], rows: ["+std::to_string(rows)+"], columns: [" + std::to_string(col->Size())+"]");
+ throw ValidationError("all columns in block must have same count of rows. Name: ["+name+"], rows: ["+std::to_string(rows)+"], columns: [" + std::to_string(col->Size())+"]");
}
rows_ = rows;
@@ -95,4 +103,12 @@ ColumnRef Block::operator [] (size_t idx) const {
throw std::out_of_range("column index is out of range. Index: ["+std::to_string(idx)+"], columns: [" + std::to_string(columns_.size())+"]");
}
+Block::Iterator Block::begin() const {
+ return Iterator(*this);
+}
+
+Block::Iterator Block::end() const {
+ return Iterator(*this, Iterator::ConstructAtEndTag{});
+}
+
}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/block.h b/src/vendor/clickhouse-cpp/clickhouse/block.h
index b2b2d14..a647f12 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/block.h
+++ b/src/vendor/clickhouse-cpp/clickhouse/block.h
@@ -25,13 +25,35 @@ class Block {
/// Reference to column object.
ColumnRef Column() const;
- /// Move to next column.
- void Next();
+ /// Move to next column, returns false if next call to IsValid() would return false;
+ bool Next();
/// Is the iterator still valid.
bool IsValid() const;
+ size_t ColumnIndex() const {
+ return idx_;
+ }
+
+ Iterator& operator*() { return *this; }
+ const Iterator& operator*() const { return *this; }
+
+ bool operator==(const Iterator & other) const {
+ return &block_ == &other.block_ && idx_ == other.idx_;
+ }
+ bool operator!=(const Iterator & other) const {
+ return !(*this == other);
+ }
+
+ Iterator& operator++() {
+ this->Next();
+ return *this;
+ }
+
private:
+ friend class Block;
+ struct ConstructAtEndTag {};
+ Iterator(const Block& block, ConstructAtEndTag at_end);
Iterator() = delete;
const Block& block_;
@@ -63,6 +85,11 @@ class Block {
/// Reference to column by index in the block.
ColumnRef operator [] (size_t idx) const;
+ Iterator begin() const;
+ Iterator end() const;
+ Iterator cbegin() const { return begin(); }
+ Iterator cend() const { return end(); }
+
private:
struct ColumnItem {
std::string name;
diff --git a/src/vendor/clickhouse-cpp/clickhouse/client.cpp b/src/vendor/clickhouse-cpp/clickhouse/client.cpp
index b9bc5f0..8a7c1e2 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/client.cpp
+++ b/src/vendor/clickhouse-cpp/clickhouse/client.cpp
@@ -1,16 +1,12 @@
#include "client.h"
#include "protocol.h"
-#include "base/coded.h"
#include "base/compressed.h"
#include "base/socket.h"
#include "base/wire_format.h"
#include "columns/factory.h"
-#include
-#include
-
#include
#include
#include
@@ -19,10 +15,13 @@
#include
#include
+#if defined(WITH_OPENSSL)
+#include "base/sslsocket.h"
+#endif
+
#define DBMS_NAME "ClickHouse"
-#define DBMS_VERSION_MAJOR 1
+#define DBMS_VERSION_MAJOR 2
#define DBMS_VERSION_MINOR 1
-#define REVISION 54126
#define DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES 50264
#define DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS 51554
@@ -30,6 +29,13 @@
#define DBMS_MIN_REVISION_WITH_CLIENT_INFO 54032
#define DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE 54058
#define DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO 54060
+//#define DBMS_MIN_REVISION_WITH_TABLES_STATUS 54226
+#define DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE 54337
+#define DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME 54372
+#define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401
+#define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405
+
+#define REVISION DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE
namespace clickhouse {
@@ -45,55 +51,91 @@ struct ClientInfo {
std::string initial_address = "[::ffff:127.0.0.1]:0";
uint64_t client_version_major = 0;
uint64_t client_version_minor = 0;
+ uint64_t client_version_patch = 0;
uint32_t client_revision = 0;
};
-struct ServerInfo {
- std::string name;
- std::string timezone;
- uint64_t version_major;
- uint64_t version_minor;
- uint64_t revision;
-};
-
std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) {
os << "Client(" << opt.user << '@' << opt.host << ":" << opt.port
<< " ping_before_query:" << opt.ping_before_query
<< " send_retries:" << opt.send_retries
<< " retry_timeout:" << opt.retry_timeout.count()
<< " compression_method:"
- << (opt.compression_method == CompressionMethod::LZ4 ? "LZ4" : "None")
- << ")";
+ << (opt.compression_method == CompressionMethod::LZ4 ? "LZ4" : "None");
+#if defined(WITH_OPENSSL)
+ if (opt.ssl_options) {
+ const auto & ssl_options = *opt.ssl_options;
+ os << " SSL ("
+ << " ssl_context: " << (ssl_options.ssl_context ? "provided by user" : "created internally")
+ << " use_default_ca_locations: " << ssl_options.use_default_ca_locations
+ << " path_to_ca_files: " << ssl_options.path_to_ca_files.size() << " items"
+ << " path_to_ca_directory: " << ssl_options.path_to_ca_directory
+ << " min_protocol_version: " << ssl_options.min_protocol_version
+ << " max_protocol_version: " << ssl_options.max_protocol_version
+ << " context_options: " << ssl_options.context_options
+ << ")";
+ }
+#endif
+ os << ")";
return os;
}
+ClientOptions& ClientOptions::SetSSLOptions(ClientOptions::SSLOptions options)
+{
+#ifdef WITH_OPENSSL
+ ssl_options = options;
+ return *this;
+#else
+ (void)options;
+ throw OpenSSLError("Library was built with no SSL support");
+#endif
+}
+
+namespace {
+
+std::unique_ptr GetSocketFactory(const ClientOptions& opts) {
+ (void)opts;
+#if defined(WITH_OPENSSL)
+ if (opts.ssl_options)
+ return std::make_unique(opts);
+ else
+#endif
+ return std::make_unique();
+}
+
+}
+
class Client::Impl {
public:
Impl(const ClientOptions& opts);
+ Impl(const ClientOptions& opts,
+ std::unique_ptr socket_factory);
~Impl();
void ExecuteQuery(Query query);
void SendCancel();
- void Insert(const std::string& table_name, const Block& block);
+ void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
void Ping();
void ResetConnection();
+ const ServerInfo& GetServerInfo() const;
+
private:
bool Handshake();
bool ReceivePacket(uint64_t* server_packet = nullptr);
- void SendQuery(const std::string& query);
+ void SendQuery(const std::string& query, const std::string& query_id);
void SendData(const Block& block);
bool SendHello();
- bool ReadBlock(Block* block, CodedInputStream* input);
+ bool ReadBlock(InputStream& input, Block* block);
bool ReceiveHello();
@@ -103,12 +145,14 @@ class Client::Impl {
/// Reads exception packet form input stream.
bool ReceiveException(bool rethrow = false);
- void WriteBlock(const Block& block, CodedOutputStream* output);
+ void WriteBlock(const Block& block, OutputStream& output);
+
+ void InitializeStreams(std::unique_ptr&& socket);
private:
/// In case of network errors tries to reconnect to server and
/// call fuc several times.
- void RetryGuard(std::function fuc);
+ void RetryGuard(std::function func);
private:
class EnsureNull {
@@ -137,33 +181,26 @@ class Client::Impl {
QueryEvents* events_;
int compression_ = CompressionState::Disable;
- SocketHolder socket_;
+ std::unique_ptr socket_factory_;
- SocketInput socket_input_;
- BufferedInput buffered_input_;
- CodedInputStream input_;
-
- SocketOutput socket_output_;
- BufferedOutput buffered_output_;
- CodedOutputStream output_;
+ std::unique_ptr input_;
+ std::unique_ptr output_;
+ std::unique_ptr socket_;
ServerInfo server_info_;
};
+
Client::Impl::Impl(const ClientOptions& opts)
+ : Impl(opts, GetSocketFactory(opts)) {}
+
+Client::Impl::Impl(const ClientOptions& opts,
+ std::unique_ptr socket_factory)
: options_(opts)
, events_(nullptr)
- , socket_(-1)
- , socket_input_(socket_)
- , buffered_input_(&socket_input_)
- , input_(&buffered_input_)
- , socket_output_(socket_)
- , buffered_output_(&socket_output_)
- , output_(&buffered_output_)
+ , socket_factory_(std::move(socket_factory))
{
- // TODO: throw on big-endianness of platform
-
- for (int i = 0; ; ) {
+ for (unsigned int i = 0; ; ) {
try {
ResetConnection();
break;
@@ -172,7 +209,7 @@ Client::Impl::Impl(const ClientOptions& opts)
throw;
}
- std::this_thread::sleep_for(options_.retry_timeout);
+ socket_factory_->sleepFor(options_.retry_timeout);
}
}
@@ -191,69 +228,49 @@ void Client::Impl::ExecuteQuery(Query query) {
RetryGuard([this]() { Ping(); });
}
- SendQuery(query.GetText());
+ SendQuery(query.GetText(), query.GetQueryID());
while (ReceivePacket()) {
;
}
}
-
std::string NameToQueryString(const std::string &input)
{
- std::string output = "`";
- const char *c = input.c_str();
- while (*c) {
- switch (*c) {
- // // needs test cases
- // case '"':
- // output.append("\\\""); break;
- // case '`':
- // output.append("\\`"); break;
- // case '\'':
- // output.append("\\'"); break;
- // case '[':
- // output.append("\\["); break;
- // case ']':
- // output.append("\\]"); break;
- // case '%':
- // output.append("\\%"); break;
- // case '_':
- // output.append("\\_"); break;
- // case '\\':
- // output.append("\\\\"); break;
- default:
- output.push_back(*c); break;
- }
- ++c;
- }
- output += "`";
+ std::string output;
+ output.reserve(input.size() + 2);
+ output += '`';
+
+ for (const auto & c : input) {
+ if (c == '`') {
+ //escape ` with ``
+ output.append("``");
+ } else {
+ output.push_back(c);
+ }
+ }
+
+ output += '`';
return output;
}
-void Client::Impl::Insert(const std::string& table_name, const Block& block) {
+void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) {
if (options_.ping_before_query) {
RetryGuard([this]() { Ping(); });
}
- std::vector fields;
- fields.reserve(block.GetColumnCount());
-
- // Enumerate all fields
- for (unsigned int i = 0; i < block.GetColumnCount(); i++) {
- fields.push_back(NameToQueryString(block.GetColumnName(i)));
- }
-
std::stringstream fields_section;
+ const auto num_columns = block.GetColumnCount();
- for (auto elem = fields.begin(); elem != fields.end(); ++elem) {
- if (std::distance(elem, fields.end()) == 1) {
- fields_section << *elem;
+ for (unsigned int i = 0; i < num_columns; ++i) {
+ if (i == num_columns - 1) {
+ fields_section << NameToQueryString(block.GetColumnName(i));
} else {
- fields_section << *elem << ",";
+ fields_section << NameToQueryString(block.GetColumnName(i)) << ",";
}
}
- SendQuery("INSERT INTO " + table_name + " ( " + fields_section.str() + " ) VALUES");
+
+ SendQuery("INSERT INTO " + table_name + " ( " + fields_section.str() + " ) VALUES", query_id);
uint64_t server_packet;
// Receive data packet.
@@ -261,7 +278,7 @@ void Client::Impl::Insert(const std::string& table_name, const Block& block) {
bool ret = ReceivePacket(&server_packet);
if (!ret) {
- throw std::runtime_error("fail to receive data packet");
+ throw ProtocolError("fail to receive data packet");
}
if (server_packet == ServerCodes::Data) {
break;
@@ -278,47 +295,42 @@ void Client::Impl::Insert(const std::string& table_name, const Block& block) {
SendData(Block());
// Wait for EOS.
- while (ReceivePacket()) {
+ uint64_t eos_packet{0};
+ while (ReceivePacket(&eos_packet)) {
;
}
+
+ if (eos_packet != ServerCodes::EndOfStream && eos_packet != ServerCodes::Exception
+ && eos_packet != ServerCodes::Log && options_.rethrow_exceptions) {
+ throw ProtocolError(std::string{"unexpected packet from server while receiving end of query, expected (expected Exception, EndOfStream or Log, got: "}
+ + (eos_packet ? std::to_string(eos_packet) : "nothing") + ")");
+ }
}
void Client::Impl::Ping() {
- WireFormat::WriteUInt64(&output_, ClientCodes::Ping);
- output_.Flush();
+ WireFormat::WriteUInt64(*output_, ClientCodes::Ping);
+ output_->Flush();
uint64_t server_packet;
const bool ret = ReceivePacket(&server_packet);
if (!ret || server_packet != ServerCodes::Pong) {
- throw std::runtime_error("fail to ping server");
+ throw ProtocolError("fail to ping server");
}
}
void Client::Impl::ResetConnection() {
- SocketHolder s(SocketConnect(NetworkAddress(options_.host, std::to_string(options_.port))));
-
- if (s.Closed()) {
- throw std::system_error(errno, std::system_category());
- }
-
- if (options_.tcp_keepalive) {
- s.SetTcpKeepAlive(options_.tcp_keepalive_idle.count(),
- options_.tcp_keepalive_intvl.count(),
- options_.tcp_keepalive_cnt);
- }
-
- socket_ = std::move(s);
- socket_input_ = SocketInput(socket_);
- socket_output_ = SocketOutput(socket_);
- buffered_input_.Reset();
- buffered_output_.Reset();
+ InitializeStreams(socket_factory_->connect(options_));
if (!Handshake()) {
- throw std::runtime_error("fail to connect to " + options_.host);
+ throw ProtocolError("fail to connect to " + options_.host);
}
}
+const ServerInfo& Client::Impl::GetServerInfo() const {
+ return server_info_;
+}
+
bool Client::Impl::Handshake() {
if (!SendHello()) {
return false;
@@ -332,7 +344,7 @@ bool Client::Impl::Handshake() {
bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
uint64_t packet_type = 0;
- if (!input_.ReadVarint64(&packet_type)) {
+ if (!WireFormat::ReadVarint64(*input_, &packet_type)) {
return false;
}
if (server_packet) {
@@ -342,7 +354,7 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
switch (packet_type) {
case ServerCodes::Data: {
if (!ReceiveData()) {
- throw std::runtime_error("can't read data packet from input stream");
+ throw ProtocolError("can't read data packet from input stream");
}
return true;
}
@@ -355,22 +367,22 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
case ServerCodes::ProfileInfo: {
Profile profile;
- if (!WireFormat::ReadUInt64(&input_, &profile.rows)) {
+ if (!WireFormat::ReadUInt64(*input_, &profile.rows)) {
return false;
}
- if (!WireFormat::ReadUInt64(&input_, &profile.blocks)) {
+ if (!WireFormat::ReadUInt64(*input_, &profile.blocks)) {
return false;
}
- if (!WireFormat::ReadUInt64(&input_, &profile.bytes)) {
+ if (!WireFormat::ReadUInt64(*input_, &profile.bytes)) {
return false;
}
- if (!WireFormat::ReadFixed(&input_, &profile.applied_limit)) {
+ if (!WireFormat::ReadFixed(*input_, &profile.applied_limit)) {
return false;
}
- if (!WireFormat::ReadUInt64(&input_, &profile.rows_before_limit)) {
+ if (!WireFormat::ReadUInt64(*input_, &profile.rows_before_limit)) {
return false;
}
- if (!WireFormat::ReadFixed(&input_, &profile.calculated_rows_before_limit)) {
+ if (!WireFormat::ReadFixed(*input_, &profile.calculated_rows_before_limit)) {
return false;
}
@@ -384,14 +396,14 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
case ServerCodes::Progress: {
Progress info;
- if (!WireFormat::ReadUInt64(&input_, &info.rows)) {
+ if (!WireFormat::ReadUInt64(*input_, &info.rows)) {
return false;
}
- if (!WireFormat::ReadUInt64(&input_, &info.bytes)) {
+ if (!WireFormat::ReadUInt64(*input_, &info.bytes)) {
return false;
}
if (REVISION >= DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS) {
- if (!WireFormat::ReadUInt64(&input_, &info.total_rows)) {
+ if (!WireFormat::ReadUInt64(*input_, &info.total_rows)) {
return false;
}
}
@@ -415,14 +427,14 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
}
default:
- throw std::runtime_error("unimplemented " + std::to_string((int)packet_type));
+ throw UnimplementedError("unimplemented " + std::to_string((int)packet_type));
break;
}
return false;
}
-bool Client::Impl::ReadBlock(Block* block, CodedInputStream* input) {
+bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
// Additional information about block.
if (REVISION >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
uint64_t num;
@@ -458,10 +470,12 @@ bool Client::Impl::ReadBlock(Block* block, CodedInputStream* input) {
return false;
}
- for (size_t i = 0; i < num_columns; ++i) {
- std::string name;
- std::string type;
+ CreateColumnByTypeSettings create_column_settings;
+ create_column_settings.low_cardinality_as_wrapped_column = options_.backward_compatibility_lowcardinality_as_wrapped_column;
+ std::string name;
+ std::string type;
+ for (size_t i = 0; i < num_columns; ++i) {
if (!WireFormat::ReadString(input, &name)) {
return false;
}
@@ -469,14 +483,14 @@ bool Client::Impl::ReadBlock(Block* block, CodedInputStream* input) {
return false;
}
- if (ColumnRef col = CreateColumnByType(type)) {
- if (num_rows && !col->Load(input, num_rows)) {
- throw std::runtime_error("can't load");
+ if (ColumnRef col = CreateColumnByType(type, create_column_settings)) {
+ if (num_rows && !col->Load(&input, num_rows)) {
+ throw ProtocolError("can't load column '" + name + "' of type " + type);
}
block->AppendColumn(name, col);
} else {
- throw std::runtime_error(std::string("unsupported column type: ") + type);
+ throw UnimplementedError(std::string("unsupported column type: ") + type);
}
}
@@ -487,22 +501,18 @@ bool Client::Impl::ReceiveData() {
Block block;
if (REVISION >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
- std::string table_name;
-
- if (!WireFormat::ReadString(&input_, &table_name)) {
+ if (!WireFormat::SkipString(*input_)) {
return false;
}
}
if (compression_ == CompressionState::Enable) {
- CompressedInput compressed(&input_);
- CodedInputStream coded(&compressed);
-
- if (!ReadBlock(&block, &coded)) {
+ CompressedInput compressed(input_.get());
+ if (!ReadBlock(compressed, &block)) {
return false;
}
} else {
- if (!ReadBlock(&block, &input_)) {
+ if (!ReadBlock(*input_, &block)) {
return false;
}
}
@@ -521,23 +531,29 @@ bool Client::Impl::ReceiveException(bool rethrow) {
std::unique_ptr e(new Exception);
Exception* current = e.get();
+ bool exception_received = true;
do {
bool has_nested = false;
- if (!WireFormat::ReadFixed(&input_, ¤t->code)) {
- return false;
+ if (!WireFormat::ReadFixed(*input_, ¤t->code)) {
+ exception_received = false;
+ break;
}
- if (!WireFormat::ReadString(&input_, ¤t->name)) {
- return false;
+ if (!WireFormat::ReadString(*input_, ¤t->name)) {
+ exception_received = false;
+ break;
}
- if (!WireFormat::ReadString(&input_, ¤t->display_text)) {
- return false;
+ if (!WireFormat::ReadString(*input_, ¤t->display_text)) {
+ exception_received = false;
+ break;
}
- if (!WireFormat::ReadString(&input_, ¤t->stack_trace)) {
- return false;
+ if (!WireFormat::ReadString(*input_, ¤t->stack_trace)) {
+ exception_received = false;
+ break;
}
- if (!WireFormat::ReadFixed(&input_, &has_nested)) {
- return false;
+ if (!WireFormat::ReadFixed(*input_, &has_nested)) {
+ exception_received = false;
+ break;
}
if (has_nested) {
@@ -553,20 +569,20 @@ bool Client::Impl::ReceiveException(bool rethrow) {
}
if (rethrow || options_.rethrow_exceptions) {
- throw ServerException(std::move(e));
+ throw ServerError(std::move(e));
}
- return true;
+ return exception_received;
}
void Client::Impl::SendCancel() {
- WireFormat::WriteUInt64(&output_, ClientCodes::Cancel);
- output_.Flush();
+ WireFormat::WriteUInt64(*output_, ClientCodes::Cancel);
+ output_->Flush();
}
-void Client::Impl::SendQuery(const std::string& query) {
- WireFormat::WriteUInt64(&output_, ClientCodes::Query);
- WireFormat::WriteString(&output_, std::string());
+void Client::Impl::SendQuery(const std::string& query, const std::string& query_id) {
+ WireFormat::WriteUInt64(*output_, ClientCodes::Query);
+ WireFormat::WriteString(*output_, query_id);
/// Client info.
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) {
@@ -579,47 +595,50 @@ void Client::Impl::SendQuery(const std::string& query) {
info.client_revision = REVISION;
- WireFormat::WriteFixed(&output_, info.query_kind);
- WireFormat::WriteString(&output_, info.initial_user);
- WireFormat::WriteString(&output_, info.initial_query_id);
- WireFormat::WriteString(&output_, info.initial_address);
- WireFormat::WriteFixed(&output_, info.iface_type);
+ WireFormat::WriteFixed(*output_, info.query_kind);
+ WireFormat::WriteString(*output_, info.initial_user);
+ WireFormat::WriteString(*output_, info.initial_query_id);
+ WireFormat::WriteString(*output_, info.initial_address);
+ WireFormat::WriteFixed(*output_, info.iface_type);
- WireFormat::WriteString(&output_, info.os_user);
- WireFormat::WriteString(&output_, info.client_hostname);
- WireFormat::WriteString(&output_, info.client_name);
- WireFormat::WriteUInt64(&output_, info.client_version_major);
- WireFormat::WriteUInt64(&output_, info.client_version_minor);
- WireFormat::WriteUInt64(&output_, info.client_revision);
+ WireFormat::WriteString(*output_, info.os_user);
+ WireFormat::WriteString(*output_, info.client_hostname);
+ WireFormat::WriteString(*output_, info.client_name);
+ WireFormat::WriteUInt64(*output_, info.client_version_major);
+ WireFormat::WriteUInt64(*output_, info.client_version_minor);
+ WireFormat::WriteUInt64(*output_, info.client_revision);
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO)
- WireFormat::WriteString(&output_, info.quota_key);
+ WireFormat::WriteString(*output_, info.quota_key);
+ if (server_info_.revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) {
+ WireFormat::WriteUInt64(*output_, info.client_version_patch);
+ }
}
/// Per query settings.
//if (settings)
// settings->serialize(*out);
//else
- WireFormat::WriteString(&output_, std::string());
+ WireFormat::WriteString(*output_, std::string());
- WireFormat::WriteUInt64(&output_, Stages::Complete);
- WireFormat::WriteUInt64(&output_, compression_);
- WireFormat::WriteString(&output_, query);
+ WireFormat::WriteUInt64(*output_, Stages::Complete);
+ WireFormat::WriteUInt64(*output_, compression_);
+ WireFormat::WriteString(*output_, query);
// Send empty block as marker of
// end of data
SendData(Block());
- output_.Flush();
+ output_->Flush();
}
-void Client::Impl::WriteBlock(const Block& block, CodedOutputStream* output) {
+void Client::Impl::WriteBlock(const Block& block, OutputStream& output) {
// Additional information about block.
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
WireFormat::WriteUInt64(output, 1);
- WireFormat::WriteFixed (output, block.Info().is_overflows);
+ WireFormat::WriteFixed(output, block.Info().is_overflows);
WireFormat::WriteUInt64(output, 2);
- WireFormat::WriteFixed (output, block.Info().bucket_num);
+ WireFormat::WriteFixed(output, block.Info().bucket_num);
WireFormat::WriteUInt64(output, 0);
}
@@ -630,73 +649,52 @@ void Client::Impl::WriteBlock(const Block& block, CodedOutputStream* output) {
WireFormat::WriteString(output, bi.Name());
WireFormat::WriteString(output, bi.Type()->GetName());
- bi.Column()->Save(output);
+ bi.Column()->Save(&output);
}
+ output.Flush();
}
void Client::Impl::SendData(const Block& block) {
- WireFormat::WriteUInt64(&output_, ClientCodes::Data);
+ WireFormat::WriteUInt64(*output_, ClientCodes::Data);
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
- WireFormat::WriteString(&output_, std::string());
+ WireFormat::WriteString(*output_, std::string());
}
if (compression_ == CompressionState::Enable) {
- switch (options_.compression_method) {
- case CompressionMethod::None: {
- assert(false);
- break;
- }
+ assert(options_.compression_method == CompressionMethod::LZ4);
- case CompressionMethod::LZ4: {
- Buffer tmp;
- // Serialize block's data
- {
- BufferOutput out(&tmp);
- CodedOutputStream coded(&out);
- WriteBlock(block, &coded);
- }
- // Reserver space for data
- Buffer buf;
- buf.resize(9 + LZ4_compressBound(tmp.size()));
-
- // Compress data
- int size = LZ4_compress((const char*)tmp.data(), (char*)buf.data() + 9, tmp.size());
- buf.resize(9 + size);
-
- // Fill header
- uint8_t* p = buf.data();
- // Compression method
- WriteUnaligned(p, (uint8_t)0x82); p += 1;
- // Compressed data size with header
- WriteUnaligned(p, (uint32_t)buf.size()); p += 4;
- // Original data size
- WriteUnaligned(p, (uint32_t)tmp.size());
-
- WireFormat::WriteFixed(&output_, CityHash128(
- (const char*)buf.data(), buf.size()));
- WireFormat::WriteBytes(&output_, buf.data(), buf.size());
- break;
- }
- }
+ std::unique_ptr compressed_output = std::make_unique(output_.get(), options_.max_compression_chunk_size);
+ BufferedOutput buffered(std::move(compressed_output), options_.max_compression_chunk_size);
+
+ WriteBlock(block, buffered);
} else {
- WriteBlock(block, &output_);
+ WriteBlock(block, *output_);
}
- output_.Flush();
+ output_->Flush();
+}
+
+void Client::Impl::InitializeStreams(std::unique_ptr&& socket) {
+ std::unique_ptr output = std::make_unique(socket->makeOutputStream());
+ std::unique_ptr input = std::make_unique(socket->makeInputStream());
+
+ std::swap(input, input_);
+ std::swap(output, output_);
+ std::swap(socket, socket_);
}
bool Client::Impl::SendHello() {
- WireFormat::WriteUInt64(&output_, ClientCodes::Hello);
- WireFormat::WriteString(&output_, std::string(DBMS_NAME) + " client");
- WireFormat::WriteUInt64(&output_, DBMS_VERSION_MAJOR);
- WireFormat::WriteUInt64(&output_, DBMS_VERSION_MINOR);
- WireFormat::WriteUInt64(&output_, REVISION);
- WireFormat::WriteString(&output_, options_.default_database);
- WireFormat::WriteString(&output_, options_.user);
- WireFormat::WriteString(&output_, options_.password);
+ WireFormat::WriteUInt64(*output_, ClientCodes::Hello);
+ WireFormat::WriteString(*output_, std::string(DBMS_NAME) + " client");
+ WireFormat::WriteUInt64(*output_, DBMS_VERSION_MAJOR);
+ WireFormat::WriteUInt64(*output_, DBMS_VERSION_MINOR);
+ WireFormat::WriteUInt64(*output_, REVISION);
+ WireFormat::WriteString(*output_, options_.default_database);
+ WireFormat::WriteString(*output_, options_.user);
+ WireFormat::WriteString(*output_, options_.password);
- output_.Flush();
+ output_->Flush();
return true;
}
@@ -704,26 +702,38 @@ bool Client::Impl::SendHello() {
bool Client::Impl::ReceiveHello() {
uint64_t packet_type = 0;
- if (!input_.ReadVarint64(&packet_type)) {
+ if (!WireFormat::ReadVarint64(*input_, &packet_type)) {
return false;
}
if (packet_type == ServerCodes::Hello) {
- if (!WireFormat::ReadString(&input_, &server_info_.name)) {
+ if (!WireFormat::ReadString(*input_, &server_info_.name)) {
return false;
}
- if (!WireFormat::ReadUInt64(&input_, &server_info_.version_major)) {
+ if (!WireFormat::ReadUInt64(*input_, &server_info_.version_major)) {
return false;
}
- if (!WireFormat::ReadUInt64(&input_, &server_info_.version_minor)) {
+ if (!WireFormat::ReadUInt64(*input_, &server_info_.version_minor)) {
return false;
}
- if (!WireFormat::ReadUInt64(&input_, &server_info_.revision)) {
+ if (!WireFormat::ReadUInt64(*input_, &server_info_.revision)) {
return false;
}
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) {
- if (!WireFormat::ReadString(&input_, &server_info_.timezone)) {
+ if (!WireFormat::ReadString(*input_, &server_info_.timezone)) {
+ return false;
+ }
+ }
+
+ if (server_info_.revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) {
+ if (!WireFormat::ReadString(*input_, &server_info_.display_name)) {
+ return false;
+ }
+ }
+
+ if (server_info_.revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) {
+ if (!WireFormat::ReadUInt64(*input_, &server_info_.version_patch)) {
return false;
}
}
@@ -738,7 +748,7 @@ bool Client::Impl::ReceiveHello() {
}
void Client::Impl::RetryGuard(std::function func) {
- for (int i = 0; i <= options_.send_retries; ++i) {
+ for (unsigned int i = 0; ; ++i) {
try {
func();
return;
@@ -746,13 +756,13 @@ void Client::Impl::RetryGuard(std::function func) {
bool ok = true;
try {
- std::this_thread::sleep_for(options_.retry_timeout);
+ socket_factory_->sleepFor(options_.retry_timeout);
ResetConnection();
} catch (...) {
ok = false;
}
- if (!ok) {
+ if (!ok && i == options_.send_retries) {
throw;
}
}
@@ -765,6 +775,13 @@ Client::Client(const ClientOptions& opts)
{
}
+Client::Client(const ClientOptions& opts,
+ std::unique_ptr socket_factory)
+ : options_(opts)
+ , impl_(new Impl(opts, std::move(socket_factory)))
+{
+}
+
Client::~Client()
{ }
@@ -773,11 +790,19 @@ void Client::Execute(const Query& query) {
}
void Client::Select(const std::string& query, SelectCallback cb) {
- Execute(Query(query).OnData(cb));
+ Execute(Query(query).OnData(std::move(cb)));
+}
+
+void Client::Select(const std::string& query, const std::string& query_id, SelectCallback cb) {
+ Execute(Query(query, query_id).OnData(std::move(cb)));
}
void Client::SelectCancelable(const std::string& query, SelectCancelableCallback cb) {
- Execute(Query(query).OnDataCancelable(cb));
+ Execute(Query(query).OnDataCancelable(std::move(cb)));
+}
+
+void Client::SelectCancelable(const std::string& query, const std::string& query_id, SelectCancelableCallback cb) {
+ Execute(Query(query, query_id).OnDataCancelable(std::move(cb)));
}
void Client::Select(const Query& query) {
@@ -785,7 +810,11 @@ void Client::Select(const Query& query) {
}
void Client::Insert(const std::string& table_name, const Block& block) {
- impl_->Insert(table_name, block);
+ impl_->Insert(table_name, Query::default_query_id, block);
+}
+
+void Client::Insert(const std::string& table_name, const std::string& query_id, const Block& block) {
+ impl_->Insert(table_name, query_id, block);
}
void Client::Ping() {
@@ -796,4 +825,8 @@ void Client::ResetConnection() {
impl_->ResetConnection();
}
+const ServerInfo& Client::GetServerInfo() const {
+ return impl_->GetServerInfo();
+}
+
}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/client.h b/src/vendor/clickhouse-cpp/clickhouse/client.h
index 6953e7d..7f2b97d 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/client.h
+++ b/src/vendor/clickhouse-cpp/clickhouse/client.h
@@ -9,6 +9,7 @@
#include "columns/enum.h"
#include "columns/ip4.h"
#include "columns/ip6.h"
+#include "columns/lowcardinality.h"
#include "columns/nullable.h"
#include "columns/numeric.h"
#include "columns/string.h"
@@ -19,9 +20,22 @@
#include
#include
#include
+#include
+
+typedef struct ssl_ctx_st SSL_CTX;
namespace clickhouse {
+struct ServerInfo {
+ std::string name;
+ std::string timezone;
+ std::string display_name;
+ uint64_t version_major;
+ uint64_t version_minor;
+ uint64_t version_patch;
+ uint64_t revision;
+};
+
/// Methods of block compression.
enum class CompressionMethod {
None = -1,
@@ -29,9 +43,9 @@ enum class CompressionMethod {
};
struct ClientOptions {
-#define DECLARE_FIELD(name, type, setter, default) \
- type name = default; \
- inline ClientOptions& setter(const type& value) { \
+#define DECLARE_FIELD(name, type, setter, default_value) \
+ type name = default_value; \
+ inline auto & setter(const type& value) { \
name = value; \
return *this; \
}
@@ -39,7 +53,7 @@ struct ClientOptions {
/// Hostname of the server.
DECLARE_FIELD(host, std::string, SetHost, std::string());
/// Service port.
- DECLARE_FIELD(port, int, SetPort, 9000);
+ DECLARE_FIELD(port, unsigned int, SetPort, 9000);
/// Default database.
DECLARE_FIELD(default_database, std::string, SetDefaultDatabase, "default");
@@ -56,7 +70,7 @@ struct ClientOptions {
/// Ping server every time before execute any query.
DECLARE_FIELD(ping_before_query, bool, SetPingBeforeQuery, false);
/// Count of retry to send request to server.
- DECLARE_FIELD(send_retries, int, SetSendRetries, 1);
+ DECLARE_FIELD(send_retries, unsigned int, SetSendRetries, 1);
/// Amount of time to wait before next retry.
DECLARE_FIELD(retry_timeout, std::chrono::seconds, SetRetryTimeout, std::chrono::seconds(5));
@@ -67,19 +81,126 @@ struct ClientOptions {
DECLARE_FIELD(tcp_keepalive, bool, TcpKeepAlive, false);
DECLARE_FIELD(tcp_keepalive_idle, std::chrono::seconds, SetTcpKeepAliveIdle, std::chrono::seconds(60));
DECLARE_FIELD(tcp_keepalive_intvl, std::chrono::seconds, SetTcpKeepAliveInterval, std::chrono::seconds(5));
- DECLARE_FIELD(tcp_keepalive_cnt, int, SetTcpKeepAliveCount, 3);
+ DECLARE_FIELD(tcp_keepalive_cnt, unsigned int, SetTcpKeepAliveCount, 3);
+
+ // TCP options
+ DECLARE_FIELD(tcp_nodelay, bool, TcpNoDelay, true);
+
+ /** It helps to ease migration of the old codebases, which can't afford to switch
+ * to using ColumnLowCardinalityT or ColumnLowCardinality directly,
+ * but still want to benefit from smaller on-wire LowCardinality bandwidth footprint.
+ *
+ * @see LowCardinalitySerializationAdaptor, CreateColumnByType
+ */
+ DECLARE_FIELD(backward_compatibility_lowcardinality_as_wrapped_column, bool, SetBakcwardCompatibilityFeatureLowCardinalityAsWrappedColumn, true);
+
+ /** Set max size data to compress if compression enabled.
+ *
+ * Allows choosing tradeoff betwen RAM\CPU:
+ * - Lower value reduces RAM usage, but slightly increases CPU usage.
+ * - Higher value increases RAM usage but slightly decreases CPU usage.
+ *
+ * Default is 0, use natural implementation-defined chunk size.
+ */
+ DECLARE_FIELD(max_compression_chunk_size, unsigned int, SetMaxCompressionChunkSize, 65535);
+
+ struct SSLOptions {
+ /** There are two ways to configure an SSL connection:
+ * - provide a pre-configured SSL_CTX, which is not modified and not owned by the Client.
+ * - provide a set of options and allow the Client to create and configure SSL_CTX by itself.
+ */
+
+ /** Pre-configured SSL-context for SSL-connection.
+ * If NOT null client DONES NOT take ownership of context and it must be valid for client lifetime.
+ * If null client initlaizes OpenSSL and creates his own context, initializes it using
+ * other options, like path_to_ca_files, path_to_ca_directory, use_default_ca_locations, etc.
+ *
+ * Either way context is used to create an SSL-connection, which is then configured with
+ * whatever was provided as `configuration`, `host_flags`, `skip_verification` and `use_sni`.
+ */
+ SSL_CTX * ssl_context = nullptr;
+ auto & SetExternalSSLContext(SSL_CTX * new_ssl_context) {
+ ssl_context = new_ssl_context;
+ return *this;
+ }
+
+ /** Means to validate the server-supplied certificate against trusted Certificate Authority (CA).
+ * If no CAs are configured, the server's identity can't be validated, and the Client would err.
+ * See https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_set_default_verify_paths.html
+ */
+ /// Load deafult CA certificates from deafult locations.
+ DECLARE_FIELD(use_default_ca_locations, bool, SetUseDefaultCALocations, true);
+ /// Path to the CA files to verify server certificate, may be empty.
+ DECLARE_FIELD(path_to_ca_files, std::vector, SetPathToCAFiles, {});
+ /// Path to the directory with CA files used to validate server certificate, may be empty.
+ DECLARE_FIELD(path_to_ca_directory, std::string, SetPathToCADirectory, "");
+
+ /** Min and max protocol versions to use, set with SSL_CTX_set_min_proto_version and SSL_CTX_set_max_proto_version
+ * for details see https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_set_min_proto_version.html
+ */
+ DECLARE_FIELD(min_protocol_version, int, SetMinProtocolVersion, DEFAULT_VALUE);
+ DECLARE_FIELD(max_protocol_version, int, SetMaxProtocolVersion, DEFAULT_VALUE);
+
+ /** Options to be set with SSL_CTX_set_options,
+ * for details see https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_set_options.html
+ */
+ DECLARE_FIELD(context_options, int, SetContextOptions, DEFAULT_VALUE);
+
+ /** Use SNI at ClientHello
+ */
+ DECLARE_FIELD(use_sni, bool, SetUseSNI, true);
+
+ /** Skip SSL session verification (server's certificate, etc).
+ *
+ * WARNING: settig to true will bypass all SSL session checks, which
+ * is dangerous, but can be used against self-signed certificates, e.g. for testing purposes.
+ */
+ DECLARE_FIELD(skip_verification, bool, SetSkipVerification, false);
+
+ /** Mode of verifying host ssl certificate against name of the host, set with SSL_set_hostflags.
+ * For details see https://www.openssl.org/docs/man1.1.1/man3/SSL_set_hostflags.html
+ */
+ DECLARE_FIELD(host_flags, int, SetHostVerifyFlags, DEFAULT_VALUE);
+
+ struct CommandAndValue {
+ std::string command;
+ std::optional value = std::nullopt;
+ };
+ /** Extra configuration options, set with SSL_CONF_cmd.
+ * For deatils see https://www.openssl.org/docs/man1.1.1/man3/SSL_CONF_cmd.html
+ *
+ * Takes multiple pairs of command-value strings, all commands are supported,
+ * and prefix is empty.
+ * i.e. pass `sigalgs` or `SignatureAlgorithms` instead of `-sigalgs`.
+ *
+ * Rewrites any other options/flags if set in other ways.
+ */
+ DECLARE_FIELD(configuration, std::vector, SetConfiguration, {});
+
+ static const int DEFAULT_VALUE = -1;
+ };
+
+ // By default SSL is turned off.
+ std::optional ssl_options = std::nullopt;
+
+ // Will throw an exception if client was built without SSL support.
+ ClientOptions& SetSSLOptions(SSLOptions options);
#undef DECLARE_FIELD
};
std::ostream& operator<<(std::ostream& os, const ClientOptions& options);
+class SocketFactory;
+
/**
*
*/
class Client {
public:
Client(const ClientOptions& opts);
+ Client(const ClientOptions& opts,
+ std::unique_ptr socket_factory);
~Client();
/// Intends for execute arbitrary queries.
@@ -88,16 +209,19 @@ class Client {
/// Intends for execute select queries. Data will be returned with
/// one or more call of \p cb.
void Select(const std::string& query, SelectCallback cb);
+ void Select(const std::string& query, const std::string& query_id, SelectCallback cb);
/// Executes a select query which can be canceled by returning false from
/// the data handler function \p cb.
void SelectCancelable(const std::string& query, SelectCancelableCallback cb);
+ void SelectCancelable(const std::string& query, const std::string& query_id, SelectCancelableCallback cb);
/// Alias for Execute.
void Select(const Query& query);
/// Intends for insert block of data into a table \p table_name.
void Insert(const std::string& table_name, const Block& block);
+ void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
/// Ping server for aliveness.
void Ping();
@@ -105,8 +229,10 @@ class Client {
/// Reset connection with initial params.
void ResetConnection();
+ const ServerInfo& GetServerInfo() const;
+
private:
- ClientOptions options_;
+ const ClientOptions options_;
class Impl;
std::unique_ptr impl_;
diff --git a/src/vendor/clickhouse-cpp/clickhouse/columns/array.cpp b/src/vendor/clickhouse-cpp/clickhouse/columns/array.cpp
index 7af64dd..c71684d 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/columns/array.cpp
+++ b/src/vendor/clickhouse-cpp/clickhouse/columns/array.cpp
@@ -1,4 +1,5 @@
#include "array.h"
+#include "numeric.h"
#include
namespace clickhouse {
@@ -12,7 +13,7 @@ ColumnArray::ColumnArray(ColumnRef data)
void ColumnArray::AppendAsColumn(ColumnRef array) {
if (!data_->Type()->IsEqual(array->Type())) {
- throw std::runtime_error(
+ throw ValidationError(
"can't append column of type " + array->Type()->GetName() + " "
"to column type " + data_->Type()->GetName());
}
@@ -30,12 +31,11 @@ ColumnRef ColumnArray::GetAsColumn(size_t n) const {
return data_->Slice(GetOffset(n), GetSize(n));
}
-ColumnRef ColumnArray::Slice(size_t begin, size_t size) {
+ColumnRef ColumnArray::Slice(size_t begin, size_t size) const {
auto result = std::make_shared(GetAsColumn(begin));
result->OffsetsIncrease(1);
- for (size_t i = 1; i < size; i++)
- {
+ for (size_t i = 1; i < size; i++) {
result->Append(std::make_shared(GetAsColumn(begin + i)));
}
@@ -54,7 +54,10 @@ void ColumnArray::Append(ColumnRef column) {
}
}
-bool ColumnArray::Load(CodedInputStream* input, size_t rows) {
+bool ColumnArray::Load(InputStream* input, size_t rows) {
+ if (!rows) {
+ return true;
+ }
if (!offsets_->Load(input, rows)) {
return false;
}
@@ -64,7 +67,7 @@ bool ColumnArray::Load(CodedInputStream* input, size_t rows) {
return true;
}
-void ColumnArray::Save(CodedOutputStream* output) {
+void ColumnArray::Save(OutputStream* output) {
offsets_->Save(output);
data_->Save(output);
}
@@ -78,6 +81,12 @@ size_t ColumnArray::Size() const {
return offsets_->Size();
}
+void ColumnArray::Swap(Column& other) {
+ auto & col = dynamic_cast(other);
+ data_.swap(col.data_);
+ offsets_.swap(col.offsets_);
+}
+
void ColumnArray::OffsetsIncrease(size_t n) {
offsets_->Append(n);
}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/columns/array.h b/src/vendor/clickhouse-cpp/clickhouse/columns/array.h
index 50ddcab..e96e70c 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/columns/array.h
+++ b/src/vendor/clickhouse-cpp/clickhouse/columns/array.h
@@ -1,5 +1,6 @@
#pragma once
+#include "column.h"
#include "numeric.h"
namespace clickhouse {
@@ -24,10 +25,10 @@ class ColumnArray : public Column {
void Append(ColumnRef column) override;
/// Loads column data from input stream.
- bool Load(CodedInputStream* input, size_t rows) override;
+ bool Load(InputStream* input, size_t rows) override;
/// Saves column data to output stream.
- void Save(CodedOutputStream* output) override;
+ void Save(OutputStream* output) override;
/// Clear column data .
void Clear() override;
@@ -36,7 +37,9 @@ class ColumnArray : public Column {
size_t Size() const override;
/// Makes slice of the current column.
- ColumnRef Slice(size_t, size_t) override;
+ ColumnRef Slice(size_t, size_t) const override;
+
+ void Swap(Column&) override;
void OffsetsIncrease(size_t);
diff --git a/src/vendor/clickhouse-cpp/clickhouse/columns/column.h b/src/vendor/clickhouse-cpp/clickhouse/columns/column.h
index b0a9dee..6db5b39 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/columns/column.h
+++ b/src/vendor/clickhouse-cpp/clickhouse/columns/column.h
@@ -1,11 +1,17 @@
#pragma once
-#include "../base/coded.h"
-#include "../base/input.h"
#include "../types/types.h"
+#include "../columns/itemview.h"
+#include "../exceptions.h"
+
+#include
+#include
namespace clickhouse {
+class InputStream;
+class OutputStream;
+
using ColumnRef = std::shared_ptr;
/**
@@ -31,15 +37,16 @@ class Column : public std::enable_shared_from_this {
/// Get type object of the column.
inline TypeRef Type() const { return type_; }
+ inline const class Type& GetType() const { return *type_; }
/// Appends content of given column to the end of current one.
virtual void Append(ColumnRef column) = 0;
/// Loads column data from input stream.
- virtual bool Load(CodedInputStream* input, size_t rows) = 0;
+ virtual bool Load(InputStream* input, size_t rows) = 0;
/// Saves column data to output stream.
- virtual void Save(CodedOutputStream* output) = 0;
+ virtual void Save(OutputStream* output) = 0;
/// Clear column data .
virtual void Clear() = 0;
@@ -48,7 +55,19 @@ class Column : public std::enable_shared_from_this {
virtual size_t Size() const = 0;
/// Makes slice of the current column.
- virtual ColumnRef Slice(size_t begin, size_t len) = 0;
+ virtual ColumnRef Slice(size_t begin, size_t len) const = 0;
+
+ virtual void Swap(Column&) = 0;
+
+ /// Get a view on raw item data if it is supported by column, will throw an exception if index is out of range.
+ /// Please note that view is invalidated once column items are added or deleted, column is loaded from strean or destroyed.
+ virtual ItemView GetItem(size_t) const {
+ throw UnimplementedError("GetItem() is not supported for column of " + type_->GetName());
+ }
+
+ friend void swap(Column& left, Column& right) {
+ left.Swap(right);
+ }
protected:
TypeRef type_;
diff --git a/src/vendor/clickhouse-cpp/clickhouse/columns/date.cpp b/src/vendor/clickhouse-cpp/clickhouse/columns/date.cpp
index a66a449..78a86be 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/columns/date.cpp
+++ b/src/vendor/clickhouse-cpp/clickhouse/columns/date.cpp
@@ -27,11 +27,11 @@ void ColumnDate::Append(ColumnRef column) {
}
}
-bool ColumnDate::Load(CodedInputStream* input, size_t rows) {
+bool ColumnDate::Load(InputStream* input, size_t rows) {
return data_->Load(input, rows);
}
-void ColumnDate::Save(CodedOutputStream* output) {
+void ColumnDate::Save(OutputStream* output) {
data_->Save(output);
}
@@ -39,7 +39,7 @@ size_t ColumnDate::Size() const {
return data_->Size();
}
-ColumnRef ColumnDate::Slice(size_t begin, size_t len) {
+ColumnRef ColumnDate::Slice(size_t begin, size_t len) const {
auto col = data_->Slice(begin, len)->As();
auto result = std::make_shared();
@@ -48,6 +48,16 @@ ColumnRef ColumnDate::Slice(size_t begin, size_t len) {
return result;
}
+void ColumnDate::Swap(Column& other) {
+ auto & col = dynamic_cast(other);
+ data_.swap(col.data_);
+}
+
+ItemView ColumnDate::GetItem(size_t index) const {
+ return data_->GetItem(index);
+}
+
+
ColumnDateTime::ColumnDateTime()
: Column(Type::CreateDateTime())
@@ -55,6 +65,12 @@ ColumnDateTime::ColumnDateTime()
{
}
+ColumnDateTime::ColumnDateTime(std::string timezone)
+ : Column(Type::CreateDateTime(std::move(timezone)))
+ , data_(std::make_shared())
+{
+}
+
void ColumnDateTime::Append(const std::time_t& value) {
data_->Append(static_cast(value));
}
@@ -63,17 +79,21 @@ std::time_t ColumnDateTime::At(size_t n) const {
return data_->At(n);
}
+std::string ColumnDateTime::Timezone() const {
+ return type_->As()->Timezone();
+}
+
void ColumnDateTime::Append(ColumnRef column) {
if (auto col = column->As()) {
data_->Append(col->data_);
}
}
-bool ColumnDateTime::Load(CodedInputStream* input, size_t rows) {
+bool ColumnDateTime::Load(InputStream* input, size_t rows) {
return data_->Load(input, rows);
}
-void ColumnDateTime::Save(CodedOutputStream* output) {
+void ColumnDateTime::Save(OutputStream* output) {
data_->Save(output);
}
@@ -85,7 +105,7 @@ void ColumnDateTime::Clear() {
data_->Clear();
}
-ColumnRef ColumnDateTime::Slice(size_t begin, size_t len) {
+ColumnRef ColumnDateTime::Slice(size_t begin, size_t len) const {
auto col = data_->Slice(begin, len)->As();
auto result = std::make_shared();
@@ -94,4 +114,91 @@ ColumnRef ColumnDateTime::Slice(size_t begin, size_t len) {
return result;
}
+void ColumnDateTime::Swap(Column& other) {
+ auto & col = dynamic_cast(other);
+ data_.swap(col.data_);
+}
+
+ItemView ColumnDateTime::GetItem(size_t index) const {
+ return data_->GetItem(index);
+}
+
+ColumnDateTime64::ColumnDateTime64(size_t precision)
+ : ColumnDateTime64(Type::CreateDateTime64(precision), std::make_shared(18ul, precision))
+{}
+
+ColumnDateTime64::ColumnDateTime64(size_t precision, std::string timezone)
+ : ColumnDateTime64(Type::CreateDateTime64(precision, std::move(timezone)), std::make_shared(18ul, precision))
+{}
+
+ColumnDateTime64::ColumnDateTime64(TypeRef type, std::shared_ptr data)
+ : Column(type),
+ data_(data),
+ precision_(type->As()->GetPrecision())
+{}
+
+void ColumnDateTime64::Append(const Int64& value) {
+ // TODO: we need a type, which safely represents datetime.
+ // The precision of Poco.DateTime is not big enough.
+ data_->Append(value);
+}
+
+//void ColumnDateTime64::Append(const std::string& value) {
+// data_->Append(value);
+//}
+
+Int64 ColumnDateTime64::At(size_t n) const {
+ // make sure to use Absl's Int128 conversion
+ return static_cast(data_->At(n));
+}
+
+std::string ColumnDateTime64::Timezone() const {
+ return type_->As()->Timezone();
+}
+
+void ColumnDateTime64::Append(ColumnRef column) {
+ if (auto col = column->As()) {
+ data_->Append(col->data_);
+ }
+}
+
+bool ColumnDateTime64::Load(InputStream* input, size_t rows) {
+ return data_->Load(input, rows);
+}
+
+void ColumnDateTime64::Save(OutputStream* output) {
+ data_->Save(output);
+}
+
+void ColumnDateTime64::Clear() {
+ data_->Clear();
+}
+size_t ColumnDateTime64::Size() const {
+ return data_->Size();
+}
+
+ItemView ColumnDateTime64::GetItem(size_t index) const {
+ return data_->GetItem(index);
+}
+
+void ColumnDateTime64::Swap(Column& other) {
+ auto& col = dynamic_cast(other);
+ if (col.GetPrecision() != GetPrecision()) {
+ throw ValidationError("Can't swap DateTime64 columns when precisions are not the same: "
+ + std::to_string(GetPrecision()) + "(this) != " + std::to_string(col.GetPrecision()) + "(that)");
+ }
+
+ data_.swap(col.data_);
+}
+
+ColumnRef ColumnDateTime64::Slice(size_t begin, size_t len) const {
+ auto sliced_data = data_->Slice(begin, len)->As();
+
+ return ColumnRef{new ColumnDateTime64(type_, sliced_data)};
+}
+
+size_t ColumnDateTime64::GetPrecision() const {
+ return precision_;
+}
+
}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/columns/date.h b/src/vendor/clickhouse-cpp/clickhouse/columns/date.h
index 92a62d7..62ca4e0 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/columns/date.h
+++ b/src/vendor/clickhouse-cpp/clickhouse/columns/date.h
@@ -1,5 +1,6 @@
#pragma once
+#include "decimal.h"
#include "numeric.h"
#include
@@ -9,6 +10,8 @@ namespace clickhouse {
/** */
class ColumnDate : public Column {
public:
+ using ValueType = std::time_t;
+
ColumnDate();
/// Appends one element to the end of column.
@@ -23,19 +26,23 @@ class ColumnDate : public Column {
void Append(ColumnRef column) override;
/// Loads column data from input stream.
- bool Load(CodedInputStream* input, size_t rows) override;
+ bool Load(InputStream* input, size_t rows) override;
/// Saves column data to output stream.
- void Save(CodedOutputStream* output) override;
+ void Save(OutputStream* output) override;
/// Clear column data .
void Clear() override;
-
+
/// Returns count of rows in the column.
size_t Size() const override;
/// Makes slice of the current column.
- ColumnRef Slice(size_t begin, size_t len) override;
+ ColumnRef Slice(size_t begin, size_t len) const override;
+
+ void Swap(Column& other) override;
+
+ ItemView GetItem(size_t index) const override;
private:
std::shared_ptr data_;
@@ -44,7 +51,10 @@ class ColumnDate : public Column {
/** */
class ColumnDateTime : public Column {
public:
+ using ValueType = std::time_t;
+
ColumnDateTime();
+ explicit ColumnDateTime(std::string timezone);
/// Appends one element to the end of column.
void Append(const std::time_t& value);
@@ -52,26 +62,88 @@ class ColumnDateTime : public Column {
/// Returns element at given row number.
std::time_t At(size_t n) const;
+ /// Timezone associated with a data column.
+ std::string Timezone() const;
+
+public:
/// Appends content of given column to the end of current one.
void Append(ColumnRef column) override;
/// Loads column data from input stream.
- bool Load(CodedInputStream* input, size_t rows) override;
+ bool Load(InputStream* input, size_t rows) override;
/// Clear column data .
void Clear() override;
/// Saves column data to output stream.
- void Save(CodedOutputStream* output) override;
+ void Save(OutputStream* output) override;
/// Returns count of rows in the column.
size_t Size() const override;
/// Makes slice of the current column.
- ColumnRef Slice(size_t begin, size_t len) override;
+ ColumnRef Slice(size_t begin, size_t len) const override;
+
+ void Swap(Column& other) override;
+
+ ItemView GetItem(size_t index) const override;
private:
std::shared_ptr data_;
};
+
+/** */
+class ColumnDateTime64 : public Column {
+public:
+ using ValueType = Int64;
+
+ explicit ColumnDateTime64(size_t precision);
+ ColumnDateTime64(size_t precision, std::string timezone);
+
+ /// Appends one element to the end of column.
+ void Append(const Int64& value);
+ // It is a bit controversal: users might expect it to parse string of ISO8601 or some other human-friendly format,
+ // but current implemntation parses it as fractional integer with decimal point, e.g. "123.456".
+// void Append(const std::string& value);
+
+ /// Returns element at given row number.
+ Int64 At(size_t n) const;
+
+ /// Timezone associated with a data column.
+ std::string Timezone() const;
+
+public:
+ /// Appends content of given column to the end of current one.
+ void Append(ColumnRef column) override;
+
+ /// Loads column data from input stream.
+ bool Load(InputStream* input, size_t rows) override;
+
+ /// Clear column data .
+ void Clear() override;
+
+ /// Saves column data to output stream.
+ void Save(OutputStream* output) override;
+
+ /// Returns count of rows in the column.
+ size_t Size() const override;
+
+ /// Makes slice of the current column.
+ ColumnRef Slice(size_t begin, size_t len) const override;
+
+ void Swap(Column& other) override;
+
+ ItemView GetItem(size_t index) const override;
+
+ size_t GetPrecision() const;
+
+private:
+ ColumnDateTime64(TypeRef type, std::shared_ptr data);
+
+private:
+ std::shared_ptr data_;
+ const size_t precision_;
+};
+
}
diff --git a/src/vendor/clickhouse-cpp/clickhouse/columns/decimal.cpp b/src/vendor/clickhouse-cpp/clickhouse/columns/decimal.cpp
index 576eed0..8a4186a 100644
--- a/src/vendor/clickhouse-cpp/clickhouse/columns/decimal.cpp
+++ b/src/vendor/clickhouse-cpp/clickhouse/columns/decimal.cpp
@@ -1,6 +1,101 @@
#include "decimal.h"
-#include
+namespace
+{
+using namespace clickhouse;
+
+#ifdef ABSL_HAVE_INTRINSIC_INT128
+template
+inline bool addOverflow(const Int128 & l, const T & r, Int128 * result)
+{
+ __int128 res;
+ const auto ret_value = __builtin_add_overflow(static_cast<__int128>(l), static_cast<__int128>(r), &res);
+
+ *result = res;
+ return ret_value;
+}
+
+template
+inline bool mulOverflow(const Int128 & l, const T & r, Int128 * result)
+{
+ __int128 res;
+ const auto ret_value = __builtin_mul_overflow(static_cast<__int128>(l), static_cast<__int128>(r), &res);
+
+ *result = res;
+ return ret_value;
+}
+
+#else
+template
+inline bool getSignBit(const T & v)
+{
+ return v < static_cast(0);
+}
+
+inline bool getSignBit(const Int128 & v)
+{
+// static constexpr Int128 zero {};
+// return v < zero;
+
+ // Sign of the whole absl::int128 value is determined by sign of higher 64 bits.
+ return absl::Int128High64(v) < 0;
+}
+
+inline bool addOverflow(const Int128 & l, const Int128 & r, Int128 * result)
+{
+ // *result = l + r;
+ // const auto result_sign = getSignBit(*result);
+ // return l_sign == r_sign && l_sign != result_sign;
+
+ // Based on code from:
+ // https://wiki.sei.cmu.edu/confluence/display/c/INT32-C.+Ensure+that+operations+on+signed+integers+do+not+result+in+overflow#INT32C.Ensurethatoperationsonsignedintegersdonotresultinoverflow-CompliantSolution
+ const auto r_positive = !getSignBit(r);
+
+ if ((r_positive && (l > (std::numeric_limits::max() - r))) ||
+ (!r_positive && (l < (std::numeric_limits::min() - r)))) {
+ return true;
+ }
+ *result = l + r;
+
+ return false;
+}
+
+template
+inline bool mulOverflow(const Int128 & l, const T & r, Int128 * result)
+{
+ // Based on code from:
+ // https://wiki.sei.cmu.edu/confluence/display/c/INT32-C.+Ensure+that+operations+on+signed+integers+do+not+result+in+overflow#INT32C.Ensurethatoperationsonsignedintegersdonotresultinoverflow-CompliantSolution.3
+ const auto l_positive = !getSignBit(l);
+ const auto r_positive = !getSignBit(r);
+
+ if (l_positive) {
+ if (r_positive) {
+ if (r != 0 && l > (std::numeric_limits::max() / r)) {
+ return true;
+ }
+ } else {
+ if (l != 0 && r < (std::numeric_limits::min() / l)) {
+ return true;
+ }
+ }
+ } else {
+ if (r_positive) {
+ if (r != 0 && l < (std::numeric_limits::min() / r)) {
+ return true;
+ }
+ } else {
+ if (l != 0 && (r < (std::numeric_limits::max() / l))) {
+ return true;
+ }
+ }
+ }
+
+ *result = l * r;
+ return false;
+}
+#endif
+
+}
namespace clickhouse {
@@ -16,32 +111,30 @@ ColumnDecimal::ColumnDecimal(size_t precision, size_t scale)
}
}
-ColumnDecimal::ColumnDecimal(TypeRef type)
- : Column(type)
+ColumnDecimal::ColumnDecimal(TypeRef type, ColumnRef data)
+ : Column(type),
+ data_(data)
{
}
-void ColumnDecimal::Append(const BigInt& value) {
+void ColumnDecimal::Append(const Int128& value) {
if (data_->Type()->GetCode() == Type::Int32) {
- //data_->As()->Append(static_cast(value));
- static_cast>(data_->As())->Append(static_cast(value.to_long()));
+ data_->As()->Append(static_cast(value));
} else if (data_->Type()->GetCode() == Type::Int64) {
- //data_->As()->Append(static_cast(value));
- static_cast>(data_->As())->Append(static_cast(value.to_long_long()));
+ data_->As()->Append(static_cast(value));
} else {
- //data_->As()->Append(static_cast(value));
- static_cast>(data_->As())->Append(static_cast(value));
+ data_->As()->Append(static_cast(value));
}
}
void ColumnDecimal::Append(const std::string& value) {
- BigInt int_value = 0;
+ Int128 int_value = 0;
auto c = value.begin();
auto end = value.end();
bool sign = true;
bool has_dot = false;
- int zeros = 0;
+ size_t zeros = 0;
while (c != end) {
if (*c == '-') {
@@ -51,7 +144,7 @@ void ColumnDecimal::Append(const std::string& value) {
}
} else if (*c == '.' && !has_dot) {
size_t distance = std::distance(c, end) - 1;
- auto scale = std::static_pointer_cast(type_)->GetScale();
+ auto scale = type_->As()->GetScale();
if (distance <= scale) {
zeros = scale - distance;
@@ -61,33 +154,40 @@ void ColumnDecimal::Append(const std::string& value) {
has_dot = true;
} else if (*c >= '0' && *c <= '9') {
- int_value *= 10;
- int_value += *c - '0';
+ if (mulOverflow(int_value, 10, &int_value) ||
+ addOverflow(int_value, *c - '0', &int_value)) {
+ throw AssertionError("value is too big for 128-bit integer");
+ }
} else {
- throw std::runtime_error(std::string("unexpected symbol '") + (*c) + "' in decimal value");
+ throw ValidationError(std::string("unexpected symbol '") + (*c) + "' in decimal value");
}
++c;
}
if (c != end) {
- throw std::runtime_error("unexpected symbol '-' in decimal value");
+ throw ValidationError("unexpected symbol '-' in decimal value");
}
while (zeros) {
- int_value *= 10;
+ if (mulOverflow(int_value, 10, &int_value)) {
+ throw AssertionError("value is too big for 128-bit integer");
+ }
--zeros;
}
Append(sign ? int_value : -int_value);
}
-BigInt ColumnDecimal::At(size_t i) const {
- if (data_->Type()->GetCode() == Type::Int32) {
- return static_cast(data_->As()->At(i));
- } else if (data_->Type()->GetCode() == Type::Int64) {
- return static_cast(data_->As()->At(i));
- } else {
- return data_->As()->At(i);
+Int128 ColumnDecimal::At(size_t i) const {
+ switch (data_->Type()->GetCode()) {
+ case Type::Int32:
+ return static_cast(data_->As