Parallel Inferencing? #623
Comments
LLamaSharp intends to be thread-safe, but that's a bit tricky due to some thread-safety issues in llama.cpp itself. At the moment it's set up so there's a global lock taken while inference is running - this means you can dispatch as much work as you like from as many threads as you like (but ultimately, only one inference will run at a time). Hopefully in the near future we'll be able to remove this lock!
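A rough sketch of what that concurrent dispatch can look like, assuming weights and parameters have already been loaded with LLamaWeights.LoadFromFile (as in the examples further down) - exact API shapes may vary between versions:
// Fire several prompts concurrently; the global lock inside LLamaSharp
// serializes the actual inference, but dispatching like this is safe.
var prompts = new[] { "Question: What is 2 + 2?\nAnswer:", "Question: Name a colour.\nAnswer:" };
var results = await Task.WhenAll(prompts.Select(async prompt =>
{
    var executor = new StatelessExecutor(weights, parameters);
    var text = "";
    await foreach (var piece in executor.InferAsync(prompt, new InferenceParams { MaxTokens = 32 }))
        text += piece;
    return text;
}));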
Thanks Martin. Do you know if llama.cpp can support taking in multiple prompts with one call? It looks like the llama.cpp web server can support it, but I'm not sure how stable the feature is.
LLamaSharp has the BatchedExecutor, which is an entirely new executor I've been working on. You can spawn multiple "Conversations", which can all be prompted, and then inference runs for all of them simultaneously.
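A minimal sketch of that pattern (assuming a model and parameters loaded as in the examples later in this thread):
// Several conversations prompted independently, then evaluated together in one batch.
using var executor = new BatchedExecutor(model, parameters);

using var convA = executor.Create();
using var convB = executor.Create();
convA.Prompt("The capital of France is");
convB.Prompt("The tallest mountain on Earth is");

// One call runs inference for every conversation with pending tokens.
await executor.Infer();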
Thanks Martin. I will try it out with the next release.
The …
Cool. I will test it out this week.
Hi, I've been wanting to try this out myself but I can't get the example to work.
using LLama.Batched;
using LLama.Common;
using LLama.Native;
using LLama.Sampling;
using Spectre.Console;
namespace LLama.Examples.Examples;
/// <summary>
/// This demonstrates using a batch to generate two sequences and then using one
/// sequence as the negative guidance ("classifier free guidance") for the other.
/// </summary>
///
public class BatchedExecutorGuidance
{
private const int n_len = 32;
public static async Task Run()
{
string modelPath = @"/mnt/models/openchat-3.5-0106.Q8_0.gguf";
var parameters = new ModelParams(modelPath);
using var model = LLamaWeights.LoadFromFile(parameters);
var positivePrompt = AnsiConsole.Ask("Positive Prompt (or ENTER for default):", "My favourite colour is").Trim();
var negativePrompt = AnsiConsole.Ask("Negative Prompt (or ENTER for default):", "I hate the colour red. My favourite colour is").Trim();
var weight = AnsiConsole.Ask("Guidance Weight (or ENTER for default):", 2.0f);
// Create an executor that can evaluate a batch of conversations together
using var executor = new BatchedExecutor(model, parameters);
// Print some info
var name = executor.Model.Metadata.GetValueOrDefault("general.name", "unknown model name");
Console.WriteLine($"Created executor with model: {name}");
// Load the two prompts into two conversations
// using var guided = executor.Create();
// guided.Prompt(positivePrompt);
using var guided = executor.Prompt(positivePrompt);
// using var guidance = executor.Create();
// guidance.Prompt(negativePrompt);
using var guidance = executor.Prompt(negativePrompt);
// Run inference to evaluate prompts
await AnsiConsole
.Status()
.Spinner(Spinner.Known.Line)
.StartAsync("Evaluating Prompts...", _ => executor.Infer());
// Fork the "guided" conversation. We'll run this one without guidance for comparison
using var unguided = guided.Fork();
// Run inference loop
var unguidedSampler = new GuidedSampler(null, weight);
var unguidedDecoder = new StreamingTokenDecoder(executor.Context);
var guidedSampler = new GuidedSampler(guidance, weight);
var guidedDecoder = new StreamingTokenDecoder(executor.Context);
await AnsiConsole
.Progress()
.StartAsync(async progress =>
{
var reporter = progress.AddTask("Running Inference", maxValue: n_len);
for (var i = 0; i < n_len; i++)
{
if (i != 0)
await executor.Infer();
// Sample from the "unguided" conversation. This is just a conversation using the same prompt, without any
// guidance. This serves as a comparison to show the effect of guidance.
var u = unguidedSampler.Sample(executor.Context.NativeHandle, unguided.Sample().ToArray(), Array.Empty<LLamaToken>());
unguidedDecoder.Add(u);
unguided.Prompt(u);
// Sample from the "guided" conversation. This sampler will internally use the "guidance" conversation
// to steer the conversation. See how this is done in GuidedSampler.ProcessLogits (bottom of this file).
var g = guidedSampler.Sample(executor.Context.NativeHandle, guided.Sample().ToArray(), Array.Empty<LLamaToken>());
guidedDecoder.Add(g);
// Use this token to advance both guided _and_ guidance. Keeping them in sync (except for the initial prompt).
guided.Prompt(g);
guidance.Prompt(g);
// Early exit if we reach the natural end of the guided sentence
if (g == model.EndOfSentenceToken)
break;
// Update progress bar
reporter.Increment(1);
}
});
AnsiConsole.MarkupLine($"[green]Unguided:[/][white]{unguidedDecoder.Read().ReplaceLineEndings(" ")}[/]");
AnsiConsole.MarkupLine($"[green]Guided:[/][white]{guidedDecoder.Read().ReplaceLineEndings(" ")}[/]");
}
private class GuidedSampler(Conversation? guidance, float weight)
: BaseSamplingPipeline
{
public override void Accept(SafeLLamaContextHandle ctx, LLamaToken token)
{
}
public override ISamplingPipeline Clone()
{
throw new NotSupportedException();
}
protected override void ProcessLogits(SafeLLamaContextHandle ctx, Span<float> logits, ReadOnlySpan<LLamaToken> lastTokens)
{
if (guidance == null)
return;
// Get the logits generated by the guidance sequences
var guidanceLogits = guidance.Sample();
// Use those logits to guide this sequence
// NativeApi.llama_sample_apply_guidance(ctx, logits, guidanceLogits, weight);
}
protected override LLamaToken ProcessTokenDataArray(SafeLLamaContextHandle ctx, LLamaTokenDataArray candidates, ReadOnlySpan<LLamaToken> lastTokens)
{
candidates.Temperature(ctx, 0.8f);
candidates.TopK(ctx, 25);
return candidates.SampleToken(ctx);
}
protected override IReadOnlyList<LLamaToken> GetProtectedTokens(SafeLLamaContextHandle ctx)
{
throw new NotImplementedException();
}
}
}
I'm stuck right now because I don't know how to implement GetProtectedTokens. If I'm missing something please let me know.
I'd suggest cloning the master branch and working with that.
I've been experimenting with the BatchedExecutor. When I attempt to create multiple conversations, I get an error as soon as the second one is prompted.
So I guess my actual question is: what parts of the pipeline can actually run in parallel?
The parallelism is built into it - when you call Infer(), inference runs for every conversation that has pending tokens, all in a single batch.
The key for …
I got that figured out after a bit as well @AsakusaRinne, but thanks for the advice nonetheless. I managed to get a working iteration of it and thought I'd share it for people who find this issue as well.
public class MultiChatSession(BatchedExecutor executor)
{
private BatchedExecutor _executor = executor;
private Dictionary<Conversation, DataStream> _conversations = [];
private readonly SemaphoreSlim inferenceLock = new(1, 1);
private bool _inferenceStarted = false;
public async Task<DataStream> Add(string prompt)
{
Console.WriteLine("Attempting to add a conversation, waiting to acquire lock");
await inferenceLock.WaitAsync(); // Acquire lock before adding
try
{
Console.WriteLine("Started adding a conversation");
var dataStream = new DataStream();
var conversation = _executor.Create();
conversation.Prompt(prompt);
_conversations.Add(conversation, dataStream);
Console.WriteLine("A conversation was added");
return dataStream;
}
finally
{
inferenceLock.Release(); // Release lock after adding
Console.WriteLine("Done adding conversation, lock released");
if (!_inferenceStarted)
{
_inferenceStarted = true;
_ = Task.Run(RunInferenceLoopAsync);
}
}
}
private async Task RunInferenceLoopAsync()
{
while(true)
{
Console.WriteLine("Attempting to start inference iteration, waiting to acquire lock");
await inferenceLock.WaitAsync(); // Acquire lock before starting inference
try
{
Console.WriteLine("Inference iteration started");
await _executor.Infer();
var decoder = new StreamingTokenDecoder(_executor.Context);
var ctx = _executor.Context.NativeHandle;
foreach (var kvp in _conversations)
{
using var sampler = new DefaultSamplingPipeline();
var token = sampler.Sample(ctx, kvp.Key.Sample(), Array.Empty<LLamaToken>());
sampler.Accept(ctx, token);
if (token == _executor.Model.EndOfSentenceToken)
{
kvp.Value.MarkAsCompleted();
_conversations.Remove(kvp.Key);
}
else
{
decoder.Add(token);
kvp.Key.Prompt(token);
kvp.Value.AddToken(decoder.Read());
}
}
Console.WriteLine("Inference iteration completed");
}
finally
{
inferenceLock.Release();
Console.WriteLine("Done with inference iteration, lock released");
}
if (_conversations.Count == 0)
{
_inferenceStarted = false;
break;
}
}
}
}
A point I wanted in my application was the ability to add a conversation at any time, and the challenges with that were mostly making sure no two conversations were added at the same time and that no conversation was added during inference. This version is definitely somewhat verbose, but the logging helped me see when the thread was locked; otherwise it'd be really easy to get stuck somewhere. Edit: Made some code changes so the inference loop only runs when there are conversations to be inferred.
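For context, a hypothetical way this class could be consumed - DataStream and its ReadAllAsync here are just placeholders for however the stream exposes its text:
// Hypothetical usage of the MultiChatSession above.
// Assumes `parameters` is a ModelParams pointing at your model file.
using var model = LLamaWeights.LoadFromFile(parameters);
using var executor = new BatchedExecutor(model, parameters);
var session = new MultiChatSession(executor);

// Conversations can be added at any time; the semaphore ensures additions
// never overlap with a running inference iteration.
var stream = await session.Add("Write a haiku about spring.");

await foreach (var text in stream.ReadAllAsync())
    Console.Write(text);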
Thanks @WesselvanGils. I had tried the llama.cpp web server with 2 slots and continuous batching, but it seemed to be more than twice as slow as single-prompt processing.
Try the BatchedExecutor demo.
I had taken a look at the BatchedExecutor demo but was confused about how to add different prompts and get separate streaming results for each prompt.
@AshD Basically, every time you create a new conversation with executor.Create() the executor takes note, and when running inference it will infer for every conversation that was created. The example class I made shows this off pretty well, I believe.
The basic flow for the batched executor is:
using var conversation = executor.Create();
conversation.Prompt("Hello AshD");
await executor.Infer();
// Sampling can be done however you like. There's a whole sampling pipeline infrastructure in LLamaSharp you can use. Here's a basic one
var token = new DefaultSamplingPipeline().Sample(executor.Context.NativeHandle, conversation.Sample(), Array.Empty<LLamaToken>());
conversation.Prompt(token);
You can have as many conversations as you like, although of course they do take up extra memory for the KV cache. If you have several conversations with totally unrelated prompts you will get a small speedup. Where it gets really cool is when multiple conversations share a common prefix - for example, if you have a system prompt you can evaluate it once and then fork that conversation, and now you have 2 conversations using exactly the same bit of KV cache (no extra memory consumed, and you didn't have to re-evaluate the entire prompt).
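A small sketch of that fork pattern (the prompts are placeholders, and executor is a BatchedExecutor as above):
// Evaluate a shared system prompt once, then fork so both conversations
// reuse the same KV cache for that prefix.
using var root = executor.Create();
root.Prompt("You are a helpful assistant.");
await executor.Infer();                 // system prompt evaluated exactly once

using var convA = root.Fork();          // shares the evaluated prefix
using var convB = root.Fork();
convA.Prompt("Write a limerick about llamas.");
convB.Prompt("Summarise the rules of chess.");
await executor.Infer();                 // both prompts evaluated in one batch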
Thanks @martindevans and @WesselvanGils. Dumb question, since I have not used the LLamaSharp API at a low level: how do I get the generated tokens in a streaming manner? Is it using the DataStream?
The BatchedExecutor doesn't have anything like that at the moment; it's up to you to sample a token, reprompt the conversation with that token, and detokenize it into text using a StreamingTokenDecoder.
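For example, the detokenize step can look roughly like this, using the StreamingTokenDecoder shown in the examples above (token here is the token just sampled for the conversation):
// Decode a sampled token into text and print whatever is decodable so far.
var decoder = new StreamingTokenDecoder(executor.Context);
decoder.Add(token);
Console.Write(decoder.Read());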
The DataStream object you see is something I just made myself so other parts of the program can consume the data at their own leisure. I can share it if you're interested. But basically, after each inference run you get one token, so in essence you already have a "stream". It's infer -> sample the token -> add it back into the conversation; doing that over and over is how you achieve a "stream" of tokens.
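As a rough idea, a DataStream along those lines could be built on System.Threading.Channels - the method names match the usage in MultiChatSession above, everything else is an assumption:
using System.Collections.Generic;
using System.Threading.Channels;

public class DataStream
{
    private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();

    // Called by the inference loop after each sampled token is decoded.
    public void AddToken(string text) => _channel.Writer.TryWrite(text);

    // Called when the conversation reaches its end-of-sequence token.
    public void MarkAsCompleted() => _channel.Writer.TryComplete();

    // Consumers read the generated text at their own pace.
    public IAsyncEnumerable<string> ReadAllAsync() => _channel.Reader.ReadAllAsync();
}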
I've been playing around with the BatchedExecutor example and noticed that when I set the sampling temperature to 0 (or use GreedySamplingPipeline), the conversations diverge. I've tried a few short initial prompts and sometimes it does not happen (all forks are the same), but it usually does.
Unfortunately I think that's probably because the inference process itself is not completely deterministic.
That doesn't sound right... As I understand it, when sampling with a greedy sampler (which always picks the most probable token), inference should always return the same result for the same prompt.
Yeah you're right, I was wrong there. I'll have a look into this.
I did a test with the batched example from llama.cpp (modified to use greedy sampling) and the results are the same. So I created an issue in llama.cpp (ggerganov/llama.cpp#6583) for this.
Ah interesting, I'll wait and see if someone upstream knows what the issue is. Thanks for looking into that.
Looks like it's expected, according to ggerganov.
I'll close this issue now, since I think the questions have been answered and there hasn't been any activity for a while.
Is there a way to do parallel inferencing with LLamaSharp?
I want to send multiple prompts from different threads to different StatelessExecutor objects to do inference.
Currently, I use a semaphore so that only one StatelessExecutor processes at a time, roughly as sketched below.
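A minimal sketch of that approach (assuming weights and parameters are already loaded; this is illustrative rather than the exact code):
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using LLama;
using LLama.Common;

static class SerializedInference
{
    // Only one StatelessExecutor runs at a time, guarded by a semaphore.
    private static readonly SemaphoreSlim Gate = new(1, 1);

    public static async Task<string> InferAsync(LLamaWeights weights, ModelParams parameters, string prompt)
    {
        await Gate.WaitAsync();
        try
        {
            var executor = new StatelessExecutor(weights, parameters);
            var output = new StringBuilder();
            await foreach (var piece in executor.InferAsync(prompt))
                output.Append(piece);
            return output.ToString();
        }
        finally
        {
            Gate.Release();
        }
    }
}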
Thanks,
Ash