Update redis task and redis subscriber. (#1616)

This commit is contained in:
xiehan
2024-08-30 20:41:33 +08:00
committed by GitHub
parent fbca1d7c5d
commit 4c39519283
2 changed files with 14 additions and 17 deletions

View File

@@ -93,6 +93,11 @@ public:
return this->sync_send("PING", { });
}
int quit()
{
return this->sync_send("QUIT", { });
}
public:
/* All 'timeout' proxy functions can only be called only before
the task is started or in 'extract'. */

View File

@@ -72,6 +72,7 @@ bool ComplexRedisTask::check_request()
if (this->req.get_command(command) &&
(strcasecmp(command.c_str(), "AUTH") == 0 ||
strcasecmp(command.c_str(), "SELECT") == 0 ||
strcasecmp(command.c_str(), "RESET") == 0 ||
strcasecmp(command.c_str(), "ASKING") == 0))
{
this->state = WFT_STATE_TASK_ERROR;
@@ -368,28 +369,19 @@ ComplexRedisSubscribeTask::SubscribeWrapper::next_in(ProtocolMessage *message)
{
redis_reply_t *reply = ((RedisResponse *)message)->result_ptr();
if (reply->type == REDIS_REPLY_TYPE_ARRAY && reply->elements == 3 &&
reply->element[0]->type == REDIS_REPLY_TYPE_STRING)
{
const char *str = reply->element[0]->str;
size_t len = reply->element[0]->len;
if ((len == 11 && strncasecmp(str, "unsubscribe", 11)) == 0 ||
(len == 12 && strncasecmp(str, "punsubscribe", 12) == 0))
{
if (reply->element[2]->type == REDIS_REPLY_TYPE_INTEGER &&
reply->element[2]->integer == 0)
{
task_->finished_ = true;
}
}
}
else if (!task_->watching_)
if (reply->type != REDIS_REPLY_TYPE_ARRAY)
{
task_->finished_ = true;
return NULL;
}
if (reply->elements == 3 &&
reply->element[2]->type == REDIS_REPLY_TYPE_INTEGER &&
reply->element[2]->integer == 0)
{
task_->finished_ = true;
}
task_->watching_ = true;
task_->extract_(task_);