Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify query mechanism #127

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions image_generation_subnet/protocol.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import time
import asyncio
import copy
import json
import time
import traceback
import typing

import bittensor as bt
from bittensor_wallet import Keypair
import pydantic
from generation_models.utils import base64_to_pil_image
import typing
import yaml
import requests
import traceback
import copy
import yaml
from bittensor_wallet import Keypair

from generation_models.utils import base64_to_pil_image

MODEL_CONFIG = yaml.load(
open("generation_models/configs/model_config.yaml"), yaml.FullLoader
Expand Down
1 change: 1 addition & 0 deletions image_generation_subnet/validator/miner_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def get_miner_info(self, only_layer_one=False):
else:
uids = [int(uid) for uid in self.validator.metagraph.uids]
query_axons = [self.validator.metagraph.axons[uid] for uid in uids]

synapse = Information()
bt.logging.info("Requesting miner info using synapse Information")
responses = self.validator.dendrite.query(
Expand Down
94 changes: 66 additions & 28 deletions neurons/validator/validator.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
import time, asyncio
import bittensor as bt
import random
import torch
import asyncio
import json
import math
import pickle
import queue
import random
import threading
import time
import traceback
from datetime import datetime

import bittensor as bt
import numpy as np
import torch
import yaml

import image_generation_subnet as ig_subnet
from generation_models.utils import random_image_size
from image_generation_subnet.base.validator import BaseValidatorNeuron
from neurons.validator.validator_proxy import ValidatorProxy
from image_generation_subnet.validator import MinerManager
import image_generation_subnet as ig_subnet
import traceback
import yaml
import threading
import math
import queue
import json
from image_generation_subnet.validator.offline_challenge import (
get_backup_image,
get_backup_prompt,
get_backup_llm_prompt,
get_backup_challenge_vqa,
)
from datetime import datetime
get_backup_challenge_vqa, get_backup_image, get_backup_llm_prompt,
get_backup_prompt)
from neurons.validator.validator_proxy import ValidatorProxy
from services.offline_rewarding.redis_client import RedisClient
from services.offline_rewarding.reward_app import RewardApp
from generation_models.utils import random_image_size

MODEL_CONFIGS = yaml.load(
open("generation_models/configs/model_config.yaml"), yaml.FullLoader
Expand Down Expand Up @@ -616,12 +616,55 @@ def async_query_and_reward(
synapses, batched_uids_should_rewards = self.prepare_challenge(
uids_should_rewards, model_name, pipeline_type
)
for synapse, uids_should_rewards in zip(synapses, batched_uids_should_rewards):

def query_axons(axons, synapses):
    """
    Concurrently query multiple axons, sending one synapse to each.

    All ``dendrite.call`` coroutines are gathered on a single event loop,
    so ``responses[i]`` corresponds to ``(axons[i], synapses[i])``.

    Args:
        axons: Axons to query; same length as ``synapses``.
        synapses: Synapses to send, one per axon.

    Returns:
        list: Responses from all axons, in input order.

    Notes:
        - ``timeout`` is taken from the enclosing scope's model catalogue
          entry (``self.nicheimage_catalogue[model_name]["timeout"]``).
        - The dendrite's aiohttp session is always closed afterwards so
          connections are not leaked between batches.
    """
    def _gather_on(loop):
        # One dendrite.call per (axon, synapse) pair, results in order.
        # NOTE: loop variable renamed from the original, which shadowed
        # the `axons` parameter inside the generator expression.
        return loop.run_until_complete(asyncio.gather(
            *(dendrite.call(
                axon,
                synapse,
                deserialize=False,
                timeout=self.nicheimage_catalogue[model_name]["timeout"]
            ) for axon, synapse in zip(axons, synapses))
        ))

    try:
        try:
            # Raises RuntimeError when this thread has no current event
            # loop (typical for worker threads). Only that condition
            # warrants the new-loop fallback; genuine call failures
            # should propagate rather than be silently retried.
            loop = asyncio.get_event_loop()
            responses = _gather_on(loop)
        except RuntimeError:
            # No usable loop on this thread: create one, run, and make
            # sure it is closed even if the calls fail (the original
            # leaked the loop on a failed retry).
            new_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(new_loop)
            try:
                responses = _gather_on(new_loop)
            finally:
                new_loop.close()
    finally:
        # Always release the dendrite's HTTP session.
        dendrite.close_session()
    return responses

axons = []
for uid in uids:
if uid in self.miner_manager.layer_one_axons:
axons.append(self.miner_manager.layer_one_axons[uid])
else:
axons.append(self.metagraph.axons[uid])

batch_size = min(4, 1 + len(axons) // 4)
_responses = []
for i in range(0, len(synapses), batch_size):
_responses.extend(query_axons(axons[i:i+batch_size], synapses[i:i+batch_size]))

for synapse, uids_should_rewards, response in zip(synapses, batched_uids_should_rewards, _responses):
uids, should_rewards = zip(*uids_should_rewards)
bt.logging.info(f"Quering {uids}, Should reward: {should_rewards}")
if not synapse:
continue
# base_synapse = synapse.copy()
base_synapse = synapse.model_copy()
if (
self.offline_reward
Expand All @@ -638,12 +681,7 @@ def async_query_and_reward(
else:
axons.append(self.metagraph.axons[uid])

responses = dendrite.query(
axons=axons,
synapse=synapse,
deserialize=False,
timeout=self.nicheimage_catalogue[model_name]["timeout"],
)
responses = [response]
reward_responses = [
response
for response, should_reward in zip(responses, should_rewards)
Expand Down Expand Up @@ -696,7 +734,7 @@ def async_query_and_reward(
self.miner_manager,
)

# Scale Reward based on Miner Volume
# Scale Reward based on Miner Volume
for i, uid in enumerate(reward_uids):
if rewards[i] > 0:
rewards[i] = rewards[i] * (
Expand Down