diff --git a/rebar.config b/rebar.config index 754ce85..ad30ca8 100644 --- a/rebar.config +++ b/rebar.config @@ -3,8 +3,7 @@ {deps, [ {jiffy, "0.14.11"}, - {erlexec, "1.7.1"}, - {b64fast, "0.2.1"} + {erlexec, "1.7.1"} ]}. @@ -30,20 +29,6 @@ ]} ]}. -{overrides, - [{override, b64fast, - [ - {plugins, [pc]}, - {artifacts, ["priv/b64fast.so"]}, - {so_name, "b64fast.so"}, - {provider_hooks, [ - {post, - [{compile, {pc, compile}}, - {clean, {pc, clean}}]} - ]} - ]} - ]}. - {dialyzer, [ {warnings, [unknown, no_return, error_handling]}, {plt_apps, top_level_deps}, diff --git a/rebar.lock b/rebar.lock index 49a106a..a0db0b9 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,10 +1,8 @@ {"1.1.0", -[{<<"b64fast">>,{pkg,<<"b64fast">>,<<"0.2.1">>},0}, - {<<"erlexec">>,{pkg,<<"erlexec">>,<<"1.7.1">>},0}, +[{<<"erlexec">>,{pkg,<<"erlexec">>,<<"1.7.1">>},0}, {<<"jiffy">>,{pkg,<<"jiffy">>,<<"0.14.11">>},0}]}. [ {pkg_hash,[ - {<<"b64fast">>, <<"EF67EFD73109BF52A83338A2DED30D0DC4EC1B11449B4C43DB970D3FC96CC9CD">>}, {<<"erlexec">>, <<"6DDBD40FA202084ED0BDAF95A50C334ACAA5644AE213B903CD4094A78AE79734">>}, {<<"jiffy">>, <<"919A87D491C5A6B5E3BBC27FAFEDC3A0761CA0B4C405394F121F582FD4E3F0E5">>}]} ]. diff --git a/src/erlmld.app.src b/src/erlmld.app.src index f10253a..d78c437 100644 --- a/src/erlmld.app.src +++ b/src/erlmld.app.src @@ -20,8 +20,7 @@ stdlib, crypto, erlexec, - jiffy, - b64fast]}, + jiffy]}, {modules, []}, {env, [ %% if nonzero, use a specific listen port instead of a random one: @@ -89,6 +88,9 @@ %% if true, don't fail if kcl encounters child shards with open parents, which %% can occur when processing dynamo streams on very large tables (requires %% patched KCL): - {ignore_unexpected_child_shards, false} + {ignore_unexpected_child_shards, false}, + + %% base64 decoder {mod, fun} + {base64_decoder, {base64, decode}} ]} ]}. diff --git a/src/erlmld_wrk_statem.erl b/src/erlmld_wrk_statem.erl index b28a6b7..d09af79 100644 --- a/src/erlmld_wrk_statem.erl +++ b/src/erlmld_wrk_statem.erl @@ -664,9 +664,9 @@ stream_record(Record, PartitionKey, Data, SequenceNumber) -> deaggregate_kpl_records(R, Records) -> + Base64Decoder = application:get_env(erlmld, base64_decoder, {base64, decode}), lists:flatmap(fun (#{<<"data">> := RecordData} = Record) -> - %% note: b64fast will fail if the data contains whitespace. - deaggregate_kpl_record(R, Record, b64fast:decode64(RecordData)) + deaggregate_kpl_record(R, Record, b64decode(Base64Decoder, RecordData)) end, Records). @@ -730,6 +730,10 @@ encode_seqno_base(X) when is_atom(X), X /= undefined -> atom_to_binary(X, utf8). +b64decode({M, F}, Data) -> + M:F(Data). + + %%%=================================================================== %%% TESTS %%%===================================================================