Skip to content
This repository was archived by the owner on Nov 23, 2023. It is now read-only.

WIP scripts to generate audio/midi realtime and offline from notochord checkpoint #15

Open
wants to merge 4 commits into
base: kr-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added examples/notochord/example.sf2
Binary file not shown.
93 changes: 93 additions & 0 deletions examples/notochord/generate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
## WIP!! offline generate midi and audiofile from notochord

# TODO: add pyfluidsynth to requirements

from notochord import Notochord

import fire
from typing import List
from mido import Message, MidiFile, MidiTrack, second2tick
import time


def save_midifile(nc_notes: List, filename: str='generated.midi') -> bool:
"""
:param nc_notes: list of note events returned from querying notochord
"""
mid = MidiFile(type=1)
inst_track = {}

for r in nc_notes:
print(r)
inst, pitch, nctime, velocity, end, step = r['instrument'], r['pitch'], r['time'], r['velocity'], r['end'], r['step']

# process notochord events for MIDI
program = inst - 1 # instruments in nc are shifted by 1 to allow for start token?
velocity = round(velocity) # discretize velocity
# can increase the time resolution by increasing MidiFile.ticks_per_beat- typical range is [96-480] but can go higher
midi_time = round(second2tick(nctime)) # TODO: specify ticks_per_beat, tempo

# TODO does fluid synth handle multi tracks?
track = inst_track.get(inst)
if track is None:
# add a new track for each new instrument
track = MidiTrack()
inst_track[inst] = track
mid.tracks.append(track)

# TODO: handle drum tracks - should be mapped to channel 10 in 1-128 or channel 9 in 0-127
# have to synchronize to start at same time or try async type=2? maybe program change does this for us

track.append(Message('program_change', program=inst, time=0))
note_on_off = 'note_on' if velocity else 'note_off'
track.append(Message(note_on_off, note=pitch, velocity=velocity, time=midi_time))

mid.save(filename)
return True


def main(checkpoint: str = None) -> None:

if checkpoint is None:
raise ValueError(f'Checkpoint required but is None. Use --checkpoint to provide the path to the .ckpt file.')

predictor = Notochord.from_checkpoint(checkpoint)
predictor.eval()

include_instrument = None
inst = 1 # grand piano?
pitch = 60 # C4
nctime = 0
velocity = 100
unique_inst = set()
notes = []

start = time.time()
# r = predictor.feed(inst, pitch, nctime, velocity)
while time.time() - start < 10:

# Only allow sampling up to 16 instruments- take the first 16 unique instruments
if len(unique_inst) >= 16:
include_instrument = list(unique_inst)
else:
unique_inst.add(inst)

# r = predictor.query()
r = predictor.feed_query(inst, pitch, nctime, velocity, include_instrument=include_instrument)
# print(r)
inst, pitch, nctime, velocity, end, step = r['instrument'], r['pitch'], r['time'], r['velocity'], r['end'], r['step']
notes.append(r)

end = time.time()
print(end - start)
print((end - start) / step)
print(unique_inst)
print(len(unique_inst))
print(include_instrument)
predictor.reset()

save_midifile(notes)


if __name__=='__main__':
fire.Fire(main)
108 changes: 108 additions & 0 deletions examples/notochord/generate_realtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
## WIP realtime audio generation from notochord checkpoint

from notochord import Notochord

import fire
import fluidsynth
from mido import second2tick
from multiprocessing import Process, Queue
import time


def sequence_event(event: dict) -> None:
"""
:param nc_notes: list of note events returned from querying notochord
"""
ticks_per_beat = 1000
tempo = 120

seq = fluidsynth.Sequencer(time_scale=ticks_per_beat)
fs = fluidsynth.Synth()
fs.start()
sfid = fs.sfload("example.sf2")
fs.program_select(0, sfid, 0, 0)
synthID = seq.register_fluidsynth(fs)

try:
while True:
if q.empty():
time.sleep(0.0005)
continue

r = q.get()
inst, pitch, nctime, velocity, end, step = r['instrument'], r['pitch'], r['time'], r['velocity'], r['end'], r['step']

# process notochord events for MIDI
# program = inst - 1 # instruments in nc are shifted by 1 to allow for start token
# TODO: convert inst to prog/ channel
# TODO: ignore start token 0

# measure fluidsynth delay - bonus points for latency to actual sound production
# measure notochord latency
# build in 100ms buffer like tidal?

# first- dumb version - time.sleep nctime
# second version- compensate for drift
# third version- ?

note_off = velocity == 0
velocity = round(velocity) # discretize velocity
# can increase the time resolution by increasing MidiFile.ticks_per_beat- typical range is [96-480] but can go higher
midi_time = round(second2tick(nctime, ticks_per_beat, tempo))

# TODO: handle drum tracks - should be mapped to channel 10 in 1-128 or channel 9 in 0-127
if note_off:
seq.note_off(time=midi_time, absolute=False, channel=0, key=pitch, velocity=velocity, dest=synthID)
else:
seq.note_on(time=midi_time, absolute=False, channel=0, key=pitch, velocity=velocity, dest=synthID)


except KeyboardInterrupt:
print('sequence_event done')
# finally:


def generate_from_notochord(q, checkpoint: str) -> None:
predictor = Notochord.from_checkpoint(checkpoint)
predictor.reset() # TODO: look at reset logic
predictor.eval()

unique_inst = set()
notes = []

# optional feed initial note
r = predictor.feed(inst=1, pitch=60, nctime=0, velocity=100, include_instrument=None)

try:
while True:
# Only allow sampling up to 16 instruments- take the first 16 unique instruments
if len(unique_inst) >= 16:
include_instrument = list(unique_inst)
else:
unique_inst.add(inst)
# use query feed - blacklist certain instruments, set temperature - absolute simplest way
start = time.time()
r = predictor.query_feed(include_instrument=include_instrument)
notochord_latency = time.time() - start
time.sleep(min(nctime - notochord_latency, 0)) # TODO: test logic for factoring in notochord_latency
q.put(r) # add event to queue to be played
except KeyboardInterrupt:
print('generate_from_notochord done')
finally:
predictor.reset()


def main(checkpoint: str = None) -> None:

if checkpoint is None:
raise ValueError(f'Checkpoint required but is None. Use --checkpoint to provide the path to the .ckpt file.')

p1 = Process(target=generate_from_notochord, args=(q, checkpoint,))
p2 = Process(target=sequence_event, args=(q,))
p1.start()
p2.start()


if __name__=='__main__':
q = Queue()
fire.Fire(main)