Skip to content

8352392: AIX: implement attach API v2 and streaming output #24177

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 53 additions & 182 deletions src/hotspot/os/aix/attachListener_aix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,7 @@ class AixAttachListener: AllStatic {

static bool _atexit_registered;

// reads a request from the given connected socket
static AixAttachOperation* read_request(int s);

public:
enum {
ATTACH_PROTOCOL_VER = 1 // protocol version
};
enum {
ATTACH_ERROR_BADVERSION = 101 // error codes
};

static void set_path(char* path) {
if (path == nullptr) {
Expand All @@ -107,25 +98,65 @@ class AixAttachListener: AllStatic {
static void set_shutdown(bool shutdown) { _shutdown = shutdown; }
static bool is_shutdown() { return _shutdown; }

// write the given buffer to a socket
static int write_fully(int s, char* buf, size_t len);

static AixAttachOperation* dequeue();
};

class SocketChannel : public AttachOperation::RequestReader, public AttachOperation::ReplyWriter {
private:
int _socket;
public:
SocketChannel(int socket) : _socket(socket) {}
~SocketChannel() {
close();
}

bool opened() const {
return _socket != -1;
}

void close() {
if (opened()) {
// SHUT_RDWR is not available
::shutdown(_socket, 2);
::close(_socket);
_socket = -1;
}
}

// RequestReader
int read(void* buffer, int size) override {
ssize_t n;
RESTARTABLE(::read(_socket, buffer, (size_t)size), n);
return checked_cast<int>(n);
}

// ReplyWriter
int write(const void* buffer, int size) override {
ssize_t n;
RESTARTABLE(::write(_socket, buffer, size), n);
return checked_cast<int>(n);
}

void flush() override {
}
};

class AixAttachOperation: public AttachOperation {
private:
// the connection to the client
int _socket;
SocketChannel _socket_channel;

public:
void complete(jint res, bufferedStream* st);
AixAttachOperation(int socket) : AttachOperation(), _socket_channel(socket) {}

void set_socket(int s) { _socket = s; }
int socket() const { return _socket; }
void complete(jint res, bufferedStream* st) override;

AixAttachOperation(char* name) : AttachOperation(name) {
set_socket(-1);
ReplyWriter* get_reply_writer() override {
return &_socket_channel;
}

bool read_request() {
return _socket_channel.read_request(this, &_socket_channel);
}
};

Expand All @@ -137,34 +168,6 @@ bool AixAttachListener::_atexit_registered = false;
// Shutdown marker to prevent accept blocking during clean-up
volatile bool AixAttachListener::_shutdown = false;

// Supporting class to help split a buffer into individual components
class ArgumentIterator : public StackObj {
private:
char* _pos;
char* _end;
public:
ArgumentIterator(char* arg_buffer, size_t arg_size) {
_pos = arg_buffer;
_end = _pos + arg_size - 1;
}
char* next() {
if (*_pos == '\0') {
// advance the iterator if possible (null arguments)
if (_pos < _end) {
_pos += 1;
}
return nullptr;
}
char* res = _pos;
char* next_pos = strchr(_pos, '\0');
if (next_pos < _end) {
next_pos++;
}
_pos = next_pos;
return res;
}
};

// On AIX if sockets block until all data has been transmitted
// successfully in some communication domains a socket "close" may
// never complete. We have to take care that after the socket shutdown
Expand Down Expand Up @@ -258,106 +261,6 @@ int AixAttachListener::init() {
return 0;
}

// Given a socket that is connected to a peer we read the request and
// create an AttachOperation. As the socket is blocking there is potential
// for a denial-of-service if the peer does not response. However this happens
// after the peer credentials have been checked and in the worst case it just
// means that the attach listener thread is blocked.
//
AixAttachOperation* AixAttachListener::read_request(int s) {
char ver_str[8];
os::snprintf_checked(ver_str, sizeof(ver_str), "%d", ATTACH_PROTOCOL_VER);

// The request is a sequence of strings so we first figure out the
// expected count and the maximum possible length of the request.
// The request is:
// <ver>0<cmd>0<arg>0<arg>0<arg>0
// where <ver> is the protocol version (1), <cmd> is the command
// name ("load", "datadump", ...), and <arg> is an argument
int expected_str_count = 2 + AttachOperation::arg_count_max;
const size_t max_len = (sizeof(ver_str) + 1) + (AttachOperation::name_length_max + 1) +
AttachOperation::arg_count_max*(AttachOperation::arg_length_max + 1);

char buf[max_len];
int str_count = 0;

// Read until all (expected) strings have been read, the buffer is
// full, or EOF.

size_t off = 0;
size_t left = max_len;

do {
ssize_t n;
// Don't block on interrupts because this will
// hang in the clean-up when shutting down.
n = read(s, buf+off, left);
assert(n <= checked_cast<ssize_t>(left), "buffer was too small, impossible!");
buf[max_len - 1] = '\0';
if (n == -1) {
return nullptr; // reset by peer or other error
}
if (n == 0) {
break;
}
for (int i=0; i<n; i++) {
if (buf[off+i] == 0) {
// EOS found
str_count++;

// The first string is <ver> so check it now to
// check for protocol mismatch
if (str_count == 1) {
if ((strlen(buf) != strlen(ver_str)) ||
(atoi(buf) != ATTACH_PROTOCOL_VER)) {
char msg[32];
os::snprintf_checked(msg, sizeof(msg), "%d\n", ATTACH_ERROR_BADVERSION);
write_fully(s, msg, strlen(msg));
return nullptr;
}
}
}
}
off += n;
left -= n;
} while (left > 0 && str_count < expected_str_count);

if (str_count != expected_str_count) {
return nullptr; // incomplete request
}

// parse request

ArgumentIterator args(buf, (max_len)-left);

// version already checked
char* v = args.next();

char* name = args.next();
if (name == nullptr || strlen(name) > AttachOperation::name_length_max) {
return nullptr;
}

AixAttachOperation* op = new AixAttachOperation(name);

for (int i=0; i<AttachOperation::arg_count_max; i++) {
char* arg = args.next();
if (arg == nullptr) {
op->set_arg(i, nullptr);
} else {
if (strlen(arg) > AttachOperation::arg_length_max) {
delete op;
return nullptr;
}
op->set_arg(i, arg);
}
}

op->set_socket(s);
return op;
}


// Dequeue an operation
//
// In the Aix implementation there is only a single operation and clients
Expand Down Expand Up @@ -402,31 +305,16 @@ AixAttachOperation* AixAttachListener::dequeue() {
}

// peer credential look okay so we read the request
AixAttachOperation* op = read_request(s);
if (op == nullptr) {
::close(s);
AixAttachOperation* op = new AixAttachOperation(s);
if (!op->read_request()) {
delete op;
continue;
} else {
return op;
}
}
}

// write the given buffer to the socket
int AixAttachListener::write_fully(int s, char* buf, size_t len) {
do {
ssize_t n = ::write(s, buf, len);
if (n == -1) {
if (errno != EINTR) return -1;
} else {
buf += n;
len -= n;
}
}
while (len > 0);
return 0;
}

// Complete an operation by sending the operation result and any result
// output to the client. At this time the socket is in blocking mode so
// potentially we can block if there is a lot of data and the client is
Expand All @@ -436,24 +324,6 @@ int AixAttachListener::write_fully(int s, char* buf, size_t len) {
// socket could be made non-blocking and a timeout could be used.

void AixAttachOperation::complete(jint result, bufferedStream* st) {
JavaThread* thread = JavaThread::current();
ThreadBlockInVM tbivm(thread);

// write operation result
char msg[32];
os::snprintf_checked(msg, sizeof(msg), "%d\n", result);
int rc = AixAttachListener::write_fully(this->socket(), msg, strlen(msg));

// write any result data
if (rc == 0) {
// Shutdown the socket in the cleanup function to enable more than
// one agent attach in a sequence (see comments to listener_cleanup()).
AixAttachListener::write_fully(this->socket(), (char*) st->base(), st->size());
}

// done
::close(this->socket());

delete this;
}

Expand Down Expand Up @@ -493,6 +363,7 @@ void AttachListener::vm_start() {
}

int AttachListener::pd_init() {
AttachListener::set_supported_version(ATTACH_API_V2);
JavaThread* thread = JavaThread::current();
ThreadBlockInVM tbivm(thread);

Expand Down
Loading